aboutsummaryrefslogtreecommitdiffhomepage
path: root/paper-server/patches/features/0001-Optimize-Network-Manager-and-add-advanced-packet-sup.patch
diff options
context:
space:
mode:
authorNassim Jahnke <[email protected]>2024-12-21 13:45:04 +0100
committerNassim Jahnke <[email protected]>2024-12-21 13:45:04 +0100
commit3b0b3a0aef5863b37f252bfffef7ff5b9b266445 (patch)
treed6088aef454eff0d163847ebdcc5eb349fcac9bb /paper-server/patches/features/0001-Optimize-Network-Manager-and-add-advanced-packet-sup.patch
parent82216a59fef17863c86192c2754417480baa71c1 (diff)
downloadPaper-3b0b3a0aef5863b37f252bfffef7ff5b9b266445.tar.gz
Paper-3b0b3a0aef5863b37f252bfffef7ff5b9b266445.zip
and some more
Diffstat (limited to 'paper-server/patches/features/0001-Optimize-Network-Manager-and-add-advanced-packet-sup.patch')
-rw-r--r--paper-server/patches/features/0001-Optimize-Network-Manager-and-add-advanced-packet-sup.patch391
1 files changed, 391 insertions, 0 deletions
diff --git a/paper-server/patches/features/0001-Optimize-Network-Manager-and-add-advanced-packet-sup.patch b/paper-server/patches/features/0001-Optimize-Network-Manager-and-add-advanced-packet-sup.patch
new file mode 100644
index 0000000000..02589fdaae
--- /dev/null
+++ b/paper-server/patches/features/0001-Optimize-Network-Manager-and-add-advanced-packet-sup.patch
@@ -0,0 +1,391 @@
+From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
+From: Aikar <[email protected]>
+Date: Wed, 6 May 2020 04:53:35 -0400
+Subject: [PATCH] Optimize Network Manager and add advanced packet support
+
+Adds ability for 1 packet to bundle other packets to follow it
+Adds ability for a packet to delay sending more packets until a state is ready.
+
+Removes synchronization from sending packets
+Removes processing packet queue off of main thread
+ - for the few cases where it is allowed, order is not necessary nor
+ should it even be happening concurrently in first place (handshaking/login/status)
+
+Ensures packets sent asynchronously are dispatched on main thread
+
+This helps ensure safety for ProtocolLib as packet listeners
+are commonly accessing world state. This will allow you to schedule
+a packet to be sent async, but itll be dispatched sync for packet
+listeners to process.
+
+This should solve some deadlock risks
+
+Also adds Netty Channel Flush Consolidation to reduce the amount of flushing
+
+Also avoids spamming closed channel exception by rechecking closed state in dispatch
+and then catch exceptions and close if they fire.
+
+Part of this commit was authored by: Spottedleaf, sandtechnology
+
+diff --git a/net/minecraft/network/Connection.java b/net/minecraft/network/Connection.java
+index 42f44c7cb0bd55ddfacd18acb0e596e7a953870e..ad8f8428b75e37097487cdfbd0db2421ee4cbe37 100644
+--- a/net/minecraft/network/Connection.java
++++ b/net/minecraft/network/Connection.java
+@@ -85,7 +85,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ private static final ProtocolInfo<ServerHandshakePacketListener> INITIAL_PROTOCOL = HandshakeProtocols.SERVERBOUND;
+ private final PacketFlow receiving;
+ private volatile boolean sendLoginDisconnect = true;
+- private final Queue<Consumer<Connection>> pendingActions = Queues.newConcurrentLinkedQueue();
++ private final Queue<WrappedConsumer> pendingActions = Queues.newConcurrentLinkedQueue(); // Paper - Optimize network
+ public Channel channel;
+ public SocketAddress address;
+ // Spigot start
+@@ -145,6 +145,10 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ }
+ // Paper end - packet limiter
+ @Nullable public SocketAddress haProxyAddress; // Paper - Add API to get player's proxy address
++ // Paper start - Optimize network
++ public boolean isPending = true;
++ public boolean queueImmunity;
++ // Paper end - Optimize network
+
+ public Connection(PacketFlow receiving) {
+ this.receiving = receiving;
+@@ -423,11 +427,38 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ }
+
+ public void send(Packet<?> packet, @Nullable PacketSendListener listener, boolean flush) {
+- if (this.isConnected()) {
+- this.flushQueue();
++ // Paper start - Optimize network: Handle oversized packets better
++ final boolean connected = this.isConnected();
++ if (!connected && !this.preparing) {
++ return;
++ }
++
++ packet.onPacketDispatch(this.getPlayer());
++ if (connected && (InnerUtil.canSendImmediate(this, packet)
++ || (io.papermc.paper.util.MCUtil.isMainThread() && packet.isReady() && this.pendingActions.isEmpty()
++ && (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())))) {
+ this.sendPacket(packet, listener, flush);
+ } else {
+- this.pendingActions.add(connection -> connection.sendPacket(packet, listener, flush));
++ // Write the packets to the queue, then flush - antixray hooks there already
++ final java.util.List<Packet<?>> extraPackets = InnerUtil.buildExtraPackets(packet);
++ final boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty();
++ if (!hasExtraPackets) {
++ this.pendingActions.add(new PacketSendAction(packet, listener, flush));
++ } else {
++ final java.util.List<PacketSendAction> actions = new java.util.ArrayList<>(1 + extraPackets.size());
++ actions.add(new PacketSendAction(packet, null, false)); // Delay the future listener until the end of the extra packets
++
++ for (int i = 0, len = extraPackets.size(); i < len;) {
++ final Packet<?> extraPacket = extraPackets.get(i);
++ final boolean end = ++i == len;
++ actions.add(new PacketSendAction(extraPacket, end ? listener : null, end)); // Append listener to the end
++ }
++
++ this.pendingActions.addAll(actions);
++ }
++
++ this.flushQueue();
++ // Paper end - Optimize network
+ }
+ }
+
+@@ -436,7 +467,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ this.flushQueue();
+ action.accept(this);
+ } else {
+- this.pendingActions.add(action);
++ this.pendingActions.add(new WrappedConsumer(action)); // Paper - Optimize network
+ }
+ }
+
+@@ -450,6 +481,14 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ }
+
+ private void doSendPacket(Packet<?> packet, @Nullable PacketSendListener sendListener, boolean flush) {
++ // Paper start - Optimize network
++ final net.minecraft.server.level.ServerPlayer player = this.getPlayer();
++ if (!this.isConnected()) {
++ packet.onPacketDispatchFinish(player, null);
++ return;
++ }
++ try {
++ // Paper end - Optimize network
+ ChannelFuture channelFuture = flush ? this.channel.writeAndFlush(packet) : this.channel.write(packet);
+ if (sendListener != null) {
+ channelFuture.addListener(future -> {
+@@ -465,14 +504,24 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ });
+ }
+
++ // Paper start - Optimize network
++ if (packet.hasFinishListener()) {
++ channelFuture.addListener((ChannelFutureListener) future -> packet.onPacketDispatchFinish(player, future));
++ }
+ channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
++ } catch (final Exception e) {
++ LOGGER.error("NetworkException: {}", player, e);
++ this.disconnect(Component.translatable("disconnect.genericReason", "Internal Exception: " + e.getMessage()));
++ packet.onPacketDispatchFinish(player, null);
++ }
++ // Paper end - Optimize network
+ }
+
+ public void flushChannel() {
+ if (this.isConnected()) {
+ this.flush();
+ } else {
+- this.pendingActions.add(Connection::flush);
++ this.pendingActions.add(new WrappedConsumer(Connection::flush)); // Paper - Optimize network
+ }
+ }
+
+@@ -484,16 +533,57 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ }
+ }
+
+- private void flushQueue() {
+- if (this.channel != null && this.channel.isOpen()) {
++ // Paper start - Optimize network: Rewrite this to be safer if ran off main thread
++ private boolean flushQueue() {
++ if (!this.isConnected()) {
++ return true;
++ }
++ if (io.papermc.paper.util.MCUtil.isMainThread()) {
++ return this.processQueue();
++ } else if (this.isPending) {
++ // Should only happen during login/status stages
+ synchronized (this.pendingActions) {
+- Consumer<Connection> consumer;
+- while ((consumer = this.pendingActions.poll()) != null) {
+- consumer.accept(this);
++ return this.processQueue();
++ }
++ }
++ return false;
++ }
++
++ private boolean processQueue() {
++ if (this.pendingActions.isEmpty()) {
++ return true;
++ }
++
++ // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
++ // But if we are not on main due to login/status, the parent is synchronized on packetQueue
++ final java.util.Iterator<WrappedConsumer> iterator = this.pendingActions.iterator();
++ while (iterator.hasNext()) {
++ final WrappedConsumer queued = iterator.next(); // poll -> peek
++
++ // Fix NPE (Spigot bug caused by handleDisconnection())
++ if (queued == null) {
++ return true;
++ }
++
++ if (queued.isConsumed()) {
++ continue;
++ }
++
++ if (queued instanceof PacketSendAction packetSendAction) {
++ final Packet<?> packet = packetSendAction.packet;
++ if (!packet.isReady()) {
++ return false;
+ }
+ }
++
++ iterator.remove();
++ if (queued.tryMarkConsumed()) {
++ queued.accept(this);
++ }
+ }
++ return true;
+ }
++ // Paper end - Optimize network
+
+ private static final int MAX_PER_TICK = io.papermc.paper.configuration.GlobalConfiguration.get().misc.maxJoinsPerTick; // Paper - Buffer joins to world
+ private static int joinAttemptsThisTick; // Paper - Buffer joins to world
+@@ -563,6 +653,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+
+ public void disconnect(DisconnectionDetails disconnectionDetails) {
+ this.preparing = false; // Spigot
++ this.clearPacketQueue(); // Paper - Optimize network
+ if (this.channel == null) {
+ this.delayedDisconnect = disconnectionDetails;
+ }
+@@ -751,7 +842,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ public void handleDisconnection() {
+ if (this.channel != null && !this.channel.isOpen()) {
+ if (this.disconnectionHandled) {
+- LOGGER.warn("handleDisconnection() called twice");
++ // LOGGER.warn("handleDisconnection() called twice"); // Paper - Don't log useless message
+ } else {
+ this.disconnectionHandled = true;
+ PacketListener packetListener = this.getPacketListener();
+@@ -762,7 +853,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ );
+ packetListener1.onDisconnect(disconnectionDetails);
+ }
+- this.pendingActions.clear(); // Free up packet queue.
++ this.clearPacketQueue(); // Paper - Optimize network
+ // Paper start - Add PlayerConnectionCloseEvent
+ if (packetListener instanceof net.minecraft.server.network.ServerCommonPacketListenerImpl commonPacketListener) {
+ /* Player was logged in, either game listener or configuration listener */
+@@ -797,4 +888,93 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
+ public void setBandwidthLogger(LocalSampleLogger bandwithLogger) {
+ this.bandwidthDebugMonitor = new BandwidthDebugMonitor(bandwithLogger);
+ }
++
++ // Paper start - Optimize network
++ public void clearPacketQueue() {
++ final net.minecraft.server.level.ServerPlayer player = getPlayer();
++ for (final Consumer<Connection> queuedAction : this.pendingActions) {
++ if (queuedAction instanceof PacketSendAction packetSendAction) {
++ final Packet<?> packet = packetSendAction.packet;
++ if (packet.hasFinishListener()) {
++ packet.onPacketDispatchFinish(player, null);
++ }
++ }
++ }
++ this.pendingActions.clear();
++ }
++
++ private static class InnerUtil { // Attempt to hide these methods from ProtocolLib, so it doesn't accidently pick them up.
++
++ @Nullable
++ private static java.util.List<Packet<?>> buildExtraPackets(final Packet<?> packet) {
++ final java.util.List<Packet<?>> extra = packet.getExtraPackets();
++ if (extra == null || extra.isEmpty()) {
++ return null;
++ }
++
++ final java.util.List<Packet<?>> ret = new java.util.ArrayList<>(1 + extra.size());
++ buildExtraPackets0(extra, ret);
++ return ret;
++ }
++
++ private static void buildExtraPackets0(final java.util.List<Packet<?>> extraPackets, final java.util.List<Packet<?>> into) {
++ for (final Packet<?> extra : extraPackets) {
++ into.add(extra);
++ final java.util.List<Packet<?>> extraExtra = extra.getExtraPackets();
++ if (extraExtra != null && !extraExtra.isEmpty()) {
++ buildExtraPackets0(extraExtra, into);
++ }
++ }
++ }
++
++ private static boolean canSendImmediate(final Connection networkManager, final net.minecraft.network.protocol.Packet<?> packet) {
++ return networkManager.isPending || networkManager.packetListener.protocol() != ConnectionProtocol.PLAY ||
++ packet instanceof net.minecraft.network.protocol.common.ClientboundKeepAlivePacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundCommandSuggestionsPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitleTextPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetSubtitleTextPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetActionBarTextPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitlesAnimationPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundClearTitlesPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundSoundPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundSoundEntityPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundStopSoundPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundLevelParticlesPacket ||
++ packet instanceof net.minecraft.network.protocol.game.ClientboundBossEventPacket;
++ }
++ }
++
++ private static class WrappedConsumer implements Consumer<Connection> {
++ private final Consumer<Connection> delegate;
++ private final java.util.concurrent.atomic.AtomicBoolean consumed = new java.util.concurrent.atomic.AtomicBoolean(false);
++
++ private WrappedConsumer(final Consumer<Connection> delegate) {
++ this.delegate = delegate;
++ }
++
++ @Override
++ public void accept(final Connection connection) {
++ this.delegate.accept(connection);
++ }
++
++ public boolean tryMarkConsumed() {
++ return consumed.compareAndSet(false, true);
++ }
++
++ public boolean isConsumed() {
++ return consumed.get();
++ }
++ }
++
++ private static final class PacketSendAction extends WrappedConsumer {
++ private final Packet<?> packet;
++
++ private PacketSendAction(final Packet<?> packet, @Nullable final PacketSendListener packetSendListener, final boolean flush) {
++ super(connection -> connection.sendPacket(packet, packetSendListener, flush));
++ this.packet = packet;
++ }
++ }
++ // Paper end - Optimize network
+ }
+diff --git a/net/minecraft/network/protocol/Packet.java b/net/minecraft/network/protocol/Packet.java
+index 65ff8b9112ec76eeac48c679044fc02ae7d4ffeb..e4789584cbe43959681a8522c66eab58369bebd0 100644
+--- a/net/minecraft/network/protocol/Packet.java
++++ b/net/minecraft/network/protocol/Packet.java
+@@ -35,4 +35,32 @@ public interface Packet<T extends PacketListener> {
+ static <B extends ByteBuf, T extends Packet<?>> StreamCodec<B, T> codec(StreamMemberEncoder<B, T> encoder, StreamDecoder<B, T> decoder) {
+ return StreamCodec.ofMember(encoder, decoder);
+ }
++
++ // Paper start
++ /**
++ * @param player Null if not at PLAY stage yet
++ */
++ default void onPacketDispatch(@org.jetbrains.annotations.Nullable net.minecraft.server.level.ServerPlayer player) {
++ }
++
++ /**
++ * @param player Null if not at PLAY stage yet
++ * @param future Can be null if packet was cancelled
++ */
++ default void onPacketDispatchFinish(@org.jetbrains.annotations.Nullable net.minecraft.server.level.ServerPlayer player, @org.jetbrains.annotations.Nullable io.netty.channel.ChannelFuture future) {
++ }
++
++ default boolean hasFinishListener() {
++ return false;
++ }
++
++ default boolean isReady() {
++ return true;
++ }
++
++ @org.jetbrains.annotations.Nullable
++ default java.util.List<Packet<?>> getExtraPackets() {
++ return null;
++ }
++ // Paper end
+ }
+diff --git a/net/minecraft/server/network/ServerConnectionListener.java b/net/minecraft/server/network/ServerConnectionListener.java
+index b873af7183d9b1aabc87e63d254a4326f17b21c9..7de11ba404f0b60e7f7b7c16954811a343688219 100644
+--- a/net/minecraft/server/network/ServerConnectionListener.java
++++ b/net/minecraft/server/network/ServerConnectionListener.java
+@@ -66,11 +66,13 @@ public class ServerConnectionListener {
+
+ // Paper start - prevent blocking on adding a new connection while the server is ticking
+ private final java.util.Queue<Connection> pending = new java.util.concurrent.ConcurrentLinkedQueue<>();
++ private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper - Optimize network
+
+ private final void addPending() {
+ Connection connection;
+ while ((connection = this.pending.poll()) != null) {
+ this.connections.add(connection);
++ connection.isPending = false; // Paper - Optimize network
+ }
+ }
+ // Paper end - prevent blocking on adding a new connection while the server is ticking
+@@ -120,6 +122,7 @@ public class ServerConnectionListener {
+ } catch (ChannelException var5) {
+ }
+
++ if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper - Optimize network
+ ChannelPipeline channelPipeline = channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30));
+ if (ServerConnectionListener.this.server.repliesToStatus()) {
+ channelPipeline.addLast("legacy_query", new LegacyQueryHandler(ServerConnectionListener.this.getServer()));