diff options
author | Matt Holt <[email protected]> | 2023-10-16 22:17:32 -0600 |
---|---|---|
committer | GitHub <[email protected]> | 2023-10-16 22:17:32 -0600 |
commit | 174c19a9539443e6651eecf23ab86c7fd5d8c293 (patch) | |
tree | cc5a85ce731ef916a3502f8c7679570793574f29 | |
parent | c8559c448537969376623be9d352949b59907b0e (diff) | |
download | caddy-174c19a9539443e6651eecf23ab86c7fd5d8c293.tar.gz caddy-174c19a9539443e6651eecf23ab86c7fd5d8c293.zip |
core: Apply SO_REUSEPORT to UDP sockets (#5725)
* core: Apply SO_REUSEPORT to UDP sockets
For some reason, 10 months ago when I implemented SO_REUSEPORT
for TCP, I didn't realize, or forgot, that it can be used for UDP too. It is a
much better solution than using deadline hacks to reuse a socket, at
least for TCP.
Then https://github.com/mholt/caddy-l4/issues/132 was posted,
in which we see that UDP servers never actually stopped when the
L4 app was stopped. I verified this using this command:
$ nc -u 127.0.0.1 55353
combined with POSTing configs to the /load admin endpoint (which
alternated between an echo server and a proxy server so I could tell
which config was being used).
I refactored the code to use SO_REUSEPORT for UDP, but of course
we still need graceful reloads on all platforms, not just Unix, so I
also implemented a deadline hack similar to what we used for
TCP before. That implementation for TCP was not perfect, possibly
having a logical (not data) race condition; but for UDP so far it
seems to be working. Verified the same way I verified that SO_REUSEPORT
works.
I think this code is slightly cleaner and I'm fairly confident this code
is effective.
* Check error
* Fix return
* Fix var name
* implement Unwrap interface and clean up
* move unix packet conn to platform specific file
* implement Unwrap for unix packet conn
* Move sharedPacketConn into proper file
* Fix Windows
* move sharedPacketConn and fakeClosePacketConn to proper file
---------
Co-authored-by: Weidi Deng <[email protected]>
-rw-r--r-- | listen.go | 106 | ||||
-rw-r--r-- | listen_unix.go | 75 | ||||
-rw-r--r-- | listeners.go | 140 |
3 files changed, 181 insertions, 140 deletions
@@ -30,18 +30,34 @@ func reuseUnixSocket(network, addr string) (any, error) { return nil, nil } -func listenTCPOrUnix(ctx context.Context, lnKey string, network, address string, config net.ListenConfig) (net.Listener, error) { - sharedLn, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) { - ln, err := config.Listen(ctx, network, address) +func listenReusable(ctx context.Context, lnKey string, network, address string, config net.ListenConfig) (any, error) { + switch network { + case "udp", "udp4", "udp6", "unixgram": + sharedPc, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) { + pc, err := config.ListenPacket(ctx, network, address) + if err != nil { + return nil, err + } + return &sharedPacketConn{PacketConn: pc, key: lnKey}, nil + }) if err != nil { return nil, err } - return &sharedListener{Listener: ln, key: lnKey}, nil - }) - if err != nil { - return nil, err + return &fakeClosePacketConn{sharedPacketConn: sharedPc.(*sharedPacketConn)}, nil + + default: + sharedLn, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) { + ln, err := config.Listen(ctx, network, address) + if err != nil { + return nil, err + } + return &sharedListener{Listener: ln, key: lnKey}, nil + }) + if err != nil { + return nil, err + } + return &fakeCloseListener{sharedListener: sharedLn.(*sharedListener), keepAlivePeriod: config.KeepAlive}, nil } - return &fakeCloseListener{sharedListener: sharedLn.(*sharedListener), keepAlivePeriod: config.KeepAlive}, nil } // fakeCloseListener is a private wrapper over a listener that @@ -98,7 +114,7 @@ func (fcl *fakeCloseListener) Accept() (net.Conn, error) { // so that it's clear in the code that side-effects are shared with other // users of this listener, not just our own reference to it; we also don't // do anything with the error because all we could do is log it, but we - // expliclty assign it to nothing so we don't forget it's there if needed + // explicitly assign it to nothing so we don't forget it's there if needed _ = fcl.sharedListener.clearDeadline() if netErr, ok := err.(net.Error); ok && netErr.Timeout() { @@ -172,3 +188,75 @@ func (sl *sharedListener) setDeadline() error { func (sl *sharedListener) Destruct() error { return sl.Listener.Close() } + +// fakeClosePacketConn is like fakeCloseListener, but for PacketConns, +// or more specifically, *net.UDPConn +type fakeClosePacketConn struct { + closed int32 // accessed atomically; belongs to this struct only + *sharedPacketConn // embedded, so we also become a net.PacketConn; its key is used in Close +} + +func (fcpc *fakeClosePacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { + // if the listener is already "closed", return error + if atomic.LoadInt32(&fcpc.closed) == 1 { + return 0, nil, &net.OpError{ + Op: "readfrom", + Net: fcpc.LocalAddr().Network(), + Addr: fcpc.LocalAddr(), + Err: errFakeClosed, + } + } + + // call underlying readfrom + n, addr, err = fcpc.sharedPacketConn.ReadFrom(p) + if err != nil { + // this server was stopped, so clear the deadline and let + // any new server continue reading; but we will exit + if atomic.LoadInt32(&fcpc.closed) == 1 { + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + if err = fcpc.SetReadDeadline(time.Time{}); err != nil { + return + } + } + } + return + } + + return +} + +// Close won't close the underlying socket unless there is no more reference, then listenerPool will close it. +func (fcpc *fakeClosePacketConn) Close() error { + if atomic.CompareAndSwapInt32(&fcpc.closed, 0, 1) { + _ = fcpc.SetReadDeadline(time.Now()) // unblock ReadFrom() calls to kick old servers out of their loops + _, _ = listenerPool.Delete(fcpc.sharedPacketConn.key) + } + return nil +} + +func (fcpc *fakeClosePacketConn) Unwrap() net.PacketConn { + return fcpc.sharedPacketConn.PacketConn +} + +// sharedPacketConn is like sharedListener, but for net.PacketConns. +type sharedPacketConn struct { + net.PacketConn + key string +} + +// Destruct closes the underlying socket. +func (spc *sharedPacketConn) Destruct() error { + return spc.PacketConn.Close() +} + +// Unwrap returns the underlying socket +func (spc *sharedPacketConn) Unwrap() net.PacketConn { + return spc.PacketConn +} + +// Interface guards (see https://github.com/caddyserver/caddy/issues/3998) +var ( + _ (interface { + Unwrap() net.PacketConn + }) = (*fakeClosePacketConn)(nil) +) diff --git a/listen_unix.go b/listen_unix.go index 8870da5e9..34cd76c5c 100644 --- a/listen_unix.go +++ b/listen_unix.go @@ -22,8 +22,10 @@ package caddy import ( "context" "errors" + "io" "io/fs" "net" + "os" "sync/atomic" "syscall" @@ -87,7 +89,7 @@ func reuseUnixSocket(network, addr string) (any, error) { return nil, nil } -func listenTCPOrUnix(ctx context.Context, lnKey string, network, address string, config net.ListenConfig) (net.Listener, error) { +func listenReusable(ctx context.Context, lnKey string, network, address string, config net.ListenConfig) (any, error) { // wrap any Control function set by the user so we can also add our reusePort control without clobbering theirs oldControl := config.Control config.Control = func(network, address string, c syscall.RawConn) error { @@ -103,7 +105,14 @@ func listenTCPOrUnix(ctx context.Context, lnKey string, network, address string, // we still put it in the listenerPool so we can count how many // configs are using this socket; necessary to ensure we can know // whether to enforce shutdown delays, for example (see #5393). - ln, err := config.Listen(ctx, network, address) + var ln io.Closer + var err error + switch network { + case "udp", "udp4", "udp6", "unixgram": + ln, err = config.ListenPacket(ctx, network, address) + default: + ln, err = config.Listen(ctx, network, address) + } if err == nil { listenerPool.LoadOrStore(lnKey, nil) } @@ -117,9 +126,23 @@ func listenTCPOrUnix(ctx context.Context, lnKey string, network, address string, unixSockets[lnKey] = ln.(*unixListener) } + // TODO: Not 100% sure this is necessary, but we do this for net.UnixListener in listen_unix.go, so... + if unix, ok := ln.(*net.UnixConn); ok { + ln = &unixConn{unix, address, lnKey, &one} + unixSockets[lnKey] = ln.(*unixConn) + } + // lightly wrap the listener so that when it is closed, // we can decrement the usage pool counter - return deleteListener{ln, lnKey}, err + switch specificLn := ln.(type) { + case net.Listener: + return deleteListener{specificLn, lnKey}, err + case net.PacketConn: + return deletePacketConn{specificLn, lnKey}, err + } + + // other types, I guess we just return them directly + return ln, err } // reusePort sets SO_REUSEPORT. Ineffective for unix sockets. @@ -158,6 +181,36 @@ func (uln *unixListener) Close() error { return uln.UnixListener.Close() } +type unixConn struct { + *net.UnixConn + filename string + mapKey string + count *int32 // accessed atomically +} + +func (uc *unixConn) Close() error { + newCount := atomic.AddInt32(uc.count, -1) + if newCount == 0 { + defer func() { + unixSocketsMu.Lock() + delete(unixSockets, uc.mapKey) + unixSocketsMu.Unlock() + _ = syscall.Unlink(uc.filename) + }() + } + return uc.UnixConn.Close() +} + +func (uc *unixConn) Unwrap() net.PacketConn { + return uc.UnixConn +} + +// unixSockets keeps track of the currently-active unix sockets +// so we can transfer their FDs gracefully during reloads. +var unixSockets = make(map[string]interface { + File() (*os.File, error) +}) + // deleteListener is a type that simply deletes itself // from the listenerPool when it closes. It is used // solely for the purpose of reference counting (i.e. @@ -171,3 +224,19 @@ func (dl deleteListener) Close() error { _, _ = listenerPool.Delete(dl.lnKey) return dl.Listener.Close() } + +// deletePacketConn is like deleteListener, but +// for net.PacketConns. +type deletePacketConn struct { + net.PacketConn + lnKey string +} + +func (dl deletePacketConn) Close() error { + _, _ = listenerPool.Delete(dl.lnKey) + return dl.PacketConn.Close() +} + +func (dl deletePacketConn) Unwrap() net.PacketConn { + return dl.PacketConn +} diff --git a/listeners.go b/listeners.go index 67c519670..84a32e45a 100644 --- a/listeners.go +++ b/listeners.go @@ -28,7 +28,6 @@ import ( "strings" "sync" "sync/atomic" - "syscall" "time" "github.com/quic-go/quic-go" @@ -149,11 +148,13 @@ func (na NetworkAddress) Listen(ctx context.Context, portOffset uint, config net } func (na NetworkAddress) listen(ctx context.Context, portOffset uint, config net.ListenConfig) (any, error) { - var ln any - var err error - var address string - var unixFileMode fs.FileMode - var isAbtractUnixSocket bool + var ( + ln any + err error + address string + unixFileMode fs.FileMode + isAbtractUnixSocket bool + ) // split unix socket addr early so lnKey // is independent of permissions bits @@ -181,27 +182,10 @@ func (na NetworkAddress) listen(ctx context.Context, portOffset uint, config net lnKey := listenerKey(na.Network, address) - switch na.Network { - case "tcp", "tcp4", "tcp6", "unix", "unixpacket": - ln, err = listenTCPOrUnix(ctx, lnKey, na.Network, address, config) - case "unixgram": - ln, err = config.ListenPacket(ctx, na.Network, address) - case "udp", "udp4", "udp6": - sharedPc, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) { - pc, err := config.ListenPacket(ctx, na.Network, address) - if err != nil { - return nil, err - } - return &sharedPacketConn{PacketConn: pc, key: lnKey}, nil - }) - if err != nil { - return nil, err - } - spc := sharedPc.(*sharedPacketConn) - ln = &fakeClosePacketConn{spc: spc, UDPConn: spc.PacketConn.(*net.UDPConn)} - } if strings.HasPrefix(na.Network, "ip") { ln, err = config.ListenPacket(ctx, na.Network, address) + } else { + ln, err = listenReusable(ctx, lnKey, na.Network, address, config) } if err != nil { return nil, err @@ -210,13 +194,6 @@ func (na NetworkAddress) listen(ctx context.Context, portOffset uint, config net return nil, fmt.Errorf("unsupported network type: %s", na.Network) } - // TODO: Not 100% sure this is necessary, but we do this for net.UnixListener in listen_unix.go, so... - if unix, ok := ln.(*net.UnixConn); ok { - one := int32(1) - ln = &unixConn{unix, address, lnKey, &one} - unixSockets[lnKey] = unix - } - if IsUnixNetwork(na.Network) { if !isAbtractUnixSocket { if err := os.Chmod(address, unixFileMode); err != nil { @@ -555,20 +532,8 @@ func ListenQUIC(ln net.PacketConn, tlsConf *tls.Config, activeRequests *int64) ( // and the request counter will reflect current http server ctx, cancel := sql.sqs.addState(tlsConf, activeRequests) - // TODO: to serve QUIC over a unix socket, currently we need to hold onto - // the underlying net.PacketConn (which we wrap as unixConn to keep count - // of closes) because closing the quic.EarlyListener doesn't actually close - // the underlying PacketConn, but we need to for unix sockets since we dup - // the file descriptor and thus need to close the original; track issue: - // https://github.com/quic-go/quic-go/issues/3560#issuecomment-1258959608 - var unix *unixConn - if uc, ok := ln.(*unixConn); ok { - unix = uc - } - return &fakeCloseQuicListener{ sharedQuicListener: sql, - uc: unix, context: ctx, contextCancel: cancel, }, nil @@ -677,17 +642,6 @@ func (sql *sharedQuicListener) Destruct() error { return sql.packetConn.Close() } -// sharedPacketConn is like sharedListener, but for net.PacketConns. -type sharedPacketConn struct { - net.PacketConn - key string -} - -// Destruct closes the underlying socket. -func (spc *sharedPacketConn) Destruct() error { - return spc.PacketConn.Close() -} - // fakeClosedErr returns an error value that is not temporary // nor a timeout, suitable for making the caller think the // listener is actually closed @@ -707,39 +661,9 @@ func fakeClosedErr(l interface{ Addr() net.Addr }) error { // socket is actually left open. var errFakeClosed = fmt.Errorf("listener 'closed' 😉") -// fakeClosePacketConn is like fakeCloseListener, but for PacketConns, -// or more specifically, *net.UDPConn -type fakeClosePacketConn struct { - closed int32 // accessed atomically; belongs to this struct only - spc *sharedPacketConn // its key is used in Close - *net.UDPConn // embedded, so we also become a net.PacketConn and enable several other optimizations done by quic-go -} - -// interface guard for extra optimizations -// needed by QUIC implementation: https://github.com/caddyserver/caddy/issues/3998, https://github.com/caddyserver/caddy/issues/5605 -var _ quic.OOBCapablePacketConn = (*fakeClosePacketConn)(nil) - -// https://pkg.go.dev/golang.org/x/net/ipv4#NewPacketConn is used by quic-go and requires a net.PacketConn type assertable to a net.Conn, -// but doesn't actually use these methods, the only methods needed are `ReadMsgUDP` and `SyscallConn`. -var _ net.Conn = (*fakeClosePacketConn)(nil) - -// Unwrap returns the underlying net.UDPConn for quic-go optimization -func (fcpc *fakeClosePacketConn) Unwrap() any { - return fcpc.UDPConn -} - -// Close won't close the underlying socket unless there is no more reference, then listenerPool will close it. -func (fcpc *fakeClosePacketConn) Close() error { - if atomic.CompareAndSwapInt32(&fcpc.closed, 0, 1) { - _, _ = listenerPool.Delete(fcpc.spc.key) - } - return nil -} - type fakeCloseQuicListener struct { - closed int32 // accessed atomically; belongs to this struct only - *sharedQuicListener // embedded, so we also become a quic.EarlyListener - uc *unixConn // underlying unix socket, if UDS + closed int32 // accessed atomically; belongs to this struct only + *sharedQuicListener // embedded, so we also become a quic.EarlyListener context context.Context contextCancel context.CancelFunc } @@ -766,11 +690,6 @@ func (fcql *fakeCloseQuicListener) Close() error { if atomic.CompareAndSwapInt32(&fcql.closed, 0, 1) { fcql.contextCancel() _, _ = listenerPool.Delete(fcql.sharedQuicListener.key) - if fcql.uc != nil { - // unix sockets need to be closed ourselves because we dup() the file - // descriptor when we reuse them, so this avoids a resource leak - fcql.uc.Close() - } } return nil } @@ -796,34 +715,7 @@ func RegisterNetwork(network string, getListener ListenerFunc) { networkTypes[network] = getListener } -type unixConn struct { - *net.UnixConn - filename string - mapKey string - count *int32 // accessed atomically -} - -func (uc *unixConn) Close() error { - newCount := atomic.AddInt32(uc.count, -1) - if newCount == 0 { - defer func() { - unixSocketsMu.Lock() - delete(unixSockets, uc.mapKey) - unixSocketsMu.Unlock() - _ = syscall.Unlink(uc.filename) - }() - } - return uc.UnixConn.Close() -} - -// unixSockets keeps track of the currently-active unix sockets -// so we can transfer their FDs gracefully during reloads. -var ( - unixSockets = make(map[string]interface { - File() (*os.File, error) - }) - unixSocketsMu sync.Mutex -) +var unixSocketsMu sync.Mutex // getListenerFromPlugin returns a listener on the given network and address // if a plugin has registered the network name. It may return (nil, nil) if @@ -867,11 +759,3 @@ type ListenerWrapper interface { var listenerPool = NewUsagePool() const maxPortSpan = 65535 - -// Interface guards (see https://github.com/caddyserver/caddy/issues/3998) -var ( - _ (interface{ SetReadBuffer(int) error }) = (*fakeClosePacketConn)(nil) - _ (interface { - SyscallConn() (syscall.RawConn, error) - }) = (*fakeClosePacketConn)(nil) -) |