aboutsummaryrefslogtreecommitdiffhomepage
path: root/modules/caddyhttp/reverseproxy/streaming.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/caddyhttp/reverseproxy/streaming.go')
-rw-r--r--modules/caddyhttp/reverseproxy/streaming.go84
1 files changed, 59 insertions, 25 deletions
diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go
index a696ac7fb..c871a3fa1 100644
--- a/modules/caddyhttp/reverseproxy/streaming.go
+++ b/modules/caddyhttp/reverseproxy/streaming.go
@@ -31,6 +31,7 @@ import (
"unsafe"
"go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
"golang.org/x/net/http/httpguts"
)
@@ -41,14 +42,18 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup,
// Taken from https://github.com/golang/go/commit/5c489514bc5e61ad9b5b07bd7d8ec65d66a0512a
// We know reqUpType is ASCII, it's checked by the caller.
if !asciiIsPrint(resUpType) {
- logger.Debug("backend tried to switch to invalid protocol",
- zap.String("backend_upgrade", resUpType))
+ if c := logger.Check(zapcore.DebugLevel, "backend tried to switch to invalid protocol"); c != nil {
+ c.Write(zap.String("backend_upgrade", resUpType))
+ }
return
}
if !asciiEqualFold(reqUpType, resUpType) {
- logger.Debug("backend tried to switch to unexpected protocol via Upgrade header",
- zap.String("backend_upgrade", resUpType),
- zap.String("requested_upgrade", reqUpType))
+ if c := logger.Check(zapcore.DebugLevel, "backend tried to switch to unexpected protocol via Upgrade header"); c != nil {
+ c.Write(
+ zap.String("backend_upgrade", resUpType),
+ zap.String("requested_upgrade", reqUpType),
+ )
+ }
return
}
@@ -68,12 +73,16 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup,
//nolint:bodyclose
conn, brw, hijackErr := http.NewResponseController(rw).Hijack()
if errors.Is(hijackErr, http.ErrNotSupported) {
- h.logger.Error("can't switch protocols using non-Hijacker ResponseWriter", zap.String("type", fmt.Sprintf("%T", rw)))
+ if c := logger.Check(zapcore.ErrorLevel, "can't switch protocols using non-Hijacker ResponseWriter"); c != nil {
+ c.Write(zap.String("type", fmt.Sprintf("%T", rw)))
+ }
return
}
if hijackErr != nil {
- h.logger.Error("hijack failed on protocol switch", zap.Error(hijackErr))
+ if c := logger.Check(zapcore.ErrorLevel, "hijack failed on protocol switch"); c != nil {
+ c.Write(zap.Error(hijackErr))
+ }
return
}
@@ -93,11 +102,15 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup,
start := time.Now()
defer func() {
conn.Close()
- logger.Debug("connection closed", zap.Duration("duration", time.Since(start)))
+ if c := logger.Check(zapcore.DebugLevel, "hijack failed on protocol switch"); c != nil {
+ c.Write(zap.Duration("duration", time.Since(start)))
+ }
}()
if err := brw.Flush(); err != nil {
- logger.Debug("response flush", zap.Error(err))
+ if c := logger.Check(zapcore.DebugLevel, "response flush"); c != nil {
+ c.Write(zap.Error(err))
+ }
return
}
@@ -107,7 +120,9 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup,
data, _ := brw.Peek(buffered)
_, err := backConn.Write(data)
if err != nil {
- logger.Debug("backConn write failed", zap.Error(err))
+ if c := logger.Check(zapcore.DebugLevel, "backConn write failed"); c != nil {
+ c.Write(zap.Error(err))
+ }
return
}
}
@@ -148,9 +163,13 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup,
go spc.copyFromBackend(errc)
select {
case err := <-errc:
- logger.Debug("streaming error", zap.Error(err))
+ if c := logger.Check(zapcore.DebugLevel, "streaming error"); c != nil {
+ c.Write(zap.Error(err))
+ }
case time := <-timeoutc:
- logger.Debug("stream timed out", zap.Time("timeout", time))
+ if c := logger.Check(zapcore.DebugLevel, "stream timed out"); c != nil {
+ c.Write(zap.Time("timeout", time))
+ }
}
}
@@ -247,7 +266,9 @@ func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte, logger *za
logger.Debug("waiting to read from upstream")
nr, rerr := src.Read(buf)
logger := logger.With(zap.Int("read", nr))
- logger.Debug("read from upstream", zap.Error(rerr))
+ if c := logger.Check(zapcore.DebugLevel, "read from upstream"); c != nil {
+ c.Write(zap.Error(rerr))
+ }
if rerr != nil && rerr != io.EOF && rerr != context.Canceled {
// TODO: this could be useful to know (indeed, it revealed an error in our
// fastcgi PoC earlier; but it's this single error report here that necessitates
@@ -256,7 +277,9 @@ func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte, logger *za
// something we need to report to the client, but read errors are a problem on our
// end for sure. so we need to decide what we want.)
// p.logf("copyBuffer: ReverseProxy read error during body copy: %v", rerr)
- h.logger.Error("reading from backend", zap.Error(rerr))
+ if c := logger.Check(zapcore.ErrorLevel, "reading from backend"); c != nil {
+ c.Write(zap.Error(rerr))
+ }
}
if nr > 0 {
logger.Debug("writing to downstream")
@@ -264,10 +287,13 @@ func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte, logger *za
if nw > 0 {
written += int64(nw)
}
- logger.Debug("wrote to downstream",
- zap.Int("written", nw),
- zap.Int64("written_total", written),
- zap.Error(werr))
+ if c := logger.Check(zapcore.DebugLevel, "wrote to downstream"); c != nil {
+ c.Write(
+ zap.Int("written", nw),
+ zap.Int64("written_total", written),
+ zap.Error(werr),
+ )
+ }
if werr != nil {
return written, fmt.Errorf("writing: %w", werr)
}
@@ -347,13 +373,17 @@ func (h *Handler) cleanupConnections() error {
if len(h.connections) > 0 {
delay := time.Duration(h.StreamCloseDelay)
h.connectionsCloseTimer = time.AfterFunc(delay, func() {
- h.logger.Debug("closing streaming connections after delay",
- zap.Duration("delay", delay))
+ if c := h.logger.Check(zapcore.DebugLevel, "closing streaming connections after delay"); c != nil {
+ c.Write(zap.Duration("delay", delay))
+ }
err := h.closeConnections()
if err != nil {
- h.logger.Error("failed to closed connections after delay",
- zap.Error(err),
- zap.Duration("delay", delay))
+ if c := h.logger.Check(zapcore.ErrorLevel, "failed to closed connections after delay"); c != nil {
+ c.Write(
+ zap.Error(err),
+ zap.Duration("delay", delay),
+ )
+ }
}
})
}
@@ -494,7 +524,9 @@ func (m *maxLatencyWriter) Write(p []byte) (n int, err error) {
m.mu.Lock()
defer m.mu.Unlock()
n, err = m.dst.Write(p)
- m.logger.Debug("wrote bytes", zap.Int("n", n), zap.Error(err))
+ if c := m.logger.Check(zapcore.DebugLevel, "wrote bytes"); c != nil {
+ c.Write(zap.Int("n", n), zap.Error(err))
+ }
if m.latency < 0 {
m.logger.Debug("flushing immediately")
//nolint:errcheck
@@ -510,7 +542,9 @@ func (m *maxLatencyWriter) Write(p []byte) (n int, err error) {
} else {
m.t.Reset(m.latency)
}
- m.logger.Debug("timer set for delayed flush", zap.Duration("duration", m.latency))
+ if c := m.logger.Check(zapcore.DebugLevel, "timer set for delayed flush"); c != nil {
+ c.Write(zap.Duration("duration", m.latency))
+ }
m.flushPending = true
return
}