aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorWeidiDeng <[email protected]>2024-04-16 01:37:37 +0800
committerGitHub <[email protected]>2024-04-15 11:37:37 -0600
commitb40cacf5ce8f6a190a002fa44e973f1aa5bac2b0 (patch)
tree46ea0ad3575e087855ff78746e4fd8f5c48dfcc6
parent81413caea251a3ef9e3641d7b1b6e867572a2b1b (diff)
downloadcaddy-b40cacf5ce8f6a190a002fa44e973f1aa5bac2b0.tar.gz
caddy-b40cacf5ce8f6a190a002fa44e973f1aa5bac2b0.zip
reverseproxy: Wait for both ends of websocket to close (#6175)
-rw-r--r--modules/caddyhttp/reverseproxy/reverseproxy.go4
-rw-r--r--modules/caddyhttp/reverseproxy/streaming.go8
2 files changed, 9 insertions, 3 deletions
diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go
index 1648b7791..deba304a7 100644
--- a/modules/caddyhttp/reverseproxy/reverseproxy.go
+++ b/modules/caddyhttp/reverseproxy/reverseproxy.go
@@ -922,7 +922,9 @@ func (h *Handler) finalizeResponse(
) error {
// deal with 101 Switching Protocols responses: (WebSocket, h2c, etc)
if res.StatusCode == http.StatusSwitchingProtocols {
- h.handleUpgradeResponse(logger, rw, req, res)
+ var wg sync.WaitGroup
+ h.handleUpgradeResponse(logger, &wg, rw, req, res)
+ wg.Wait()
return nil
}
diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go
index 2a5b5286a..6d8990575 100644
--- a/modules/caddyhttp/reverseproxy/streaming.go
+++ b/modules/caddyhttp/reverseproxy/streaming.go
@@ -34,7 +34,7 @@ import (
"golang.org/x/net/http/httpguts"
)
-func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWriter, req *http.Request, res *http.Response) {
+func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, rw http.ResponseWriter, req *http.Request, res *http.Response) {
reqUpType := upgradeType(req.Header)
resUpType := upgradeType(res.Header)
@@ -121,7 +121,7 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit
defer deleteFrontConn()
defer deleteBackConn()
- spc := switchProtocolCopier{user: conn, backend: backConn}
+ spc := switchProtocolCopier{user: conn, backend: backConn, wg: wg}
// setup the timeout if requested
var timeoutc <-chan time.Time
@@ -132,6 +132,7 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit
}
errc := make(chan error, 1)
+ wg.Add(2)
go spc.copyToBackend(errc)
go spc.copyFromBackend(errc)
select {
@@ -529,16 +530,19 @@ func (m *maxLatencyWriter) stop() {
// forth have nice names in stacks.
type switchProtocolCopier struct {
user, backend io.ReadWriteCloser
+ wg *sync.WaitGroup
}
func (c switchProtocolCopier) copyFromBackend(errc chan<- error) {
_, err := io.Copy(c.user, c.backend)
errc <- err
+ c.wg.Done()
}
func (c switchProtocolCopier) copyToBackend(errc chan<- error) {
_, err := io.Copy(c.backend, c.user)
errc <- err
+ c.wg.Done()
}
var streamingBufPool = sync.Pool{