aboutsummaryrefslogtreecommitdiffhomepage
path: root/patches/server/0309-Optimize-Network-Manager-and-add-advanced-packet-sup.patch
blob: 8ae2ad20e3ba7b749329b8be68f268385d8dbc76 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: Aikar <aikar@aikar.co>
Date: Wed, 6 May 2020 04:53:35 -0400
Subject: [PATCH] Optimize Network Manager and add advanced packet support

Adds ability for 1 packet to bundle other packets to follow it
Adds ability for a packet to delay sending more packets until a state is ready.

Removes synchronization from sending packets
Removes processing packet queue off of main thread
  - for the few cases where it is allowed, order is not necessary nor
    should it even be happening concurrently in first place (handshaking/login/status)

Ensures packets sent asynchronously are dispatched on main thread

This helps ensure safety for ProtocolLib as packet listeners
are commonly accessing world state. This will allow you to schedule
a packet to be sent async, but itll be dispatched sync for packet
listeners to process.

This should solve some deadlock risks

Also adds Netty Channel Flush Consolidation to reduce the amount of flushing

Also avoids spamming closed channel exception by rechecking closed state in dispatch
and then catch exceptions and close if they fire.

Part of this commit was authored by: Spottedleaf

diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java
index 821f22b8fde2d76bfcb417138f9bd83af766dcd7..f13e24eede7f09ecc8f375df5e27e385f589005d 100644
--- a/src/main/java/net/minecraft/network/Connection.java
+++ b/src/main/java/net/minecraft/network/Connection.java
@@ -87,6 +87,10 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
     public int protocolVersion;
     public java.net.InetSocketAddress virtualHost;
     private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush");
+    // Optimize network
+    public boolean isPending = true;
+    public boolean queueImmunity = false;
+    public ConnectionProtocol protocol;
     // Paper end
 
     public Connection(PacketFlow side) {
@@ -110,6 +114,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
     }
 
     public void setProtocol(ConnectionProtocol state) {
+        protocol = state; // Paper
         this.channel.attr(Connection.ATTRIBUTE_PROTOCOL).set(state);
         this.channel.config().setAutoRead(true);
         Connection.LOGGER.debug("Enabled auto read");
@@ -186,19 +191,87 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
         Validate.notNull(listener, "packetListener", new Object[0]);
         this.packetListener = listener;
     }
+    // Paper start
+    public net.minecraft.server.level.ServerPlayer getPlayer() {
+        if (packetListener instanceof ServerGamePacketListenerImpl) {
+            return ((ServerGamePacketListenerImpl) packetListener).player;
+        } else {
+            return null;
+        }
+    }
+    private static class InnerUtil { // Attempt to hide these methods from ProtocolLib so it doesn't accidently pick them up.
+        private static java.util.List<Packet> buildExtraPackets(Packet packet) {
+            java.util.List<Packet> extra = packet.getExtraPackets();
+            if (extra == null || extra.isEmpty()) {
+                return null;
+            }
+            java.util.List<Packet> ret = new java.util.ArrayList<>(1 + extra.size());
+            buildExtraPackets0(extra, ret);
+            return ret;
+        }
+
+        private static void buildExtraPackets0(java.util.List<Packet> extraPackets, java.util.List<Packet> into) {
+            for (Packet extra : extraPackets) {
+                into.add(extra);
+                java.util.List<Packet> extraExtra = extra.getExtraPackets();
+                if (extraExtra != null && !extraExtra.isEmpty()) {
+                    buildExtraPackets0(extraExtra, into);
+                }
+            }
+        }
+        // Paper start
+        private static boolean canSendImmediate(Connection networkManager, Packet<?> packet) {
+            return networkManager.isPending || networkManager.protocol != ConnectionProtocol.PLAY ||
+                packet instanceof net.minecraft.network.protocol.game.ClientboundKeepAlivePacket ||
+                packet instanceof net.minecraft.network.protocol.game.ClientboundChatPacket ||
+                packet instanceof net.minecraft.network.protocol.game.ClientboundCommandSuggestionsPacket ||
+                packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitleTextPacket ||
+                packet instanceof net.minecraft.network.protocol.game.ClientboundSetSubtitleTextPacket ||
+                packet instanceof net.minecraft.network.protocol.game.ClientboundSetActionBarTextPacket ||
+                packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitlesAnimationPacket ||
+                packet instanceof net.minecraft.network.protocol.game.ClientboundClearTitlesPacket ||
+                packet instanceof net.minecraft.network.protocol.game.ClientboundBossEventPacket;
+        }
+        // Paper end
+    }
+    // Paper end
 
     public void send(Packet<?> packet) {
         this.send(packet, (GenericFutureListener) null);
     }
 
     public void send(Packet<?> packet, @Nullable GenericFutureListener<? extends Future<? super Void>> callback) {
-        if (this.isConnected()) {
-            this.flushQueue();
+        // Paper start - handle oversized packets better
+        boolean connected = this.isConnected();
+        if (!connected && !preparing) {
+            return; // Do nothing
+        }
+        packet.onPacketDispatch(getPlayer());
+        if (connected && (InnerUtil.canSendImmediate(this, packet) || (
+            net.minecraft.server.MCUtil.isMainThread() && packet.isReady() && this.queue.isEmpty() &&
+            (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())
+        ))) {
             this.sendPacket(packet, callback);
-        } else {
-            this.queue.add(new Connection.PacketHolder(packet, callback));
+            return;
         }
+        // write the packets to the queue, then flush - antixray hooks there already
+        java.util.List<Packet> extraPackets = InnerUtil.buildExtraPackets(packet);
+        boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty();
+        if (!hasExtraPackets) {
+            this.queue.add(new Connection.PacketHolder(packet, callback));
+        } else {
+            java.util.List<Connection.PacketHolder> packets = new java.util.ArrayList<>(1 + extraPackets.size());
+            packets.add(new Connection.PacketHolder(packet, null)); // delay the future listener until the end of the extra packets
 
+            for (int i = 0, len = extraPackets.size(); i < len;) {
+                Packet extra = extraPackets.get(i);
+                boolean end = ++i == len;
+                packets.add(new Connection.PacketHolder(extra, end ? callback : null)); // append listener to the end
+            }
+            this.queue.addAll(packets); // atomic
+        }
+        this.flushQueue();
+        // Paper end
     }
 
     private void sendPacket(Packet<?> packet, @Nullable GenericFutureListener<? extends Future<? super Void>> callback) {
@@ -226,33 +299,79 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
             this.setProtocol(packetState);
         }
 
+        // Paper start
+        net.minecraft.server.level.ServerPlayer player = getPlayer();
+        if (!isConnected()) {
+            packet.onPacketDispatchFinish(player, null);
+            return;
+        }
+
+        try {
+            // Paper end
         ChannelFuture channelfuture = this.channel.writeAndFlush(packet);
 
         if (callback != null) {
             channelfuture.addListener(callback);
         }
+        // Paper start
+        if (packet.hasFinishListener()) {
+            channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture));
+        }
+        // Paper end
 
         channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        // Paper start
+        } catch (Exception e) {
+            LOGGER.error("NetworkException: " + player, e);
+            disconnect(new net.minecraft.network.chat.TranslatableComponent("disconnect.genericReason", "Internal Exception: " + e.getMessage()));
+            packet.onPacketDispatchFinish(player, null);
+        }
+        // Paper end
     }
 
     private ConnectionProtocol getCurrentProtocol() {
         return (ConnectionProtocol) this.channel.attr(Connection.ATTRIBUTE_PROTOCOL).get();
     }
 
-    private void flushQueue() {
-        if (this.channel != null && this.channel.isOpen()) {
-            Queue queue = this.queue;
-
+    // Paper start - rewrite this to be safer if ran off main thread
+    private boolean flushQueue() { // void -> boolean
+        if (!isConnected()) {
+            return true;
+        }
+        if (net.minecraft.server.MCUtil.isMainThread()) {
+            return processQueue();
+        } else if (isPending) {
+            // Should only happen during login/status stages
             synchronized (this.queue) {
-                Connection.PacketHolder networkmanager_queuedpacket;
-
-                while ((networkmanager_queuedpacket = (Connection.PacketHolder) this.queue.poll()) != null) {
-                    this.sendPacket(networkmanager_queuedpacket.packet, networkmanager_queuedpacket.listener);
-                }
+                return this.processQueue();
+            }
+        }
+        return false;
+    }
+    private boolean processQueue() {
+        if (this.queue.isEmpty()) return true;
+        // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
+        // But if we are not on main due to login/status, the parent is synchronized on packetQueue
+        java.util.Iterator<PacketHolder> iterator = this.queue.iterator();
+        while (iterator.hasNext()) {
+            PacketHolder queued = iterator.next(); // poll -> peek
+
+            // Fix NPE (Spigot bug caused by handleDisconnection())
+            if (queued == null) {
+                return true;
+            }
 
+            Packet<?> packet = queued.packet;
+            if (!packet.isReady()) {
+                return false;
+            } else {
+                iterator.remove();
+                this.sendPacket(packet, queued.listener);
             }
         }
+        return true;
     }
+    // Paper end
 
     public void tick() {
         this.flushQueue();
@@ -289,9 +408,22 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
         return this.address;
     }
 
+    // Paper start
+    public void clearPacketQueue() {
+        net.minecraft.server.level.ServerPlayer player = getPlayer();
+        queue.forEach(queuedPacket -> {
+            Packet<?> packet = queuedPacket.packet;
+            if (packet.hasFinishListener()) {
+                packet.onPacketDispatchFinish(player, null);
+            }
+        });
+        queue.clear();
+    }
+    // Paper end
     public void disconnect(Component disconnectReason) {
         // Spigot Start
         this.preparing = false;
+        clearPacketQueue(); // Paper
         // Spigot End
         if (this.channel.isOpen()) {
             this.channel.close(); // We can't wait as this may be called from an event loop.
@@ -409,7 +541,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
     public void handleDisconnection() {
         if (this.channel != null && !this.channel.isOpen()) {
             if (this.disconnectionHandled) {
-                Connection.LOGGER.warn("handleDisconnection() called twice");
+                //Connection.LOGGER.warn("handleDisconnection() called twice"); // Paper - Do not log useless message
             } else {
                 this.disconnectionHandled = true;
                 if (this.getDisconnectedReason() != null) {
@@ -417,7 +549,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
                 } else if (this.getPacketListener() != null) {
                     this.getPacketListener().onDisconnect(new TranslatableComponent("multiplayer.disconnect.generic"));
                 }
-                this.queue.clear(); // Free up packet queue.
+                clearPacketQueue(); // Paper
                 // Paper start - Add PlayerConnectionCloseEvent
                 final PacketListener packetListener = this.getPacketListener();
                 if (packetListener instanceof ServerGamePacketListenerImpl) {
diff --git a/src/main/java/net/minecraft/network/protocol/Packet.java b/src/main/java/net/minecraft/network/protocol/Packet.java
index 74bfe0d3942259c45702b099efdc4e101a4e3022..e8fcd56906d26f6dc87959e32c4c7c78cfea9658 100644
--- a/src/main/java/net/minecraft/network/protocol/Packet.java
+++ b/src/main/java/net/minecraft/network/protocol/Packet.java
@@ -9,6 +9,19 @@ public interface Packet<T extends PacketListener> {
     void handle(T listener);
 
     // Paper start
+    /**
+     * @param player Null if not at PLAY stage yet
+     */
+    default void onPacketDispatch(@javax.annotation.Nullable net.minecraft.server.level.ServerPlayer player) {}
+
+    /**
+     * @param player Null if not at PLAY stage yet
+     * @param future Can be null if packet was cancelled
+     */
+    default void onPacketDispatchFinish(@javax.annotation.Nullable net.minecraft.server.level.ServerPlayer player, @javax.annotation.Nullable io.netty.channel.ChannelFuture future) {}
+    default boolean hasFinishListener() { return false; }
+    default boolean isReady() { return true; }
+    default java.util.List<Packet> getExtraPackets() { return null; }
     default boolean packetTooLarge(net.minecraft.network.Connection manager) {
         return false;
     }
diff --git a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
index 526e07d8ea21af42c271bee4da5bccd766227006..6bf39699700075e295a693b56d237391de4e4f58 100644
--- a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
+++ b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
@@ -63,10 +63,12 @@ public class ServerConnectionListener {
     final List<Connection> connections = Collections.synchronizedList(Lists.newArrayList());
     // Paper start - prevent blocking on adding a new network manager while the server is ticking
     private final java.util.Queue<Connection> pending = new java.util.concurrent.ConcurrentLinkedQueue<>();
+    private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper
     private final void addPending() {
         Connection manager = null;
         while ((manager = pending.poll()) != null) {
             connections.add(manager);
+            manager.isPending = false;
         }
     }
     // Paper end
@@ -101,6 +103,7 @@ public class ServerConnectionListener {
                         ;
                     }
 
+                    if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper
                     channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)).addLast("legacy_query", new LegacyQueryHandler(ServerConnectionListener.this)).addLast("splitter", new Varint21FrameDecoder()).addLast("decoder", new PacketDecoder(PacketFlow.SERVERBOUND)).addLast("prepender", new Varint21LengthFieldPrepender()).addLast("encoder", new PacketEncoder(PacketFlow.CLIENTBOUND));
                     int j = ServerConnectionListener.this.server.getRateLimitPacketsPerSecond();
                     Object object = j > 0 ? new RateKickingConnection(j) : new Connection(PacketFlow.SERVERBOUND);