diff options
author | Nassim Jahnke <[email protected]> | 2024-12-21 13:45:04 +0100 |
---|---|---|
committer | Nassim Jahnke <[email protected]> | 2024-12-21 13:45:04 +0100 |
commit | 3b0b3a0aef5863b37f252bfffef7ff5b9b266445 (patch) | |
tree | d6088aef454eff0d163847ebdcc5eb349fcac9bb /paper-server/patches/features/0001-Optimize-Network-Manager-and-add-advanced-packet-sup.patch | |
parent | 82216a59fef17863c86192c2754417480baa71c1 (diff) | |
download | Paper-3b0b3a0aef5863b37f252bfffef7ff5b9b266445.tar.gz Paper-3b0b3a0aef5863b37f252bfffef7ff5b9b266445.zip |
and some more
Diffstat (limited to 'paper-server/patches/features/0001-Optimize-Network-Manager-and-add-advanced-packet-sup.patch')
-rw-r--r-- | paper-server/patches/features/0001-Optimize-Network-Manager-and-add-advanced-packet-sup.patch | 391 |
1 files changed, 391 insertions, 0 deletions
diff --git a/paper-server/patches/features/0001-Optimize-Network-Manager-and-add-advanced-packet-sup.patch b/paper-server/patches/features/0001-Optimize-Network-Manager-and-add-advanced-packet-sup.patch new file mode 100644 index 0000000000..02589fdaae --- /dev/null +++ b/paper-server/patches/features/0001-Optimize-Network-Manager-and-add-advanced-packet-sup.patch @@ -0,0 +1,391 @@ +From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 +From: Aikar <[email protected]> +Date: Wed, 6 May 2020 04:53:35 -0400 +Subject: [PATCH] Optimize Network Manager and add advanced packet support + +Adds ability for 1 packet to bundle other packets to follow it +Adds ability for a packet to delay sending more packets until a state is ready. + +Removes synchronization from sending packets +Removes processing packet queue off of main thread + - for the few cases where it is allowed, order is not necessary nor + should it even be happening concurrently in first place (handshaking/login/status) + +Ensures packets sent asynchronously are dispatched on main thread + +This helps ensure safety for ProtocolLib as packet listeners +are commonly accessing world state. This will allow you to schedule +a packet to be sent async, but itll be dispatched sync for packet +listeners to process. + +This should solve some deadlock risks + +Also adds Netty Channel Flush Consolidation to reduce the amount of flushing + +Also avoids spamming closed channel exception by rechecking closed state in dispatch +and then catch exceptions and close if they fire. + +Part of this commit was authored by: Spottedleaf, sandtechnology + +diff --git a/net/minecraft/network/Connection.java b/net/minecraft/network/Connection.java +index 42f44c7cb0bd55ddfacd18acb0e596e7a953870e..ad8f8428b75e37097487cdfbd0db2421ee4cbe37 100644 +--- a/net/minecraft/network/Connection.java ++++ b/net/minecraft/network/Connection.java +@@ -85,7 +85,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + private static final ProtocolInfo<ServerHandshakePacketListener> INITIAL_PROTOCOL = HandshakeProtocols.SERVERBOUND; + private final PacketFlow receiving; + private volatile boolean sendLoginDisconnect = true; +- private final Queue<Consumer<Connection>> pendingActions = Queues.newConcurrentLinkedQueue(); ++ private final Queue<WrappedConsumer> pendingActions = Queues.newConcurrentLinkedQueue(); // Paper - Optimize network + public Channel channel; + public SocketAddress address; + // Spigot start +@@ -145,6 +145,10 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + } + // Paper end - packet limiter + @Nullable public SocketAddress haProxyAddress; // Paper - Add API to get player's proxy address ++ // Paper start - Optimize network ++ public boolean isPending = true; ++ public boolean queueImmunity; ++ // Paper end - Optimize network + + public Connection(PacketFlow receiving) { + this.receiving = receiving; +@@ -423,11 +427,38 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + } + + public void send(Packet<?> packet, @Nullable PacketSendListener listener, boolean flush) { +- if (this.isConnected()) { +- this.flushQueue(); ++ // Paper start - Optimize network: Handle oversized packets better ++ final boolean connected = this.isConnected(); ++ if (!connected && !this.preparing) { ++ return; ++ } ++ ++ packet.onPacketDispatch(this.getPlayer()); ++ if (connected && (InnerUtil.canSendImmediate(this, packet) ++ || (io.papermc.paper.util.MCUtil.isMainThread() && packet.isReady() && this.pendingActions.isEmpty() ++ && (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())))) { + this.sendPacket(packet, listener, flush); + } else { +- this.pendingActions.add(connection -> connection.sendPacket(packet, listener, flush)); ++ // Write the packets to the queue, then flush - antixray hooks there already ++ final java.util.List<Packet<?>> extraPackets = InnerUtil.buildExtraPackets(packet); ++ final boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty(); ++ if (!hasExtraPackets) { ++ this.pendingActions.add(new PacketSendAction(packet, listener, flush)); ++ } else { ++ final java.util.List<PacketSendAction> actions = new java.util.ArrayList<>(1 + extraPackets.size()); ++ actions.add(new PacketSendAction(packet, null, false)); // Delay the future listener until the end of the extra packets ++ ++ for (int i = 0, len = extraPackets.size(); i < len;) { ++ final Packet<?> extraPacket = extraPackets.get(i); ++ final boolean end = ++i == len; ++ actions.add(new PacketSendAction(extraPacket, end ? listener : null, end)); // Append listener to the end ++ } ++ ++ this.pendingActions.addAll(actions); ++ } ++ ++ this.flushQueue(); ++ // Paper end - Optimize network + } + } + +@@ -436,7 +467,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + this.flushQueue(); + action.accept(this); + } else { +- this.pendingActions.add(action); ++ this.pendingActions.add(new WrappedConsumer(action)); // Paper - Optimize network + } + } + +@@ -450,6 +481,14 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + } + + private void doSendPacket(Packet<?> packet, @Nullable PacketSendListener sendListener, boolean flush) { ++ // Paper start - Optimize network ++ final net.minecraft.server.level.ServerPlayer player = this.getPlayer(); ++ if (!this.isConnected()) { ++ packet.onPacketDispatchFinish(player, null); ++ return; ++ } ++ try { ++ // Paper end - Optimize network + ChannelFuture channelFuture = flush ? this.channel.writeAndFlush(packet) : this.channel.write(packet); + if (sendListener != null) { + channelFuture.addListener(future -> { +@@ -465,14 +504,24 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + }); + } + ++ // Paper start - Optimize network ++ if (packet.hasFinishListener()) { ++ channelFuture.addListener((ChannelFutureListener) future -> packet.onPacketDispatchFinish(player, future)); ++ } + channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); ++ } catch (final Exception e) { ++ LOGGER.error("NetworkException: {}", player, e); ++ this.disconnect(Component.translatable("disconnect.genericReason", "Internal Exception: " + e.getMessage())); ++ packet.onPacketDispatchFinish(player, null); ++ } ++ // Paper end - Optimize network + } + + public void flushChannel() { + if (this.isConnected()) { + this.flush(); + } else { +- this.pendingActions.add(Connection::flush); ++ this.pendingActions.add(new WrappedConsumer(Connection::flush)); // Paper - Optimize network + } + } + +@@ -484,16 +533,57 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + } + } + +- private void flushQueue() { +- if (this.channel != null && this.channel.isOpen()) { ++ // Paper start - Optimize network: Rewrite this to be safer if ran off main thread ++ private boolean flushQueue() { ++ if (!this.isConnected()) { ++ return true; ++ } ++ if (io.papermc.paper.util.MCUtil.isMainThread()) { ++ return this.processQueue(); ++ } else if (this.isPending) { ++ // Should only happen during login/status stages + synchronized (this.pendingActions) { +- Consumer<Connection> consumer; +- while ((consumer = this.pendingActions.poll()) != null) { +- consumer.accept(this); ++ return this.processQueue(); ++ } ++ } ++ return false; ++ } ++ ++ private boolean processQueue() { ++ if (this.pendingActions.isEmpty()) { ++ return true; ++ } ++ ++ // If we are on main, we are safe here in that nothing else should be processing queue off main anymore ++ // But if we are not on main due to login/status, the parent is synchronized on packetQueue ++ final java.util.Iterator<WrappedConsumer> iterator = this.pendingActions.iterator(); ++ while (iterator.hasNext()) { ++ final WrappedConsumer queued = iterator.next(); // poll -> peek ++ ++ // Fix NPE (Spigot bug caused by handleDisconnection()) ++ if (queued == null) { ++ return true; ++ } ++ ++ if (queued.isConsumed()) { ++ continue; ++ } ++ ++ if (queued instanceof PacketSendAction packetSendAction) { ++ final Packet<?> packet = packetSendAction.packet; ++ if (!packet.isReady()) { ++ return false; + } + } ++ ++ iterator.remove(); ++ if (queued.tryMarkConsumed()) { ++ queued.accept(this); ++ } + } ++ return true; + } ++ // Paper end - Optimize network + + private static final int MAX_PER_TICK = io.papermc.paper.configuration.GlobalConfiguration.get().misc.maxJoinsPerTick; // Paper - Buffer joins to world + private static int joinAttemptsThisTick; // Paper - Buffer joins to world +@@ -563,6 +653,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + + public void disconnect(DisconnectionDetails disconnectionDetails) { + this.preparing = false; // Spigot ++ this.clearPacketQueue(); // Paper - Optimize network + if (this.channel == null) { + this.delayedDisconnect = disconnectionDetails; + } +@@ -751,7 +842,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + public void handleDisconnection() { + if (this.channel != null && !this.channel.isOpen()) { + if (this.disconnectionHandled) { +- LOGGER.warn("handleDisconnection() called twice"); ++ // LOGGER.warn("handleDisconnection() called twice"); // Paper - Don't log useless message + } else { + this.disconnectionHandled = true; + PacketListener packetListener = this.getPacketListener(); +@@ -762,7 +853,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + ); + packetListener1.onDisconnect(disconnectionDetails); + } +- this.pendingActions.clear(); // Free up packet queue. ++ this.clearPacketQueue(); // Paper - Optimize network + // Paper start - Add PlayerConnectionCloseEvent + if (packetListener instanceof net.minecraft.server.network.ServerCommonPacketListenerImpl commonPacketListener) { + /* Player was logged in, either game listener or configuration listener */ +@@ -797,4 +888,93 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + public void setBandwidthLogger(LocalSampleLogger bandwithLogger) { + this.bandwidthDebugMonitor = new BandwidthDebugMonitor(bandwithLogger); + } ++ ++ // Paper start - Optimize network ++ public void clearPacketQueue() { ++ final net.minecraft.server.level.ServerPlayer player = getPlayer(); ++ for (final Consumer<Connection> queuedAction : this.pendingActions) { ++ if (queuedAction instanceof PacketSendAction packetSendAction) { ++ final Packet<?> packet = packetSendAction.packet; ++ if (packet.hasFinishListener()) { ++ packet.onPacketDispatchFinish(player, null); ++ } ++ } ++ } ++ this.pendingActions.clear(); ++ } ++ ++ private static class InnerUtil { // Attempt to hide these methods from ProtocolLib, so it doesn't accidently pick them up. ++ ++ @Nullable ++ private static java.util.List<Packet<?>> buildExtraPackets(final Packet<?> packet) { ++ final java.util.List<Packet<?>> extra = packet.getExtraPackets(); ++ if (extra == null || extra.isEmpty()) { ++ return null; ++ } ++ ++ final java.util.List<Packet<?>> ret = new java.util.ArrayList<>(1 + extra.size()); ++ buildExtraPackets0(extra, ret); ++ return ret; ++ } ++ ++ private static void buildExtraPackets0(final java.util.List<Packet<?>> extraPackets, final java.util.List<Packet<?>> into) { ++ for (final Packet<?> extra : extraPackets) { ++ into.add(extra); ++ final java.util.List<Packet<?>> extraExtra = extra.getExtraPackets(); ++ if (extraExtra != null && !extraExtra.isEmpty()) { ++ buildExtraPackets0(extraExtra, into); ++ } ++ } ++ } ++ ++ private static boolean canSendImmediate(final Connection networkManager, final net.minecraft.network.protocol.Packet<?> packet) { ++ return networkManager.isPending || networkManager.packetListener.protocol() != ConnectionProtocol.PLAY || ++ packet instanceof net.minecraft.network.protocol.common.ClientboundKeepAlivePacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundCommandSuggestionsPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitleTextPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetSubtitleTextPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetActionBarTextPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitlesAnimationPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundClearTitlesPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSoundPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSoundEntityPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundStopSoundPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundLevelParticlesPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundBossEventPacket; ++ } ++ } ++ ++ private static class WrappedConsumer implements Consumer<Connection> { ++ private final Consumer<Connection> delegate; ++ private final java.util.concurrent.atomic.AtomicBoolean consumed = new java.util.concurrent.atomic.AtomicBoolean(false); ++ ++ private WrappedConsumer(final Consumer<Connection> delegate) { ++ this.delegate = delegate; ++ } ++ ++ @Override ++ public void accept(final Connection connection) { ++ this.delegate.accept(connection); ++ } ++ ++ public boolean tryMarkConsumed() { ++ return consumed.compareAndSet(false, true); ++ } ++ ++ public boolean isConsumed() { ++ return consumed.get(); ++ } ++ } ++ ++ private static final class PacketSendAction extends WrappedConsumer { ++ private final Packet<?> packet; ++ ++ private PacketSendAction(final Packet<?> packet, @Nullable final PacketSendListener packetSendListener, final boolean flush) { ++ super(connection -> connection.sendPacket(packet, packetSendListener, flush)); ++ this.packet = packet; ++ } ++ } ++ // Paper end - Optimize network + } +diff --git a/net/minecraft/network/protocol/Packet.java b/net/minecraft/network/protocol/Packet.java +index 65ff8b9112ec76eeac48c679044fc02ae7d4ffeb..e4789584cbe43959681a8522c66eab58369bebd0 100644 +--- a/net/minecraft/network/protocol/Packet.java ++++ b/net/minecraft/network/protocol/Packet.java +@@ -35,4 +35,32 @@ public interface Packet<T extends PacketListener> { + static <B extends ByteBuf, T extends Packet<?>> StreamCodec<B, T> codec(StreamMemberEncoder<B, T> encoder, StreamDecoder<B, T> decoder) { + return StreamCodec.ofMember(encoder, decoder); + } ++ ++ // Paper start ++ /** ++ * @param player Null if not at PLAY stage yet ++ */ ++ default void onPacketDispatch(@org.jetbrains.annotations.Nullable net.minecraft.server.level.ServerPlayer player) { ++ } ++ ++ /** ++ * @param player Null if not at PLAY stage yet ++ * @param future Can be null if packet was cancelled ++ */ ++ default void onPacketDispatchFinish(@org.jetbrains.annotations.Nullable net.minecraft.server.level.ServerPlayer player, @org.jetbrains.annotations.Nullable io.netty.channel.ChannelFuture future) { ++ } ++ ++ default boolean hasFinishListener() { ++ return false; ++ } ++ ++ default boolean isReady() { ++ return true; ++ } ++ ++ @org.jetbrains.annotations.Nullable ++ default java.util.List<Packet<?>> getExtraPackets() { ++ return null; ++ } ++ // Paper end + } +diff --git a/net/minecraft/server/network/ServerConnectionListener.java b/net/minecraft/server/network/ServerConnectionListener.java +index b873af7183d9b1aabc87e63d254a4326f17b21c9..7de11ba404f0b60e7f7b7c16954811a343688219 100644 +--- a/net/minecraft/server/network/ServerConnectionListener.java ++++ b/net/minecraft/server/network/ServerConnectionListener.java +@@ -66,11 +66,13 @@ public class ServerConnectionListener { + + // Paper start - prevent blocking on adding a new connection while the server is ticking + private final java.util.Queue<Connection> pending = new java.util.concurrent.ConcurrentLinkedQueue<>(); ++ private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper - Optimize network + + private final void addPending() { + Connection connection; + while ((connection = this.pending.poll()) != null) { + this.connections.add(connection); ++ connection.isPending = false; // Paper - Optimize network + } + } + // Paper end - prevent blocking on adding a new connection while the server is ticking +@@ -120,6 +122,7 @@ public class ServerConnectionListener { + } catch (ChannelException var5) { + } + ++ if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper - Optimize network + ChannelPipeline channelPipeline = channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)); + if (ServerConnectionListener.this.server.repliesToStatus()) { + channelPipeline.addLast("legacy_query", new LegacyQueryHandler(ServerConnectionListener.this.getServer())); |