From 030b6051f11e6ff6e5a936c6e4da2389aa0d0660 Mon Sep 17 00:00:00 2001 From: WeidiDeng Date: Sat, 7 Dec 2024 04:23:27 +0800 Subject: reverseproxy: Rewrite requests and responses for websocket over http2 (#6567) * reverse proxy: rewrite requests and responses for websocket over http2 * delete protocol pseudo-header * modify cloned requests * set request variable to track if it's a h2 websocket * use request bodu * rewrite request body * use WebSocket instead of Websocket in the headers * use logger check for zap loggers * fix lint --- modules/caddyhttp/reverseproxy/reverseproxy.go | 19 ++++++ modules/caddyhttp/reverseproxy/streaming.go | 85 ++++++++++++++++++++++---- 2 files changed, 92 insertions(+), 12 deletions(-) diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index ba9e5f3fc..907b47b0a 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -17,6 +17,8 @@ package reverseproxy import ( "bytes" "context" + "crypto/rand" + "encoding/base64" "encoding/json" "errors" "fmt" @@ -412,6 +414,23 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht return caddyhttp.Error(http.StatusInternalServerError, fmt.Errorf("preparing request for upstream round-trip: %v", err)) } + // websocket over http2, assuming backend doesn't support this, the request will be modified to http1.1 upgrade + // TODO: once we can reliably detect backend support this, it can be removed for those backends + if r.ProtoMajor == 2 && r.Method == http.MethodConnect && r.Header.Get(":protocol") != "" { + clonedReq.Header.Del(":protocol") + // keep the body for later use. http1.1 upgrade uses http.NoBody + caddyhttp.SetVar(clonedReq.Context(), "h2_websocket_body", clonedReq.Body) + clonedReq.Body = http.NoBody + clonedReq.Method = http.MethodGet + clonedReq.Header.Set("Upgrade", r.Header.Get(":protocol")) + clonedReq.Header.Set("Connection", "Upgrade") + key := make([]byte, 16) + _, randErr := rand.Read(key) + if randErr != nil { + return randErr + } + clonedReq.Header["Sec-WebSocket-Key"] = []string{base64.StdEncoding.EncodeToString(key)} + } // we will need the original headers and Host value if // header operations are configured; this is so that each diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index 155a1df0c..a517fea4d 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -19,6 +19,7 @@ package reverseproxy import ( + "bufio" "context" "errors" "fmt" @@ -32,9 +33,30 @@ import ( "go.uber.org/zap" "golang.org/x/net/http/httpguts" + + "github.com/caddyserver/caddy/v2/modules/caddyhttp" ) -func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWriter, req *http.Request, res *http.Response) { +type h2ReadWriteCloser struct { + io.ReadCloser + http.ResponseWriter +} + +func (rwc h2ReadWriteCloser) Write(p []byte) (n int, err error) { + n, err = rwc.ResponseWriter.Write(p) + if err != nil { + return 0, err + } + + //nolint:bodyclose + err = http.NewResponseController(rwc.ResponseWriter).Flush() + if err != nil { + return 0, err + } + return n, nil +} + +func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, rw http.ResponseWriter, req *http.Request, res *http.Response) { reqUpType := upgradeType(req.Header) resUpType := upgradeType(res.Header) @@ -61,20 +83,59 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit // write header first, response headers should not be counted in size // like the rest of handler chain. copyHeader(rw.Header(), res.Header) - rw.WriteHeader(res.StatusCode) + normalizeWebsocketHeaders(rw.Header()) + + var ( + conn io.ReadWriteCloser + brw *bufio.ReadWriter + ) + // websocket over http2, assuming backend doesn't support this, the request will be modified to http1.1 upgrade + // TODO: once we can reliably detect backend support this, it can be removed for those backends + if body, ok := caddyhttp.GetVar(req.Context(), "h2_websocket_body").(io.ReadCloser); ok { + req.Body = body + rw.Header().Del("Upgrade") + rw.Header().Del("Connection") + delete(rw.Header(), "Sec-WebSocket-Accept") + rw.WriteHeader(http.StatusOK) + + if c := logger.Check(zap.DebugLevel, "upgrading connection"); c != nil { + c.Write(zap.Int("http_version", 2)) + } - logger.Debug("upgrading connection") + //nolint:bodyclose + flushErr := http.NewResponseController(rw).Flush() + if flushErr != nil { + if c := h.logger.Check(zap.ErrorLevel, "failed to flush http2 websocket response"); c != nil { + c.Write(zap.Error(flushErr)) + } + return + } + conn = h2ReadWriteCloser{req.Body, rw} + // bufio is not needed, use minimal buffer + brw = bufio.NewReadWriter(bufio.NewReaderSize(conn, 1), bufio.NewWriterSize(conn, 1)) + } else { + rw.WriteHeader(res.StatusCode) - //nolint:bodyclose - conn, brw, hijackErr := http.NewResponseController(rw).Hijack() - if errors.Is(hijackErr, http.ErrNotSupported) { - h.logger.Sugar().Errorf("can't switch protocols using non-Hijacker ResponseWriter type %T", rw) - return - } + if c := logger.Check(zap.DebugLevel, "upgrading connection"); c != nil { + c.Write(zap.Int("http_version", req.ProtoMajor)) + } - if hijackErr != nil { - h.logger.Error("hijack failed on protocol switch", zap.Error(hijackErr)) - return + var hijackErr error + //nolint:bodyclose + conn, brw, hijackErr = http.NewResponseController(rw).Hijack() + if errors.Is(hijackErr, http.ErrNotSupported) { + if c := h.logger.Check(zap.ErrorLevel, "can't switch protocols using non-Hijacker ResponseWriter"); c != nil { + c.Write(zap.String("type", fmt.Sprintf("%T", rw))) + } + return + } + + if hijackErr != nil { + if c := h.logger.Check(zap.ErrorLevel, "hijack failed on protocol switch"); c != nil { + c.Write(zap.Error(hijackErr)) + } + return + } } // adopted from https://github.com/golang/go/commit/8bcf2834afdf6a1f7937390903a41518715ef6f5 -- cgit v1.2.3