aboutsummaryrefslogtreecommitdiffhomepage
path: root/patches/server/0990-Optimize-Network-Manager-and-add-advanced-packet-sup.patch
diff options
context:
space:
mode:
Diffstat (limited to 'patches/server/0990-Optimize-Network-Manager-and-add-advanced-packet-sup.patch')
-rw-r--r--patches/server/0990-Optimize-Network-Manager-and-add-advanced-packet-sup.patch394
1 files changed, 394 insertions, 0 deletions
diff --git a/patches/server/0990-Optimize-Network-Manager-and-add-advanced-packet-sup.patch b/patches/server/0990-Optimize-Network-Manager-and-add-advanced-packet-sup.patch
new file mode 100644
index 0000000000..6ab7cbf215
--- /dev/null
+++ b/patches/server/0990-Optimize-Network-Manager-and-add-advanced-packet-sup.patch
@@ -0,0 +1,394 @@
+From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
+From: Aikar <[email protected]>
+Date: Wed, 6 May 2020 04:53:35 -0400
+Subject: [PATCH] Optimize Network Manager and add advanced packet support
+
+Adds ability for 1 packet to bundle other packets to follow it
+Adds ability for a packet to delay sending more packets until a state is ready.
+
+Removes synchronization from sending packets
+Removes processing packet queue off of main thread
+ - for the few cases where it is allowed, order is not necessary nor
+ should it even be happening concurrently in first place (handshaking/login/status)
+
+Ensures packets sent asynchronously are dispatched on main thread
+
+This helps ensure safety for ProtocolLib as packet listeners
+are commonly accessing world state. This will allow you to schedule
+a packet to be sent async, but itll be dispatched sync for packet
+listeners to process.
+
+This should solve some deadlock risks
+
+Also adds Netty Channel Flush Consolidation to reduce the amount of flushing
+
+Also avoids spamming closed channel exception by rechecking closed state in dispatch
+and then catch exceptions and close if they fire.
+
+Part of this commit was authored by: Spottedleaf, sandtechnology
+
+diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java
+index 22a7f17180b76b6c3548d3b54ae8218a469401a8..7f2aa5e17fe675f3404d67b1794d2ca68b188eb9 100644
+--- a/src/main/java/net/minecraft/network/Connection.java
++++ b/src/main/java/net/minecraft/network/Connection.java
+@@ -84,7 +84,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ return new DefaultEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Local Client IO #%d").setDaemon(true).setUncaughtExceptionHandler(new net.minecraft.DefaultUncaughtExceptionHandlerWithName(LOGGER)).build()); // Paper
+ });
+ private final PacketFlow receiving;
+- private final Queue<Consumer<Connection>> pendingActions = Queues.newConcurrentLinkedQueue();
++ private final Queue<WrappedConsumer> pendingActions = Queues.newConcurrentLinkedQueue();
+ public Channel channel;
+ public SocketAddress address;
+ // Spigot Start
+@@ -116,6 +116,10 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ public java.net.InetSocketAddress virtualHost;
+ private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush"); // Paper - Disable explicit network manager flushing
+ // Paper end
++ // Paper start - Optimize network
++ public boolean isPending = true;
++ public boolean queueImmunity;
++ // Paper end - Optimize network
+
+ // Paper start - add utility methods
+ public final net.minecraft.server.level.ServerPlayer getPlayer() {
+@@ -375,15 +379,39 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ }
+
+ public void send(Packet<?> packet, @Nullable PacketSendListener callbacks, boolean flush) {
+- if (this.isConnected()) {
+- this.flushQueue();
++ // Paper start - Optimize network: Handle oversized packets better
++ final boolean connected = this.isConnected();
++ if (!connected && !this.preparing) {
++ return;
++ }
++
++ packet.onPacketDispatch(this.getPlayer());
++ if (connected && (InnerUtil.canSendImmediate(this, packet)
++ || (io.papermc.paper.util.MCUtil.isMainThread() && packet.isReady() && this.pendingActions.isEmpty()
++ && (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())))) {
+ this.sendPacket(packet, callbacks, flush);
+ } else {
+- this.pendingActions.add((networkmanager) -> {
+- networkmanager.sendPacket(packet, callbacks, flush);
+- });
+- }
++ // Write the packets to the queue, then flush - antixray hooks there already
++ final java.util.List<Packet<?>> extraPackets = InnerUtil.buildExtraPackets(packet);
++ final boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty();
++ if (!hasExtraPackets) {
++ this.pendingActions.add(new PacketSendAction(packet, callbacks, flush));
++ } else {
++ final java.util.List<PacketSendAction> actions = new java.util.ArrayList<>(1 + extraPackets.size());
++ actions.add(new PacketSendAction(packet, null, false)); // Delay the future listener until the end of the extra packets
+
++ for (int i = 0, len = extraPackets.size(); i < len;) {
++ final Packet<?> extraPacket = extraPackets.get(i);
++ final boolean end = ++i == len;
++ actions.add(new PacketSendAction(extraPacket, end ? callbacks : null, end)); // Append listener to the end
++ }
++
++ this.pendingActions.addAll(actions);
++ }
++
++ this.flushQueue();
++ // Paper end - Optimize network
++ }
+ }
+
+ public void runOnceConnected(Consumer<Connection> task) {
+@@ -391,7 +419,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ this.flushQueue();
+ task.accept(this);
+ } else {
+- this.pendingActions.add(task);
++ this.pendingActions.add(new WrappedConsumer(task)); // Paper - Optimize network
+ }
+
+ }
+@@ -409,6 +437,14 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ }
+
+ private void doSendPacket(Packet<?> packet, @Nullable PacketSendListener callbacks, boolean flush) {
++ // Paper start - Optimize network
++ final net.minecraft.server.level.ServerPlayer player = this.getPlayer();
++ if (!this.isConnected()) {
++ packet.onPacketDispatchFinish(player, null);
++ return;
++ }
++ try {
++ // Paper end - Optimize network
+ ChannelFuture channelfuture = flush ? this.channel.writeAndFlush(packet) : this.channel.write(packet);
+
+ if (callbacks != null) {
+@@ -428,14 +464,24 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ });
+ }
+
++ // Paper start - Optimize network
++ if (packet.hasFinishListener()) {
++ channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture));
++ }
+ channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
++ } catch (final Exception e) {
++ LOGGER.error("NetworkException: {}", player, e);
++ this.disconnect(Component.translatable("disconnect.genericReason", "Internal Exception: " + e.getMessage()));
++ packet.onPacketDispatchFinish(player, null);
++ }
++ // Paper end - Optimize network
+ }
+
+ public void flushChannel() {
+ if (this.isConnected()) {
+ this.flush();
+ } else {
+- this.pendingActions.add(Connection::flush);
++ this.pendingActions.add(new WrappedConsumer(Connection::flush)); // Paper - Optimize network
+ }
+
+ }
+@@ -468,20 +514,57 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ return attributekey;
+ }
+
+- private void flushQueue() {
+- if (this.channel != null && this.channel.isOpen()) {
+- Queue queue = this.pendingActions;
+-
++ // Paper start - Optimize network: Rewrite this to be safer if ran off main thread
++ private boolean flushQueue() {
++ if (!this.isConnected()) {
++ return true;
++ }
++ if (io.papermc.paper.util.MCUtil.isMainThread()) {
++ return this.processQueue();
++ } else if (this.isPending) {
++ // Should only happen during login/status stages
+ synchronized (this.pendingActions) {
+- Consumer consumer;
++ return this.processQueue();
++ }
++ }
++ return false;
++ }
++
++ private boolean processQueue() {
++ if (this.pendingActions.isEmpty()) {
++ return true;
++ }
+
+- while ((consumer = (Consumer) this.pendingActions.poll()) != null) {
+- consumer.accept(this);
++ // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
++ // But if we are not on main due to login/status, the parent is synchronized on packetQueue
++ final java.util.Iterator<WrappedConsumer> iterator = this.pendingActions.iterator();
++ while (iterator.hasNext()) {
++ final WrappedConsumer queued = iterator.next(); // poll -> peek
++
++ // Fix NPE (Spigot bug caused by handleDisconnection())
++ if (queued == null) {
++ return true;
++ }
++
++ if (queued.isConsumed()) {
++ continue;
++ }
++
++ if (queued instanceof PacketSendAction packetSendAction) {
++ final Packet<?> packet = packetSendAction.packet;
++ if (!packet.isReady()) {
++ return false;
+ }
++ }
+
++ iterator.remove();
++ if (queued.tryMarkConsumed()) {
++ queued.accept(this);
+ }
+ }
++ return true;
+ }
++ // Paper end - Optimize network
+
+ private static final int MAX_PER_TICK = io.papermc.paper.configuration.GlobalConfiguration.get().misc.maxJoinsPerTick; // Paper - Buffer joins to world
+ private static int joinAttemptsThisTick; // Paper - Buffer joins to world
+@@ -544,6 +627,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ public void disconnect(Component disconnectReason) {
+ // Spigot Start
+ this.preparing = false;
++ this.clearPacketQueue(); // Paper - Optimize network
+ // Spigot End
+ if (this.channel == null) {
+ this.delayedDisconnect = disconnectReason;
+@@ -715,7 +799,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ public void handleDisconnection() {
+ if (this.channel != null && !this.channel.isOpen()) {
+ if (this.disconnectionHandled) {
+- Connection.LOGGER.warn("handleDisconnection() called twice");
++ // Connection.LOGGER.warn("handleDisconnection() called twice"); // Paper - Don't log useless message
+ } else {
+ this.disconnectionHandled = true;
+ PacketListener packetlistener = this.getPacketListener();
+@@ -728,7 +812,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+
+ packetlistener1.onDisconnect(ichatbasecomponent);
+ }
+- this.pendingActions.clear(); // Free up packet queue.
++ this.clearPacketQueue(); // Paper - Optimize network
+ // Paper start - Add PlayerConnectionCloseEvent
+ final PacketListener packetListener = this.getPacketListener();
+ if (packetListener instanceof net.minecraft.server.network.ServerCommonPacketListenerImpl commonPacketListener) {
+@@ -765,4 +849,93 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ public void setBandwidthLogger(SampleLogger log) {
+ this.bandwidthDebugMonitor = new BandwidthDebugMonitor(log);
+ }
++
++ // Paper start - Optimize network
++ public void clearPacketQueue() {
++ final net.minecraft.server.level.ServerPlayer player = getPlayer();
++ for (final Consumer<Connection> queuedAction : this.pendingActions) {
++ if (queuedAction instanceof PacketSendAction packetSendAction) {
++ final Packet<?> packet = packetSendAction.packet;
++ if (packet.hasFinishListener()) {
++ packet.onPacketDispatchFinish(player, null);
++ }
++ }
++ }
++ this.pendingActions.clear();
++ }
++
++ private static class InnerUtil { // Attempt to hide these methods from ProtocolLib, so it doesn't accidently pick them up.
++
++ @Nullable
++ private static java.util.List<Packet<?>> buildExtraPackets(final Packet<?> packet) {
++ final java.util.List<Packet<?>> extra = packet.getExtraPackets();
++ if (extra == null || extra.isEmpty()) {
++ return null;
++ }
++
++ final java.util.List<Packet<?>> ret = new java.util.ArrayList<>(1 + extra.size());
++ buildExtraPackets0(extra, ret);
++ return ret;
++ }
++
++ private static void buildExtraPackets0(final java.util.List<Packet<?>> extraPackets, final java.util.List<Packet<?>> into) {
++ for (final Packet<?> extra : extraPackets) {
++ into.add(extra);
++ final java.util.List<Packet<?>> extraExtra = extra.getExtraPackets();
++ if (extraExtra != null && !extraExtra.isEmpty()) {
++ buildExtraPackets0(extraExtra, into);
++ }
++ }
++ }
++
++ private static boolean canSendImmediate(final Connection networkManager, final net.minecraft.network.protocol.Packet<?> packet) {
++ return networkManager.isPending || networkManager.packetListener.protocol() != ConnectionProtocol.PLAY ||
++ packet instanceof net.minecraft.network.protocol.common.ClientboundKeepAlivePacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundCommandSuggestionsPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitleTextPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetSubtitleTextPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetActionBarTextPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitlesAnimationPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundClearTitlesPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundSoundPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundSoundEntityPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundStopSoundPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundLevelParticlesPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundBossEventPacket;
++ }
++ }
++
++ private static class WrappedConsumer implements Consumer<Connection> {
++ private final Consumer<Connection> delegate;
++ private final java.util.concurrent.atomic.AtomicBoolean consumed = new java.util.concurrent.atomic.AtomicBoolean(false);
++
++ private WrappedConsumer(final Consumer<Connection> delegate) {
++ this.delegate = delegate;
++ }
++
++ @Override
++ public void accept(final Connection connection) {
++ this.delegate.accept(connection);
++ }
++
++ public boolean tryMarkConsumed() {
++ return consumed.compareAndSet(false, true);
++ }
++
++ public boolean isConsumed() {
++ return consumed.get();
++ }
++ }
++
++ private static final class PacketSendAction extends WrappedConsumer {
++ private final Packet<?> packet;
++
++ private PacketSendAction(final Packet<?> packet, @Nullable final PacketSendListener packetSendListener, final boolean flush) {
++ super(connection -> connection.sendPacket(packet, packetSendListener, flush));
++ this.packet = packet;
++ }
++ }
++ // Paper end - Optimize network
+ }
+diff --git a/src/main/java/net/minecraft/network/protocol/Packet.java b/src/main/java/net/minecraft/network/protocol/Packet.java
+index cc658a61065d5c0021a4b88fa58b40211b94f8ec..da11266a0a23f446196e6facf2c358cfcc18070f 100644
+--- a/src/main/java/net/minecraft/network/protocol/Packet.java
++++ b/src/main/java/net/minecraft/network/protocol/Packet.java
+@@ -11,6 +11,30 @@ public interface Packet<T extends PacketListener> {
+ void handle(T listener);
+
+ // Paper start
++ /**
++ * @param player Null if not at PLAY stage yet
++ */
++ default void onPacketDispatch(@Nullable net.minecraft.server.level.ServerPlayer player) {
++ }
++
++ /**
++ * @param player Null if not at PLAY stage yet
++ * @param future Can be null if packet was cancelled
++ */
++ default void onPacketDispatchFinish(@Nullable net.minecraft.server.level.ServerPlayer player, @Nullable io.netty.channel.ChannelFuture future) {}
++
++ default boolean hasFinishListener() {
++ return false;
++ }
++
++ default boolean isReady() {
++ return true;
++ }
++
++ @Nullable
++ default java.util.List<Packet<?>> getExtraPackets() {
++ return null;
++ }
+ default boolean packetTooLarge(net.minecraft.network.Connection manager) {
+ return false;
+ }
+diff --git a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
+index 4f330a44c77a7ec3237a86fda04921a8c4a1c00f..a4a29a7ea0035ecf4c61ee8547a9eb24acb667d0 100644
+--- a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
++++ b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
+@@ -63,10 +63,12 @@ public class ServerConnectionListener {
+ final List<Connection> connections = Collections.synchronizedList(Lists.newArrayList());
+ // Paper start - prevent blocking on adding a new connection while the server is ticking
+ private final java.util.Queue<Connection> pending = new java.util.concurrent.ConcurrentLinkedQueue<>();
++ private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper - Optimize network
+ private final void addPending() {
+ Connection connection;
+ while ((connection = pending.poll()) != null) {
+ connections.add(connection);
++ connection.isPending = false; // Paper - Optimize network
+ }
+ }
+ // Paper end - prevent blocking on adding a new connection while the server is ticking
+@@ -114,6 +116,7 @@ public class ServerConnectionListener {
+ ;
+ }
+
++ if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper - Optimize network
+ ChannelPipeline channelpipeline = channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)).addLast("legacy_query", new LegacyQueryHandler(ServerConnectionListener.this.getServer()));
+
+ Connection.configureSerialization(channelpipeline, PacketFlow.SERVERBOUND, (BandwidthDebugMonitor) null);