diff options
Diffstat (limited to 'modules/caddyhttp/reverseproxy/streaming.go')
-rw-r--r-- | modules/caddyhttp/reverseproxy/streaming.go | 84 |
1 files changed, 59 insertions, 25 deletions
diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index a696ac7fb..c871a3fa1 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -31,6 +31,7 @@ import ( "unsafe" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "golang.org/x/net/http/httpguts" ) @@ -41,14 +42,18 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, // Taken from https://github.com/golang/go/commit/5c489514bc5e61ad9b5b07bd7d8ec65d66a0512a // We know reqUpType is ASCII, it's checked by the caller. if !asciiIsPrint(resUpType) { - logger.Debug("backend tried to switch to invalid protocol", - zap.String("backend_upgrade", resUpType)) + if c := logger.Check(zapcore.DebugLevel, "backend tried to switch to invalid protocol"); c != nil { + c.Write(zap.String("backend_upgrade", resUpType)) + } return } if !asciiEqualFold(reqUpType, resUpType) { - logger.Debug("backend tried to switch to unexpected protocol via Upgrade header", - zap.String("backend_upgrade", resUpType), - zap.String("requested_upgrade", reqUpType)) + if c := logger.Check(zapcore.DebugLevel, "backend tried to switch to unexpected protocol via Upgrade header"); c != nil { + c.Write( + zap.String("backend_upgrade", resUpType), + zap.String("requested_upgrade", reqUpType), + ) + } return } @@ -68,12 +73,16 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, //nolint:bodyclose conn, brw, hijackErr := http.NewResponseController(rw).Hijack() if errors.Is(hijackErr, http.ErrNotSupported) { - h.logger.Error("can't switch protocols using non-Hijacker ResponseWriter", zap.String("type", fmt.Sprintf("%T", rw))) + if c := logger.Check(zapcore.ErrorLevel, "can't switch protocols using non-Hijacker ResponseWriter"); c != nil { + c.Write(zap.String("type", fmt.Sprintf("%T", rw))) + } return } if hijackErr != nil { - h.logger.Error("hijack failed on protocol switch", zap.Error(hijackErr)) + if c := logger.Check(zapcore.ErrorLevel, "hijack failed on protocol switch"); c != nil { + c.Write(zap.Error(hijackErr)) + } return } @@ -93,11 +102,15 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, start := time.Now() defer func() { conn.Close() - logger.Debug("connection closed", zap.Duration("duration", time.Since(start))) + if c := logger.Check(zapcore.DebugLevel, "hijack failed on protocol switch"); c != nil { + c.Write(zap.Duration("duration", time.Since(start))) + } }() if err := brw.Flush(); err != nil { - logger.Debug("response flush", zap.Error(err)) + if c := logger.Check(zapcore.DebugLevel, "response flush"); c != nil { + c.Write(zap.Error(err)) + } return } @@ -107,7 +120,9 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, data, _ := brw.Peek(buffered) _, err := backConn.Write(data) if err != nil { - logger.Debug("backConn write failed", zap.Error(err)) + if c := logger.Check(zapcore.DebugLevel, "backConn write failed"); c != nil { + c.Write(zap.Error(err)) + } return } } @@ -148,9 +163,13 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, go spc.copyFromBackend(errc) select { case err := <-errc: - logger.Debug("streaming error", zap.Error(err)) + if c := logger.Check(zapcore.DebugLevel, "streaming error"); c != nil { + c.Write(zap.Error(err)) + } case time := <-timeoutc: - logger.Debug("stream timed out", zap.Time("timeout", time)) + if c := logger.Check(zapcore.DebugLevel, "stream timed out"); c != nil { + c.Write(zap.Time("timeout", time)) + } } } @@ -247,7 +266,9 @@ func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte, logger *za logger.Debug("waiting to read from upstream") nr, rerr := src.Read(buf) logger := logger.With(zap.Int("read", nr)) - logger.Debug("read from upstream", zap.Error(rerr)) + if c := logger.Check(zapcore.DebugLevel, "read from upstream"); c != nil { + c.Write(zap.Error(rerr)) + } if rerr != nil && rerr != io.EOF && rerr != context.Canceled { // TODO: this could be useful to know (indeed, it revealed an error in our // fastcgi PoC earlier; but it's this single error report here that necessitates @@ -256,7 +277,9 @@ func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte, logger *za // something we need to report to the client, but read errors are a problem on our // end for sure. so we need to decide what we want.) // p.logf("copyBuffer: ReverseProxy read error during body copy: %v", rerr) - h.logger.Error("reading from backend", zap.Error(rerr)) + if c := logger.Check(zapcore.ErrorLevel, "reading from backend"); c != nil { + c.Write(zap.Error(rerr)) + } } if nr > 0 { logger.Debug("writing to downstream") @@ -264,10 +287,13 @@ func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte, logger *za if nw > 0 { written += int64(nw) } - logger.Debug("wrote to downstream", - zap.Int("written", nw), - zap.Int64("written_total", written), - zap.Error(werr)) + if c := logger.Check(zapcore.DebugLevel, "wrote to downstream"); c != nil { + c.Write( + zap.Int("written", nw), + zap.Int64("written_total", written), + zap.Error(werr), + ) + } if werr != nil { return written, fmt.Errorf("writing: %w", werr) } @@ -347,13 +373,17 @@ func (h *Handler) cleanupConnections() error { if len(h.connections) > 0 { delay := time.Duration(h.StreamCloseDelay) h.connectionsCloseTimer = time.AfterFunc(delay, func() { - h.logger.Debug("closing streaming connections after delay", - zap.Duration("delay", delay)) + if c := h.logger.Check(zapcore.DebugLevel, "closing streaming connections after delay"); c != nil { + c.Write(zap.Duration("delay", delay)) + } err := h.closeConnections() if err != nil { - h.logger.Error("failed to closed connections after delay", - zap.Error(err), - zap.Duration("delay", delay)) + if c := h.logger.Check(zapcore.ErrorLevel, "failed to closed connections after delay"); c != nil { + c.Write( + zap.Error(err), + zap.Duration("delay", delay), + ) + } } }) } @@ -494,7 +524,9 @@ func (m *maxLatencyWriter) Write(p []byte) (n int, err error) { m.mu.Lock() defer m.mu.Unlock() n, err = m.dst.Write(p) - m.logger.Debug("wrote bytes", zap.Int("n", n), zap.Error(err)) + if c := m.logger.Check(zapcore.DebugLevel, "wrote bytes"); c != nil { + c.Write(zap.Int("n", n), zap.Error(err)) + } if m.latency < 0 { m.logger.Debug("flushing immediately") //nolint:errcheck @@ -510,7 +542,9 @@ func (m *maxLatencyWriter) Write(p []byte) (n int, err error) { } else { m.t.Reset(m.latency) } - m.logger.Debug("timer set for delayed flush", zap.Duration("duration", m.latency)) + if c := m.logger.Check(zapcore.DebugLevel, "timer set for delayed flush"); c != nil { + c.Write(zap.Duration("duration", m.latency)) + } m.flushPending = true return } |