diff options
Diffstat (limited to 'patches/server/0995-Optimize-Network-Manager-and-add-advanced-packet-sup.patch')
-rw-r--r-- | patches/server/0995-Optimize-Network-Manager-and-add-advanced-packet-sup.patch | 395 |
1 files changed, 395 insertions, 0 deletions
diff --git a/patches/server/0995-Optimize-Network-Manager-and-add-advanced-packet-sup.patch b/patches/server/0995-Optimize-Network-Manager-and-add-advanced-packet-sup.patch new file mode 100644 index 0000000000..f51a3d8642 --- /dev/null +++ b/patches/server/0995-Optimize-Network-Manager-and-add-advanced-packet-sup.patch @@ -0,0 +1,395 @@ +From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 +From: Aikar <[email protected]> +Date: Wed, 6 May 2020 04:53:35 -0400 +Subject: [PATCH] Optimize Network Manager and add advanced packet support + +Adds ability for 1 packet to bundle other packets to follow it +Adds ability for a packet to delay sending more packets until a state is ready. + +Removes synchronization from sending packets +Removes processing packet queue off of main thread + - for the few cases where it is allowed, order is not necessary nor + should it even be happening concurrently in first place (handshaking/login/status) + +Ensures packets sent asynchronously are dispatched on main thread + +This helps ensure safety for ProtocolLib as packet listeners +are commonly accessing world state. This will allow you to schedule +a packet to be sent async, but itll be dispatched sync for packet +listeners to process. + +This should solve some deadlock risks + +Also adds Netty Channel Flush Consolidation to reduce the amount of flushing + +Also avoids spamming closed channel exception by rechecking closed state in dispatch +and then catch exceptions and close if they fire. + +Part of this commit was authored by: Spottedleaf, sandtechnology + +diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java +index 55848fa832d0f4d2d03f99df51e10c5fdfcd2ded..7dc7aeb1d94d26cf54bd4e4ab13972a3a60c1f98 100644 +--- a/src/main/java/net/minecraft/network/Connection.java ++++ b/src/main/java/net/minecraft/network/Connection.java +@@ -93,7 +93,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + private static final ProtocolInfo<ServerHandshakePacketListener> INITIAL_PROTOCOL = HandshakeProtocols.SERVERBOUND; + private final PacketFlow receiving; + private volatile boolean sendLoginDisconnect = true; +- private final Queue<Consumer<Connection>> pendingActions = Queues.newConcurrentLinkedQueue(); ++ private final Queue<WrappedConsumer> pendingActions = Queues.newConcurrentLinkedQueue(); // Paper + public Channel channel; + public SocketAddress address; + // Spigot Start +@@ -125,6 +125,10 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + public java.net.InetSocketAddress virtualHost; + private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush"); // Paper - Disable explicit network manager flushing + // Paper end ++ // Paper start - Optimize network ++ public boolean isPending = true; ++ public boolean queueImmunity; ++ // Paper end - Optimize network + + // Paper start - add utility methods + public final net.minecraft.server.level.ServerPlayer getPlayer() { +@@ -440,15 +444,39 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + } + + public void send(Packet<?> packet, @Nullable PacketSendListener callbacks, boolean flush) { +- if (this.isConnected()) { +- this.flushQueue(); ++ // Paper start - Optimize network: Handle oversized packets better ++ final boolean connected = this.isConnected(); ++ if (!connected && !this.preparing) { ++ return; ++ } ++ ++ packet.onPacketDispatch(this.getPlayer()); ++ if (connected && (InnerUtil.canSendImmediate(this, packet) ++ || (io.papermc.paper.util.MCUtil.isMainThread() && packet.isReady() && this.pendingActions.isEmpty() ++ && (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())))) { + this.sendPacket(packet, callbacks, flush); + } else { +- this.pendingActions.add((networkmanager) -> { +- networkmanager.sendPacket(packet, callbacks, flush); +- }); +- } ++ // Write the packets to the queue, then flush - antixray hooks there already ++ final java.util.List<Packet<?>> extraPackets = InnerUtil.buildExtraPackets(packet); ++ final boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty(); ++ if (!hasExtraPackets) { ++ this.pendingActions.add(new PacketSendAction(packet, callbacks, flush)); ++ } else { ++ final java.util.List<PacketSendAction> actions = new java.util.ArrayList<>(1 + extraPackets.size()); ++ actions.add(new PacketSendAction(packet, null, false)); // Delay the future listener until the end of the extra packets ++ ++ for (int i = 0, len = extraPackets.size(); i < len;) { ++ final Packet<?> extraPacket = extraPackets.get(i); ++ final boolean end = ++i == len; ++ actions.add(new PacketSendAction(extraPacket, end ? callbacks : null, end)); // Append listener to the end ++ } ++ ++ this.pendingActions.addAll(actions); ++ } + ++ this.flushQueue(); ++ // Paper end - Optimize network ++ } + } + + public void runOnceConnected(Consumer<Connection> task) { +@@ -456,7 +484,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + this.flushQueue(); + task.accept(this); + } else { +- this.pendingActions.add(task); ++ this.pendingActions.add(new WrappedConsumer(task)); // Paper - Optimize network + } + + } +@@ -474,6 +502,14 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + } + + private void doSendPacket(Packet<?> packet, @Nullable PacketSendListener callbacks, boolean flush) { ++ // Paper start - Optimize network ++ final net.minecraft.server.level.ServerPlayer player = this.getPlayer(); ++ if (!this.isConnected()) { ++ packet.onPacketDispatchFinish(player, null); ++ return; ++ } ++ try { ++ // Paper end - Optimize network + ChannelFuture channelfuture = flush ? this.channel.writeAndFlush(packet) : this.channel.write(packet); + + if (callbacks != null) { +@@ -493,14 +529,24 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + }); + } + ++ // Paper start - Optimize network ++ if (packet.hasFinishListener()) { ++ channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture)); ++ } + channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); ++ } catch (final Exception e) { ++ LOGGER.error("NetworkException: {}", player, e); ++ this.disconnect(Component.translatable("disconnect.genericReason", "Internal Exception: " + e.getMessage())); ++ packet.onPacketDispatchFinish(player, null); ++ } ++ // Paper end - Optimize network + } + + public void flushChannel() { + if (this.isConnected()) { + this.flush(); + } else { +- this.pendingActions.add(Connection::flush); ++ this.pendingActions.add(new WrappedConsumer(Connection::flush)); // Paper - Optimize network + } + + } +@@ -516,20 +562,57 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + + } + +- private void flushQueue() { +- if (this.channel != null && this.channel.isOpen()) { +- Queue queue = this.pendingActions; +- ++ // Paper start - Optimize network: Rewrite this to be safer if ran off main thread ++ private boolean flushQueue() { ++ if (!this.isConnected()) { ++ return true; ++ } ++ if (io.papermc.paper.util.MCUtil.isMainThread()) { ++ return this.processQueue(); ++ } else if (this.isPending) { ++ // Should only happen during login/status stages + synchronized (this.pendingActions) { +- Consumer consumer; ++ return this.processQueue(); ++ } ++ } ++ return false; ++ } ++ ++ private boolean processQueue() { ++ if (this.pendingActions.isEmpty()) { ++ return true; ++ } + +- while ((consumer = (Consumer) this.pendingActions.poll()) != null) { +- consumer.accept(this); ++ // If we are on main, we are safe here in that nothing else should be processing queue off main anymore ++ // But if we are not on main due to login/status, the parent is synchronized on packetQueue ++ final java.util.Iterator<WrappedConsumer> iterator = this.pendingActions.iterator(); ++ while (iterator.hasNext()) { ++ final WrappedConsumer queued = iterator.next(); // poll -> peek ++ ++ // Fix NPE (Spigot bug caused by handleDisconnection()) ++ if (queued == null) { ++ return true; ++ } ++ ++ if (queued.isConsumed()) { ++ continue; ++ } ++ ++ if (queued instanceof PacketSendAction packetSendAction) { ++ final Packet<?> packet = packetSendAction.packet; ++ if (!packet.isReady()) { ++ return false; + } ++ } + ++ iterator.remove(); ++ if (queued.tryMarkConsumed()) { ++ queued.accept(this); + } + } ++ return true; + } ++ // Paper end - Optimize network + + private static final int MAX_PER_TICK = io.papermc.paper.configuration.GlobalConfiguration.get().misc.maxJoinsPerTick; // Paper - Buffer joins to world + private static int joinAttemptsThisTick; // Paper - Buffer joins to world +@@ -593,6 +676,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + public void disconnect(DisconnectionDetails disconnectionInfo) { + // Spigot Start + this.preparing = false; ++ this.clearPacketQueue(); // Paper - Optimize network + // Spigot End + if (this.channel == null) { + this.delayedDisconnect = disconnectionInfo; +@@ -780,7 +864,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + public void handleDisconnection() { + if (this.channel != null && !this.channel.isOpen()) { + if (this.disconnectionHandled) { +- Connection.LOGGER.warn("handleDisconnection() called twice"); ++ // Connection.LOGGER.warn("handleDisconnection() called twice"); // Paper - Don't log useless message + } else { + this.disconnectionHandled = true; + PacketListener packetlistener = this.getPacketListener(); +@@ -793,7 +877,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + + packetlistener1.onDisconnect(disconnectiondetails); + } +- this.pendingActions.clear(); // Free up packet queue. ++ this.clearPacketQueue(); // Paper - Optimize network + // Paper start - Add PlayerConnectionCloseEvent + final PacketListener packetListener = this.getPacketListener(); + if (packetListener instanceof net.minecraft.server.network.ServerCommonPacketListenerImpl commonPacketListener) { +@@ -830,4 +914,93 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + public void setBandwidthLogger(LocalSampleLogger log) { + this.bandwidthDebugMonitor = new BandwidthDebugMonitor(log); + } ++ ++ // Paper start - Optimize network ++ public void clearPacketQueue() { ++ final net.minecraft.server.level.ServerPlayer player = getPlayer(); ++ for (final Consumer<Connection> queuedAction : this.pendingActions) { ++ if (queuedAction instanceof PacketSendAction packetSendAction) { ++ final Packet<?> packet = packetSendAction.packet; ++ if (packet.hasFinishListener()) { ++ packet.onPacketDispatchFinish(player, null); ++ } ++ } ++ } ++ this.pendingActions.clear(); ++ } ++ ++ private static class InnerUtil { // Attempt to hide these methods from ProtocolLib, so it doesn't accidently pick them up. ++ ++ @Nullable ++ private static java.util.List<Packet<?>> buildExtraPackets(final Packet<?> packet) { ++ final java.util.List<Packet<?>> extra = packet.getExtraPackets(); ++ if (extra == null || extra.isEmpty()) { ++ return null; ++ } ++ ++ final java.util.List<Packet<?>> ret = new java.util.ArrayList<>(1 + extra.size()); ++ buildExtraPackets0(extra, ret); ++ return ret; ++ } ++ ++ private static void buildExtraPackets0(final java.util.List<Packet<?>> extraPackets, final java.util.List<Packet<?>> into) { ++ for (final Packet<?> extra : extraPackets) { ++ into.add(extra); ++ final java.util.List<Packet<?>> extraExtra = extra.getExtraPackets(); ++ if (extraExtra != null && !extraExtra.isEmpty()) { ++ buildExtraPackets0(extraExtra, into); ++ } ++ } ++ } ++ ++ private static boolean canSendImmediate(final Connection networkManager, final net.minecraft.network.protocol.Packet<?> packet) { ++ return networkManager.isPending || networkManager.packetListener.protocol() != ConnectionProtocol.PLAY || ++ packet instanceof net.minecraft.network.protocol.common.ClientboundKeepAlivePacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundCommandSuggestionsPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitleTextPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetSubtitleTextPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetActionBarTextPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitlesAnimationPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundClearTitlesPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSoundPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSoundEntityPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundStopSoundPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundLevelParticlesPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundBossEventPacket; ++ } ++ } ++ ++ private static class WrappedConsumer implements Consumer<Connection> { ++ private final Consumer<Connection> delegate; ++ private final java.util.concurrent.atomic.AtomicBoolean consumed = new java.util.concurrent.atomic.AtomicBoolean(false); ++ ++ private WrappedConsumer(final Consumer<Connection> delegate) { ++ this.delegate = delegate; ++ } ++ ++ @Override ++ public void accept(final Connection connection) { ++ this.delegate.accept(connection); ++ } ++ ++ public boolean tryMarkConsumed() { ++ return consumed.compareAndSet(false, true); ++ } ++ ++ public boolean isConsumed() { ++ return consumed.get(); ++ } ++ } ++ ++ private static final class PacketSendAction extends WrappedConsumer { ++ private final Packet<?> packet; ++ ++ private PacketSendAction(final Packet<?> packet, @Nullable final PacketSendListener packetSendListener, final boolean flush) { ++ super(connection -> connection.sendPacket(packet, packetSendListener, flush)); ++ this.packet = packet; ++ } ++ } ++ // Paper end - Optimize network + } +diff --git a/src/main/java/net/minecraft/network/protocol/Packet.java b/src/main/java/net/minecraft/network/protocol/Packet.java +index 82fc12ffbd1585b4a8d09a025914830af77b0f8d..c9d283b7fc9ede79dc6cbc39dfc9e7ae986a6a47 100644 +--- a/src/main/java/net/minecraft/network/protocol/Packet.java ++++ b/src/main/java/net/minecraft/network/protocol/Packet.java +@@ -35,4 +35,31 @@ public interface Packet<T extends PacketListener> { + static <B extends ByteBuf, T extends Packet<?>> StreamCodec<B, T> codec(StreamMemberEncoder<B, T> encoder, StreamDecoder<B, T> decoder) { + return StreamCodec.ofMember(encoder, decoder); + } ++ ++ // Paper start ++ /** ++ * @param player Null if not at PLAY stage yet ++ */ ++ default void onPacketDispatch(@org.jetbrains.annotations.Nullable net.minecraft.server.level.ServerPlayer player) { ++ } ++ ++ /** ++ * @param player Null if not at PLAY stage yet ++ * @param future Can be null if packet was cancelled ++ */ ++ default void onPacketDispatchFinish(@org.jetbrains.annotations.Nullable net.minecraft.server.level.ServerPlayer player, @org.jetbrains.annotations.Nullable io.netty.channel.ChannelFuture future) {} ++ ++ default boolean hasFinishListener() { ++ return false; ++ } ++ ++ default boolean isReady() { ++ return true; ++ } ++ ++ @org.jetbrains.annotations.Nullable ++ default java.util.List<Packet<?>> getExtraPackets() { ++ return null; ++ } ++ // Paper end + } +diff --git a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java +index 1f696644b958538e9f5d568a2e4bba69d74a191e..2929d9a2efa9669781b6773161db7c5f968c2544 100644 +--- a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java ++++ b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java +@@ -63,10 +63,12 @@ public class ServerConnectionListener { + final List<Connection> connections = Collections.synchronizedList(Lists.newArrayList()); + // Paper start - prevent blocking on adding a new connection while the server is ticking + private final java.util.Queue<Connection> pending = new java.util.concurrent.ConcurrentLinkedQueue<>(); ++ private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper - Optimize network + private final void addPending() { + Connection connection; + while ((connection = pending.poll()) != null) { + connections.add(connection); ++ connection.isPending = false; // Paper - Optimize network + } + } + // Paper end - prevent blocking on adding a new connection while the server is ticking +@@ -112,6 +114,7 @@ public class ServerConnectionListener { + ; + } + ++ if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper - Optimize network + ChannelPipeline channelpipeline = channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)); + + if (ServerConnectionListener.this.server.repliesToStatus()) { |