diff options
Diffstat (limited to 'patches/server/0302-Optimize-Network-Manager-and-add-advanced-packet-sup.patch')
-rw-r--r-- | patches/server/0302-Optimize-Network-Manager-and-add-advanced-packet-sup.patch | 335 |
1 files changed, 335 insertions, 0 deletions
diff --git a/patches/server/0302-Optimize-Network-Manager-and-add-advanced-packet-sup.patch b/patches/server/0302-Optimize-Network-Manager-and-add-advanced-packet-sup.patch new file mode 100644 index 0000000000..0d594b9746 --- /dev/null +++ b/patches/server/0302-Optimize-Network-Manager-and-add-advanced-packet-sup.patch @@ -0,0 +1,335 @@ +From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 +From: Aikar <[email protected]> +Date: Wed, 6 May 2020 04:53:35 -0400 +Subject: [PATCH] Optimize Network Manager and add advanced packet support + +Adds ability for 1 packet to bundle other packets to follow it +Adds ability for a packet to delay sending more packets until a state is ready. + +Removes synchronization from sending packets +Removes processing packet queue off of main thread + - for the few cases where it is allowed, order is not necessary nor + should it even be happening concurrently in first place (handshaking/login/status) + +Ensures packets sent asynchronously are dispatched on main thread + +This helps ensure safety for ProtocolLib as packet listeners +are commonly accessing world state. This will allow you to schedule +a packet to be sent async, but itll be dispatched sync for packet +listeners to process. + +This should solve some deadlock risks + +Also adds Netty Channel Flush Consolidation to reduce the amount of flushing + +Also avoids spamming closed channel exception by rechecking closed state in dispatch +and then catch exceptions and close if they fire. + +Part of this commit was authored by: Spottedleaf + +diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java +index dd9c03611e410e601ba4a7769474fada8c28c104..a283cce82069bf5dfd64229d102a19dfda158daf 100644 +--- a/src/main/java/net/minecraft/network/Connection.java ++++ b/src/main/java/net/minecraft/network/Connection.java +@@ -115,6 +115,10 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + public int protocolVersion; + public java.net.InetSocketAddress virtualHost; + private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush"); ++ // Optimize network ++ public boolean isPending = true; ++ public boolean queueImmunity = false; ++ public ConnectionProtocol protocol; + // Paper end + + public Connection(PacketFlow side) { +@@ -138,6 +142,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + } + + public void setProtocol(ConnectionProtocol state) { ++ protocol = state; // Paper + this.channel.attr(Connection.ATTRIBUTE_PROTOCOL).set(state); + this.channel.config().setAutoRead(true); + Connection.LOGGER.debug("Enabled auto read"); +@@ -216,19 +221,89 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + Validate.notNull(listener, "packetListener", new Object[0]); + this.packetListener = listener; + } ++ // Paper start ++ public @Nullable net.minecraft.server.level.ServerPlayer getPlayer() { ++ if (packetListener instanceof net.minecraft.server.network.ServerGamePacketListenerImpl serverGamePacketListener) { ++ return serverGamePacketListener.player; ++ } else { ++ return null; ++ } ++ } ++ private static class InnerUtil { // Attempt to hide these methods from ProtocolLib so it doesn't accidently pick them up. ++ private static java.util.List<Packet> buildExtraPackets(Packet packet) { ++ java.util.List<Packet> extra = packet.getExtraPackets(); ++ if (extra == null || extra.isEmpty()) { ++ return null; ++ } ++ java.util.List<Packet> ret = new java.util.ArrayList<>(1 + extra.size()); ++ buildExtraPackets0(extra, ret); ++ return ret; ++ } ++ ++ private static void buildExtraPackets0(java.util.List<Packet> extraPackets, java.util.List<Packet> into) { ++ for (Packet extra : extraPackets) { ++ into.add(extra); ++ java.util.List<Packet> extraExtra = extra.getExtraPackets(); ++ if (extraExtra != null && !extraExtra.isEmpty()) { ++ buildExtraPackets0(extraExtra, into); ++ } ++ } ++ } ++ // Paper start ++ private static boolean canSendImmediate(Connection networkManager, Packet<?> packet) { ++ return networkManager.isPending || networkManager.protocol != ConnectionProtocol.PLAY || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundKeepAlivePacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundChatPreviewPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundCommandSuggestionsPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitleTextPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetSubtitleTextPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetActionBarTextPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitlesAnimationPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundClearTitlesPacket || ++ packet instanceof net.minecraft.network.protocol.game.ClientboundBossEventPacket; ++ } ++ // Paper end ++ } ++ // Paper end + + public void send(Packet<?> packet) { + this.send(packet, (PacketSendListener) null); + } + + public void send(Packet<?> packet, @Nullable PacketSendListener callbacks) { +- if (this.isConnected()) { +- this.flushQueue(); ++ // Paper start - handle oversized packets better ++ boolean connected = this.isConnected(); ++ if (!connected && !preparing) { ++ return; // Do nothing ++ } ++ packet.onPacketDispatch(getPlayer()); ++ if (connected && (InnerUtil.canSendImmediate(this, packet) || ( ++ net.minecraft.server.MCUtil.isMainThread() && packet.isReady() && this.queue.isEmpty() && ++ (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty()) ++ ))) { + this.sendPacket(packet, callbacks); +- } else { +- this.queue.add(new Connection.PacketHolder(packet, callbacks)); ++ return; + } ++ // write the packets to the queue, then flush - antixray hooks there already ++ java.util.List<Packet> extraPackets = InnerUtil.buildExtraPackets(packet); ++ boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty(); ++ if (!hasExtraPackets) { ++ this.queue.add(new Connection.PacketHolder(packet, callbacks)); ++ } else { ++ java.util.List<Connection.PacketHolder> packets = new java.util.ArrayList<>(1 + extraPackets.size()); ++ packets.add(new Connection.PacketHolder(packet, null)); // delay the future listener until the end of the extra packets + ++ for (int i = 0, len = extraPackets.size(); i < len;) { ++ Packet extra = extraPackets.get(i); ++ boolean end = ++i == len; ++ packets.add(new Connection.PacketHolder(extra, end ? callbacks : null)); // append listener to the end ++ } ++ this.queue.addAll(packets); // atomic ++ } ++ this.flushQueue(); ++ // Paper end + } + + private void sendPacket(Packet<?> packet, @Nullable PacketSendListener callbacks) { +@@ -256,6 +331,15 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + this.setProtocol(packetState); + } + ++ // Paper start ++ net.minecraft.server.level.ServerPlayer player = getPlayer(); ++ if (!isConnected()) { ++ packet.onPacketDispatchFinish(player, null); ++ return; ++ } ++ ++ try { ++ // Paper end + ChannelFuture channelfuture = this.channel.writeAndFlush(packet); + + if (callbacks != null) { +@@ -274,28 +358,64 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + + }); + } ++ // Paper start ++ if (packet.hasFinishListener()) { ++ channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture)); ++ } ++ // Paper end + + channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); ++ // Paper start ++ } catch (Exception e) { ++ LOGGER.error("NetworkException: " + player, e); ++ disconnect(Component.translatable("disconnect.genericReason", "Internal Exception: " + e.getMessage())); ++ packet.onPacketDispatchFinish(player, null); ++ } ++ // Paper end + } + + private ConnectionProtocol getCurrentProtocol() { + return (ConnectionProtocol) this.channel.attr(Connection.ATTRIBUTE_PROTOCOL).get(); + } + +- private void flushQueue() { +- try { // Paper - add pending task queue +- if (this.channel != null && this.channel.isOpen()) { +- Queue queue = this.queue; +- ++ // Paper start - rewrite this to be safer if ran off main thread ++ private boolean flushQueue() { // void -> boolean ++ if (!isConnected()) { ++ return true; ++ } ++ if (net.minecraft.server.MCUtil.isMainThread()) { ++ return processQueue(); ++ } else if (isPending) { ++ // Should only happen during login/status stages + synchronized (this.queue) { +- Connection.PacketHolder networkmanager_queuedpacket; +- +- while ((networkmanager_queuedpacket = (Connection.PacketHolder) this.queue.poll()) != null) { +- this.sendPacket(networkmanager_queuedpacket.packet, networkmanager_queuedpacket.listener); +- } ++ return this.processQueue(); ++ } ++ } ++ return false; ++ } ++ private boolean processQueue() { ++ try { // Paper - add pending task queue ++ if (this.queue.isEmpty()) return true; ++ // If we are on main, we are safe here in that nothing else should be processing queue off main anymore ++ // But if we are not on main due to login/status, the parent is synchronized on packetQueue ++ java.util.Iterator<PacketHolder> iterator = this.queue.iterator(); ++ while (iterator.hasNext()) { ++ PacketHolder queued = iterator.next(); // poll -> peek ++ ++ // Fix NPE (Spigot bug caused by handleDisconnection()) ++ if (queued == null) { ++ return true; ++ } + ++ Packet<?> packet = queued.packet; ++ if (!packet.isReady()) { ++ return false; ++ } else { ++ iterator.remove(); ++ this.sendPacket(packet, queued.listener); + } + } ++ return true; + } finally { // Paper start - add pending task queue + Runnable r; + while ((r = this.pendingTasks.poll()) != null) { +@@ -303,6 +423,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + } + } // Paper end - add pending task queue + } ++ // Paper end + + public void tick() { + this.flushQueue(); +@@ -339,9 +460,22 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + return this.address; + } + ++ // Paper start ++ public void clearPacketQueue() { ++ net.minecraft.server.level.ServerPlayer player = getPlayer(); ++ queue.forEach(queuedPacket -> { ++ Packet<?> packet = queuedPacket.packet; ++ if (packet.hasFinishListener()) { ++ packet.onPacketDispatchFinish(player, null); ++ } ++ }); ++ queue.clear(); ++ } ++ // Paper end + public void disconnect(Component disconnectReason) { + // Spigot Start + this.preparing = false; ++ clearPacketQueue(); // Paper + // Spigot End + if (this.channel.isOpen()) { + this.channel.close(); // We can't wait as this may be called from an event loop. +@@ -459,7 +593,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + public void handleDisconnection() { + if (this.channel != null && !this.channel.isOpen()) { + if (this.disconnectionHandled) { +- Connection.LOGGER.warn("handleDisconnection() called twice"); ++ //Connection.LOGGER.warn("handleDisconnection() called twice"); // Paper - Do not log useless message + } else { + this.disconnectionHandled = true; + if (this.getDisconnectedReason() != null) { +@@ -467,7 +601,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> { + } else if (this.getPacketListener() != null) { + this.getPacketListener().onDisconnect(Component.translatable("multiplayer.disconnect.generic")); + } +- this.queue.clear(); // Free up packet queue. ++ clearPacketQueue(); // Paper + // Paper start - Add PlayerConnectionCloseEvent + final PacketListener packetListener = this.getPacketListener(); + if (packetListener instanceof net.minecraft.server.network.ServerGamePacketListenerImpl) { +diff --git a/src/main/java/net/minecraft/network/protocol/Packet.java b/src/main/java/net/minecraft/network/protocol/Packet.java +index 74bfe0d3942259c45702b099efdc4e101a4e3022..e8fcd56906d26f6dc87959e32c4c7c78cfea9658 100644 +--- a/src/main/java/net/minecraft/network/protocol/Packet.java ++++ b/src/main/java/net/minecraft/network/protocol/Packet.java +@@ -9,6 +9,19 @@ public interface Packet<T extends PacketListener> { + void handle(T listener); + + // Paper start ++ /** ++ * @param player Null if not at PLAY stage yet ++ */ ++ default void onPacketDispatch(@javax.annotation.Nullable net.minecraft.server.level.ServerPlayer player) {} ++ ++ /** ++ * @param player Null if not at PLAY stage yet ++ * @param future Can be null if packet was cancelled ++ */ ++ default void onPacketDispatchFinish(@javax.annotation.Nullable net.minecraft.server.level.ServerPlayer player, @javax.annotation.Nullable io.netty.channel.ChannelFuture future) {} ++ default boolean hasFinishListener() { return false; } ++ default boolean isReady() { return true; } ++ default java.util.List<Packet> getExtraPackets() { return null; } + default boolean packetTooLarge(net.minecraft.network.Connection manager) { + return false; + } +diff --git a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java +index 0e04532b8f1d48116eb46dcef7952bfcc0d11394..a24ef433d0c9d06b86fd612978cfd6d877036791 100644 +--- a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java ++++ b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java +@@ -65,10 +65,12 @@ public class ServerConnectionListener { + final List<Connection> connections = Collections.synchronizedList(Lists.newArrayList()); + // Paper start - prevent blocking on adding a new network manager while the server is ticking + private final java.util.Queue<Connection> pending = new java.util.concurrent.ConcurrentLinkedQueue<>(); ++ private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper + private final void addPending() { + Connection manager = null; + while ((manager = pending.poll()) != null) { + connections.add(manager); ++ manager.isPending = false; + } + } + // Paper end +@@ -103,6 +105,7 @@ public class ServerConnectionListener { + ; + } + ++ if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper + channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)).addLast("legacy_query", new LegacyQueryHandler(ServerConnectionListener.this)).addLast("splitter", new Varint21FrameDecoder()).addLast("decoder", new PacketDecoder(PacketFlow.SERVERBOUND)).addLast("prepender", new Varint21LengthFieldPrepender()).addLast("encoder", new PacketEncoder(PacketFlow.CLIENTBOUND)); + int j = ServerConnectionListener.this.server.getRateLimitPacketsPerSecond(); + Object object = j > 0 ? new RateKickingConnection(j) : new Connection(PacketFlow.SERVERBOUND); |