path: root/modules/caddyhttp/reverseproxy/streaming.go
// Copyright 2015 Matthew Holt and The Caddy Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Most of the code in this file was initially borrowed from the Go
// standard library and modified; it had this copyright notice:
// Copyright 2011 The Go Authors

package reverseproxy

import (
	"context"
	"errors"
	"fmt"
	"io"
	weakrand "math/rand"
	"mime"
	"net/http"
	"sync"
	"time"
	"unsafe"

	"go.uber.org/zap"
	"golang.org/x/net/http/httpguts"
)

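// handleUpgradeResponse handles a 101 Switching Protocols response from the
// backend: it verifies that the backend agreed to the protocol the client
// requested, hijacks the client connection, forwards any data already
// buffered by the hijacked bufio.ReadWriter to the backend, registers both
// connections for cleanup on server shutdown, and then copies data in both
// directions until one side fails or the optional stream timeout elapses.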
func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, rw http.ResponseWriter, req *http.Request, res *http.Response) {
	reqUpType := upgradeType(req.Header)
	resUpType := upgradeType(res.Header)

	// Taken from https://github.com/golang/go/commit/5c489514bc5e61ad9b5b07bd7d8ec65d66a0512a
	// We know reqUpType is ASCII; it's checked by the caller.
	if !asciiIsPrint(resUpType) {
		logger.Debug("backend tried to switch to invalid protocol",
			zap.String("backend_upgrade", resUpType))
		return
	}
	if !asciiEqualFold(reqUpType, resUpType) {
		logger.Debug("backend tried to switch to unexpected protocol via Upgrade header",
			zap.String("backend_upgrade", resUpType),
			zap.String("requested_upgrade", reqUpType))
		return
	}

	backConn, ok := res.Body.(io.ReadWriteCloser)
	if !ok {
		logger.Error("internal error: 101 switching protocols response with non-writable body")
		return
	}

	// write the header first; response headers should not be counted in size
	// like the rest of the handler chain.
	copyHeader(rw.Header(), res.Header)
	rw.WriteHeader(res.StatusCode)

	logger.Debug("upgrading connection")

	//nolint:bodyclose
	conn, brw, hijackErr := http.NewResponseController(rw).Hijack()
	if errors.Is(hijackErr, http.ErrNotSupported) {
		h.logger.Error("can't switch protocols using non-Hijacker ResponseWriter", zap.String("type", fmt.Sprintf("%T", rw)))
		return
	}

	if hijackErr != nil {
		h.logger.Error("hijack failed on protocol switch", zap.Error(hijackErr))
		return
	}

	// adopted from https://github.com/golang/go/commit/8bcf2834afdf6a1f7937390903a41518715ef6f5
	backConnCloseCh := make(chan struct{})
	go func() {
		// Ensure that the cancelation of a request closes the backend.
		// See issue https://golang.org/issue/35559.
		select {
		case <-req.Context().Done():
		case <-backConnCloseCh:
		}
		backConn.Close()
	}()
	defer close(backConnCloseCh)

	start := time.Now()
	defer func() {
		conn.Close()
		logger.Debug("connection closed", zap.Duration("duration", time.Since(start)))
	}()

	if err := brw.Flush(); err != nil {
		logger.Debug("response flush", zap.Error(err))
		return
	}

	// There may be buffered data in the *bufio.Reader
	// see: https://github.com/caddyserver/caddy/issues/6273
	if buffered := brw.Reader.Buffered(); buffered > 0 {
		data, _ := brw.Peek(buffered)
		_, err := backConn.Write(data)
		if err != nil {
			logger.Debug("backConn write failed", zap.Error(err))
			return
		}
	}

	// Ensure the hijacked client connection, and the new connection established
	// with the backend, are both closed in the event of a server shutdown. This
	// is done by registering them. We also try to gracefully close connections
	// we recognize as websockets.
	// We need to make sure the client connection messages (i.e. to upstream)
	// are masked, so we need to know whether the connection is considered the
	// server or the client side of the proxy.
	gracefulClose := func(conn io.ReadWriteCloser, isClient bool) func() error {
		if isWebsocket(req) {
			return func() error {
				return writeCloseControl(conn, isClient)
			}
		}
		return nil
	}
	deleteFrontConn := h.registerConnection(conn, gracefulClose(conn, false))
	deleteBackConn := h.registerConnection(backConn, gracefulClose(backConn, true))
	defer deleteFrontConn()
	defer deleteBackConn()

	spc := switchProtocolCopier{user: conn, backend: backConn, wg: wg}

	// setup the timeout if requested
	var timeoutc <-chan time.Time
	if h.StreamTimeout > 0 {
		timer := time.NewTimer(time.Duration(h.StreamTimeout))
		defer timer.Stop()
		timeoutc = timer.C
	}

	errc := make(chan error, 1)
	wg.Add(2)
	go spc.copyToBackend(errc)
	go spc.copyFromBackend(errc)
	select {
	case err := <-errc:
		logger.Debug("streaming error", zap.Error(err))
	case t := <-timeoutc:
		logger.Debug("stream timed out", zap.Time("timeout", t))
	}
}

// flushInterval returns the p.FlushInterval value, conditionally
// overriding its value for a specific request/response.
func (h Handler) flushInterval(req *http.Request, res *http.Response) time.Duration {
	resCTHeader := res.Header.Get("Content-Type")
	resCT, _, err := mime.ParseMediaType(resCTHeader)

	// For Server-Sent Events responses, flush immediately.
	// The MIME type is defined in https://www.w3.org/TR/eventsource/#text-event-stream
	if err == nil && resCT == "text/event-stream" {
		return -1 // negative means immediately
	}

	// This might be a streaming response, in which case Content-Length is unset.
	if res.ContentLength == -1 {
		return -1
	}

	// for h2 and h2c upstream streaming data to client (issues #3556 and #3606)
	if h.isBidirectionalStream(req, res) {
		return -1
	}

	return time.Duration(h.FlushInterval)
}

// isBidirectionalStream returns whether we should work in bi-directional stream mode.
//
// See https://github.com/caddyserver/caddy/pull/3620 for discussion of nuances.
func (h Handler) isBidirectionalStream(req *http.Request, res *http.Response) bool {
	// We have to check the encoding here; only flush headers with identity encoding.
	// Non-identity encoding might combine with the "encode" directive, and in that case,
	// if the body size is larger than enc.MinLength, the upper-level encode handler might have
	// a Content-Encoding header to write.
	// (see https://github.com/caddyserver/caddy/issues/3606 for use case)
	ae := req.Header.Get("Accept-Encoding")

	return req.ProtoMajor == 2 &&
		res.ProtoMajor == 2 &&
		res.ContentLength == -1 &&
		(ae == "identity" || ae == "")
}

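// copyResponse copies the response body from src to dst using a pooled buffer.
// When flushInterval is non-zero, dst is wrapped in a maxLatencyWriter that
// coalesces flushes (or flushes after every write when the interval is
// negative), and an initial timer is armed so that headers reach the client
// even if the first body bytes are delayed.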
func (h Handler) copyResponse(dst http.ResponseWriter, src io.Reader, flushInterval time.Duration, logger *zap.Logger) error {
	var w io.Writer = dst

	if flushInterval != 0 {
		var mlwLogger *zap.Logger
		if h.VerboseLogs {
			mlwLogger = logger.Named("max_latency_writer")
		} else {
			mlwLogger = zap.NewNop()
		}
		mlw := &maxLatencyWriter{
			dst: dst,
			//nolint:bodyclose
			flush:   http.NewResponseController(dst).Flush,
			latency: flushInterval,
			logger:  mlwLogger,
		}
		defer mlw.stop()

		// set up initial timer so headers get flushed even if body writes are delayed
		mlw.flushPending = true
		mlw.t = time.AfterFunc(flushInterval, mlw.delayedFlush)

		w = mlw
	}

	buf := streamingBufPool.Get().(*[]byte)
	defer streamingBufPool.Put(buf)

	var copyLogger *zap.Logger
	if h.VerboseLogs {
		copyLogger = logger
	} else {
		copyLogger = zap.NewNop()
	}

	_, err := h.copyBuffer(w, src, *buf, copyLogger)
	return err
}

// copyBuffer returns the number of bytes written, along with any write error
// or non-EOF read error encountered.
func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte, logger *zap.Logger) (int64, error) {
	if len(buf) == 0 {
		buf = make([]byte, defaultBufferSize)
	}
	var written int64
	for {
		logger.Debug("waiting to read from upstream")
		nr, rerr := src.Read(buf)
		logger := logger.With(zap.Int("read", nr))
		logger.Debug("read from upstream", zap.Error(rerr))
		if rerr != nil && rerr != io.EOF && rerr != context.Canceled {
			// TODO: this could be useful to know (indeed, it revealed an error in our
			// fastcgi PoC earlier; but it's this single error report here that necessitates
			// a function separate from io.CopyBuffer, since io.CopyBuffer does not distinguish
			// between read or write errors; in a reverse proxy situation, write errors are not
			// something we need to report to the client, but read errors are a problem on our
			// end for sure. so we need to decide what we want.)
			// p.logf("copyBuffer: ReverseProxy read error during body copy: %v", rerr)
			h.logger.Error("reading from backend", zap.Error(rerr))
		}
		if nr > 0 {
			logger.Debug("writing to downstream")
			nw, werr := dst.Write(buf[:nr])
			if nw > 0 {
				written += int64(nw)
			}
			logger.Debug("wrote to downstream",
				zap.Int("written", nw),
				zap.Int64("written_total", written),
				zap.Error(werr))
			if werr != nil {
				return written, fmt.Errorf("writing: %w", werr)
			}
			if nr != nw {
				return written, io.ErrShortWrite
			}
		}
		if rerr != nil {
			if rerr == io.EOF {
				return written, nil
			}
			return written, fmt.Errorf("reading: %w", rerr)
		}
	}
}

// registerConnection holds onto conn so it can be closed in the event
// of a server shutdown. This is useful because hijacked connections or
// connections dialed to backends don't close when the server is shut down.
// The caller should call the returned delete() function when the
// connection is done to remove it from memory.
func (h *Handler) registerConnection(conn io.ReadWriteCloser, gracefulClose func() error) (del func()) {
	h.connectionsMu.Lock()
	h.connections[conn] = openConnection{conn, gracefulClose}
	h.connectionsMu.Unlock()
	return func() {
		h.connectionsMu.Lock()
		delete(h.connections, conn)
		// if there are no connections left before the connections close timer fires
		if len(h.connections) == 0 && h.connectionsCloseTimer != nil {
			// release the timer, which holds a reference to the Handler
			if (*h.connectionsCloseTimer).Stop() {
				h.logger.Debug("stopped streaming connections close timer - all connections are already closed")
			}
			h.connectionsCloseTimer = nil
		}
		h.connectionsMu.Unlock()
	}
}

// closeConnections immediately closes all hijacked connections (both to client and backend).
func (h *Handler) closeConnections() error {
	var err error
	h.connectionsMu.Lock()
	defer h.connectionsMu.Unlock()

	for _, oc := range h.connections {
		if oc.gracefulClose != nil {
			// this is potentially blocking while we have the lock on the connections
			// map, but that should be OK since the server has in theory shut down
			// and we are no longer using the connections map
			gracefulErr := oc.gracefulClose()
			if gracefulErr != nil && err == nil {
				err = gracefulErr
			}
		}
		closeErr := oc.conn.Close()
		if closeErr != nil && err == nil {
			err = closeErr
		}
	}
	return err
}

// cleanupConnections closes hijacked connections.
// Depending on the value of StreamCloseDelay, it either does so immediately
// or sets up a timer that will do so later.
func (h *Handler) cleanupConnections() error {
	if h.StreamCloseDelay == 0 {
		return h.closeConnections()
	}

	h.connectionsMu.Lock()
	defer h.connectionsMu.Unlock()
	// the handler is shut down, so no new connections can appear;
	// we can skip setting up the timer when there are no connections
	if len(h.connections) > 0 {
		delay := time.Duration(h.StreamCloseDelay)
		h.connectionsCloseTimer = time.AfterFunc(delay, func() {
			h.logger.Debug("closing streaming connections after delay",
				zap.Duration("delay", delay))
			err := h.closeConnections()
			if err != nil {
				h.logger.Error("failed to close connections after delay",
					zap.Error(err),
					zap.Duration("delay", delay))
			}
		})
	}
	return nil
}

// writeCloseControl sends a best-effort Close control message to the given
// WebSocket connection. Thanks to @pascaldekloe, whose simple implementation
// at github.com/pascaldekloe/websocket provided inspiration to learn from.
// Further work for handling masking was taken from github.com/gorilla/websocket.
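//
// The frame written below follows RFC 6455 section 5.5.1: byte 0 carries the
// FIN bit and the Close opcode (8); byte 1 carries the payload length (the
// 2-byte status code plus the reason), with the mask bit set when we act as
// the client; a 4-byte masking key follows if masked; and the payload is the
// status code 1001 (Going Away) plus the reason, XOR-masked with the key on
// the client side. For example, the unmasked server-side frame with an empty
// reason is the four bytes 0x88 0x02 0x03 0xE9.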
func writeCloseControl(conn io.Writer, isClient bool) error {
	// Sources:
	// https://github.com/pascaldekloe/websocket/blob/32050af67a5d/websocket.go#L119
	// https://github.com/gorilla/websocket/blob/v1.5.0/conn.go#L413

	// For now, we're not using a reason. We might later, though.
	// The code handling the reason is left in place.
	var reason string // max 123 bytes (control frame payload limit is 125; status code takes 2)

	const closeMessage = 8
	const finalBit = 1 << 7 // Frame header byte 0 bits from Section 5.2 of RFC 6455
	const maskBit = 1 << 7  // Frame header byte 1 bits from Section 5.2 of RFC 6455
	const goingAwayUpper uint8 = 1001 >> 8
	const goingAwayLower uint8 = 1001 & 0xff

	b0 := byte(closeMessage) | finalBit
	b1 := byte(len(reason) + 2)
	if isClient {
		b1 |= maskBit
	}

	buf := make([]byte, 0, 127)
	buf = append(buf, b0, b1)
	msgLength := 4 + len(reason)

	// Both branches below append the "going away" code and reason
	appendMessage := func(buf []byte) []byte {
		buf = append(buf, goingAwayUpper, goingAwayLower)
		buf = append(buf, []byte(reason)...)
		return buf
	}

	// When we're the client, we need to mask the message as per
	// https://www.rfc-editor.org/rfc/rfc6455#section-5.3
	if isClient {
		key := newMaskKey()
		buf = append(buf, key[:]...)
		msgLength += len(key)
		buf = appendMessage(buf)
		maskBytes(key, 0, buf[2+len(key):])
	} else {
		buf = appendMessage(buf)
	}

	// simply best-effort, but return error for logging purposes
	// TODO: we might need to ensure we are the exclusive writer by this point (io.Copy is stopped)?
	_, err := conn.Write(buf[:msgLength])
	return err
}

// Copied from https://github.com/gorilla/websocket/blob/v1.5.0/mask.go
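//
// The transformation is the one from RFC 6455 section 5.3: octet i of the
// payload is XORed with octet (i mod 4) of the masking key. This version
// widens the key to a machine word and XORs word-at-a-time over the aligned
// middle of the buffer, handling the unaligned edges byte-by-byte; the return
// value is the key position to resume from on a subsequent call.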
func maskBytes(key [4]byte, pos int, b []byte) int {
	// Mask one byte at a time for small buffers.
	if len(b) < 2*wordSize {
		for i := range b {
			b[i] ^= key[pos&3]
			pos++
		}
		return pos & 3
	}

	// Mask one byte at a time to word boundary.
	if n := int(uintptr(unsafe.Pointer(&b[0]))) % wordSize; n != 0 {
		n = wordSize - n
		for i := range b[:n] {
			b[i] ^= key[pos&3]
			pos++
		}
		b = b[n:]
	}

	// Create aligned word size key.
	var k [wordSize]byte
	for i := range k {
		k[i] = key[(pos+i)&3]
	}
	kw := *(*uintptr)(unsafe.Pointer(&k))

	// Mask one word at a time.
	n := (len(b) / wordSize) * wordSize
	for i := 0; i < n; i += wordSize {
		*(*uintptr)(unsafe.Pointer(uintptr(unsafe.Pointer(&b[0])) + uintptr(i))) ^= kw
	}

	// Mask one byte at a time for remaining bytes.
	b = b[n:]
	for i := range b {
		b[i] ^= key[pos&3]
		pos++
	}

	return pos & 3
}

// Copied from https://github.com/gorilla/websocket/blob/v1.5.0/conn.go#L184
func newMaskKey() [4]byte {
	n := weakrand.Uint32()
	return [4]byte{byte(n), byte(n >> 8), byte(n >> 16), byte(n >> 24)}
}

// isWebsocket returns true if r looks to be an upgrade request for WebSockets.
// It is a fairly naive check.
func isWebsocket(r *http.Request) bool {
	return httpguts.HeaderValuesContainsToken(r.Header["Connection"], "upgrade") &&
		httpguts.HeaderValuesContainsToken(r.Header["Upgrade"], "websocket")
}

// openConnection maps an open connection to
// an optional function for graceful close.
type openConnection struct {
	conn          io.ReadWriteCloser
	gracefulClose func() error
}

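// maxLatencyWriter wraps an io.Writer and flushes it at most once per latency
// period after a write, or immediately after every write when latency is
// negative. copyResponse uses it to honor the handler's FlushInterval without
// flushing on every chunk of a streamed body.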
type maxLatencyWriter struct {
	dst     io.Writer
	flush   func() error
	latency time.Duration // non-zero; negative means to flush immediately

	mu           sync.Mutex // protects t, flushPending, and dst.Flush
	t            *time.Timer
	flushPending bool
	logger       *zap.Logger
}

func (m *maxLatencyWriter) Write(p []byte) (n int, err error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	n, err = m.dst.Write(p)
	m.logger.Debug("wrote bytes", zap.Int("n", n), zap.Error(err))
	if m.latency < 0 {
		m.logger.Debug("flushing immediately")
		//nolint:errcheck
		m.flush()
		return
	}
	if m.flushPending {
		m.logger.Debug("delayed flush already pending")
		return
	}
	if m.t == nil {
		m.t = time.AfterFunc(m.latency, m.delayedFlush)
	} else {
		m.t.Reset(m.latency)
	}
	m.logger.Debug("timer set for delayed flush", zap.Duration("duration", m.latency))
	m.flushPending = true
	return
}

func (m *maxLatencyWriter) delayedFlush() {
	m.mu.Lock()
	defer m.mu.Unlock()
	if !m.flushPending { // if stop was called but AfterFunc already started this goroutine
		m.logger.Debug("delayed flush is not pending")
		return
	}
	m.logger.Debug("delayed flush")
	//nolint:errcheck
	m.flush()
	m.flushPending = false
}

func (m *maxLatencyWriter) stop() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.flushPending = false
	if m.t != nil {
		m.t.Stop()
	}
}

// switchProtocolCopier exists so goroutines proxying data back and
// forth have nice names in stacks.
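// Each copier runs io.Copy in one direction, reports the result on errc, and
// signals the caller-provided WaitGroup when it finishes.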
type switchProtocolCopier struct {
	user, backend io.ReadWriteCloser
	wg            *sync.WaitGroup
}

func (c switchProtocolCopier) copyFromBackend(errc chan<- error) {
	_, err := io.Copy(c.user, c.backend)
	errc <- err
	c.wg.Done()
}

func (c switchProtocolCopier) copyToBackend(errc chan<- error) {
	_, err := io.Copy(c.backend, c.user)
	errc <- err
	c.wg.Done()
}

var streamingBufPool = sync.Pool{
	New: func() any {
		// The Pool's New function should generally only return pointer
		// types, since a pointer can be put into the return interface
		// value without an allocation
		// - (from the package docs)
		b := make([]byte, defaultBufferSize)
		return &b
	},
}

const (
	defaultBufferSize = 32 * 1024
	wordSize          = int(unsafe.Sizeof(uintptr(0)))
)