Diffstat (limited to 'patches/server/0019-Asynchronous-chunk-IO-and-loading.patch')
-rw-r--r--  patches/server/0019-Asynchronous-chunk-IO-and-loading.patch  3593
1 file changed, 3593 insertions, 0 deletions
diff --git a/patches/server/0019-Asynchronous-chunk-IO-and-loading.patch b/patches/server/0019-Asynchronous-chunk-IO-and-loading.patch
new file mode 100644
index 0000000000..cf7bd1b0ad
--- /dev/null
+++ b/patches/server/0019-Asynchronous-chunk-IO-and-loading.patch
@@ -0,0 +1,3593 @@
+From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
+From: Spottedleaf <[email protected]>
+Date: Sat, 13 Jul 2019 09:23:10 -0700
+Subject: [PATCH] Asynchronous chunk IO and loading
+
+ChunkSerializer needs the new tick lists to be saved (see added todos)
+
+This patch re-adds a file IO thread and also shoves de-serialization of
+chunk NBT data onto worker threads. This patch will also shove
+chunk data serialization onto the same worker threads when the chunk
+is unloaded - this cannot be done for regular saves since that's unsafe.
+
+The file IO Thread
+
+Unlike 1.13 and below, the file IO thread is prioritized - IO tasks can
+be reordered; however, they are "stuck" to a world & coordinate.
+
+Scheduling IO tasks works as follows, given a world & coordinate pair (a "location"):
+
+The IO thread has been designed to ensure that reads and writes appear to
+occur synchronously for a given location; however, the implementation also
+has the unfortunate side-effect of making every write appear as if
+it occurs without failure.
+
+The IO thread has also been designed to accommodate Mojang's decision to
+store chunk data and POI data separately. It can independently schedule
+tasks for each.
+
+However, threads can wait for writes to complete and check whether:
+ - The write was overwritten by another scheduler
+ - The write failed (though this does not indicate whether it was overwritten by another scheduler)
+
+Scheduling reads:
+
+ - If a write task is in progress, no read task is scheduled and the in-progress write data is returned
+ This means that readers cannot modify the NBTTagCompound returned and must clone it if they wish to write
+ - If a write task is not in progress but a read task is in progress, then the read task is simply chained
+ This means that, again, readers cannot modify the NBTTagCompound returned
+
+Scheduling writes:
+
+ - If a read task is in progress, ignore the read task and schedule the write
+ We cannot complete the read task since we assume it wants old data - not current
+ - If a write task is pending, overwrite the write data
+ The file IO thread does correctly handle cases where the data is overwritten when it
+ is writing data (before completing a task it will check if the data was overwritten and
+ will retry).
+
+When the file IO thread executes a task for a location, it will
+execute the read task first (if it exists), then it will execute the
+write task. This ensures that, even when scheduling at different
+priorities, reads/writes for a location act synchronously.
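+
+As a rough sketch (hypothetical names; the real logic lives in
+PaperFileIOThread#scheduleWrite below), the per-location write coalescing
+can be pictured as a single atomic map compute:
+
+    import java.util.concurrent.ConcurrentHashMap;
+
+    final class CoalescingSketch {
+        static final class IOTask {
+            Object pendingWriteData;
+        }
+
+        final ConcurrentHashMap<Long, IOTask> tasks = new ConcurrentHashMap<>();
+
+        // Schedule a write for a location: if a task is already pending, simply
+        // overwrite its data - the "later" write wins and the earlier data is
+        // never flushed to disk.
+        void scheduleWrite(final long locationKey, final Object data) {
+            this.tasks.compute(locationKey, (key, task) -> {
+                if (task == null) {
+                    task = new IOTask(); // a real impl would also queue this to the IO thread
+                }
+                task.pendingWriteData = data;
+                return task;
+            });
+        }
+    }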
+
+The downside of the file IO thread is that write failure can only be
+indicated to the scheduling thread if:
+
+- No other thread decides to schedule another write for the location
+concurrently
+- The scheduling thread blocks on the write to complete (however the
+current implementation can be modified to indicate success
+asynchronously)
+
+The file IO thread can easily be modified to provide indications
+of write failure and write overwriting if needed.
+
+The upside of the file IO thread is that if a write fails, the
+chunk data is not lost until server restart. This leaves more room
+to tolerate spurious failures.
+
+Finally, the IO thread will indicate to the console when reads
+or writes fail - with relevant detail.
+
+Asynchronous chunk data serialization for unloading chunks
+
+When chunks unload, they make a call to PlayerChunkMap#saveChunk(IChunkAccess).
+Even if I make the IO asynchronous for this call, the data serialization
+still hits pretty hard. And given that the chunk system will now
+aggressively unload chunks (queued immediately at
+ticket level 45 or higher), unloads occur more often, and
+combined with our changes to the unload queue to make it
+significantly more aggressive - chunk unloads can hit pretty hard,
+especially with players running around with elytras and fireworks.
+
+For serializing chunk data off main, there are some tasks which cannot be
+done asynchronously. Lighting data must be saved beforehand as well as
+potentially some tick lists. These are completed before scheduling the
+asynchronous save.
+
+However, serializing chunk data off of the main thread is still risky.
+Even though this patch schedules the save to occur after ALL references
+to the chunk are removed from the world, plugins can still technically
+access entities inside the chunks. For this reason, if the serialization task
+fails for any reason, it will be re-scheduled to be serialized on the
+main thread - in the hope that the reason it failed was a plugin
+and not an error in the save code itself. As in vanilla - if the
+serialization fails, the chunk data is lost.
+
+Asynchronous chunk io/loading
+
+Mojang's current implementation for loading chunk data off disk is
+to return a CompletableFuture that will be completed by scheduling a
+task to be executed on the world's chunk queue (which is only drained
+on the main thread). This task will read the data off disk and
+apply data conversions & deserialization synchronously. All
+three of these operations are expensive, however all can be completed
+asynchronously instead.
+
+The solution this patch uses is as follows:
+
+0. If an asynchronous chunk save is in progress (see above), wait
+for that task to complete and use the serialized NBTTagCompound
+it created. If the save task fails, we continue
+with step 1; if it succeeds, we skip step 1. (Note: We actually load
+POI data no matter what in this case.)
+1. Schedule an IO task to read chunk & poi data off disk.
+2. The IO task will schedule a chunk load task.
+3. The chunk load task executes on the async chunk loader threads
+and will apply datafixers & de-serialize the chunk into a ProtoChunk
+or ProtoChunkExtension.
+4. The in-progress chunk is then passed on to the world's chunk queue
+to complete the CompletableFuture and execute any of the synchronous
+tasks required to be executed by the chunk load task (i.e. lighting
+and some poi tasks). A rough sketch of this pipeline follows.
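+
+As a rough sketch (hypothetical executors standing in for the IO thread, the
+async chunk loader pool and the main-thread chunk queue; this is not the
+patch's actual code), the pipeline chains the stages like so:
+
+    import java.util.concurrent.CompletableFuture;
+    import java.util.concurrent.Executor;
+
+    final class LoadPipelineSketch {
+        CompletableFuture<Object> loadChunk(final Executor ioThread,
+                                            final Executor chunkLoaderPool,
+                                            final Executor mainThreadQueue) {
+            return CompletableFuture
+                // steps 1-2: read chunk & poi NBT off disk on the IO thread
+                .supplyAsync(() -> new Object(), ioThread)
+                // step 3: apply datafixers & deserialize on the loader pool
+                .thenApplyAsync(nbt -> nbt, chunkLoaderPool)
+                // step 4: finish lighting & poi tasks on the main thread
+                .thenApplyAsync(proto -> proto, mainThreadQueue);
+        }
+    }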
+
+diff --git a/src/main/java/co/aikar/timings/WorldTimingsHandler.java b/src/main/java/co/aikar/timings/WorldTimingsHandler.java
+index 0fda52841b5e1643efeda92106124998abc4e0aa..fe79c0add4f7cb18d487c5bb9415c40c5b551ea2 100644
+--- a/src/main/java/co/aikar/timings/WorldTimingsHandler.java
++++ b/src/main/java/co/aikar/timings/WorldTimingsHandler.java
+@@ -58,6 +58,16 @@ public class WorldTimingsHandler {
+
+ public final Timing miscMobSpawning;
+
++ public final Timing poiUnload;
++ public final Timing chunkUnload;
++ public final Timing poiSaveDataSerialization;
++ public final Timing chunkSave;
++ public final Timing chunkSaveDataSerialization;
++ public final Timing chunkSaveIOWait;
++ public final Timing chunkUnloadPrepareSave;
++ public final Timing chunkUnloadPOISerialization;
++ public final Timing chunkUnloadDataSave;
++
+ public WorldTimingsHandler(Level server) {
+ String name = ((PrimaryLevelData) server.getLevelData()).getLevelName() + " - ";
+
+@@ -111,6 +121,16 @@ public class WorldTimingsHandler {
+
+
+ miscMobSpawning = Timings.ofSafe(name + "Mob spawning - Misc");
++
++ poiUnload = Timings.ofSafe(name + "Chunk unload - POI");
++ chunkUnload = Timings.ofSafe(name + "Chunk unload - Chunk");
++ poiSaveDataSerialization = Timings.ofSafe(name + "Chunk save - POI Data serialization");
++ chunkSave = Timings.ofSafe(name + "Chunk save - Chunk");
++ chunkSaveDataSerialization = Timings.ofSafe(name + "Chunk save - Chunk Data serialization");
++ chunkSaveIOWait = Timings.ofSafe(name + "Chunk save - Chunk IO Wait");
++ chunkUnloadPrepareSave = Timings.ofSafe(name + "Chunk unload - Async Save Prepare");
++ chunkUnloadPOISerialization = Timings.ofSafe(name + "Chunk unload - POI Data Serialization");
++ chunkUnloadDataSave = Timings.ofSafe(name + "Chunk unload - Data Serialization");
+ }
+
+ public static Timing getTickList(ServerLevel worldserver, String timingsType) {
+diff --git a/src/main/java/com/destroystokyo/paper/io/IOUtil.java b/src/main/java/com/destroystokyo/paper/io/IOUtil.java
+new file mode 100644
+index 0000000000000000000000000000000000000000..5af0ac3d9e87c06053e65433060f15779c156c2a
+--- /dev/null
++++ b/src/main/java/com/destroystokyo/paper/io/IOUtil.java
+@@ -0,0 +1,62 @@
++package com.destroystokyo.paper.io;
++
++import org.bukkit.Bukkit;
++
++public final class IOUtil {
++
++ /* Copied from concrete or concurrentutil */
++
++ public static long getCoordinateKey(final int x, final int z) {
++ return ((long)z << 32) | (x & 0xFFFFFFFFL);
++ }
++
++ public static int getCoordinateX(final long key) {
++ return (int)key;
++ }
++
++ public static int getCoordinateZ(final long key) {
++ return (int)(key >>> 32);
++ }
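++
++ // Round-trip example (illustrative): getCoordinateKey(-3, 7) stores x = -3 in the
++ // low 32 bits and z = 7 in the high 32 bits; getCoordinateX/getCoordinateZ recover
++ // both exactly, including negative coordinates.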
++
++ public static int getRegionCoordinate(final int chunkCoordinate) {
++ return chunkCoordinate >> 5;
++ }
++
++ public static int getChunkInRegion(final int chunkCoordinate) {
++ return chunkCoordinate & 31;
++ }
++
++ public static String genericToString(final Object object) {
++ return object == null ? "null" : object.getClass().getName() + ":" + object.toString();
++ }
++
++ public static <T> T notNull(final T obj) {
++ if (obj == null) {
++ throw new NullPointerException();
++ }
++ return obj;
++ }
++
++ public static <T> T notNull(final T obj, final String msgIfNull) {
++ if (obj == null) {
++ throw new NullPointerException(msgIfNull);
++ }
++ return obj;
++ }
++
++ public static void arrayBounds(final int off, final int len, final int arrayLength, final String msgPrefix) {
++ if (off < 0 || len < 0 || (arrayLength - off) < len) {
++ throw new ArrayIndexOutOfBoundsException(msgPrefix + ": off: " + off + ", len: " + len + ", array length: " + arrayLength);
++ }
++ }
++
++ public static int getPriorityForCurrentThread() {
++ return Bukkit.isPrimaryThread() ? PrioritizedTaskQueue.HIGHEST_PRIORITY : PrioritizedTaskQueue.NORMAL_PRIORITY;
++ }
++
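++ /** "Sneaky throw": the unchecked generic cast is erased at runtime, letting a checked Throwable propagate without being declared. */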
++ @SuppressWarnings("unchecked")
++ public static <T extends Throwable> void rethrow(final Throwable throwable) throws T {
++ throw (T)throwable;
++ }
++
++}
+diff --git a/src/main/java/com/destroystokyo/paper/io/PaperFileIOThread.java b/src/main/java/com/destroystokyo/paper/io/PaperFileIOThread.java
+new file mode 100644
+index 0000000000000000000000000000000000000000..7c89a96d54641904e2d4562fe28c59deecfb5444
+--- /dev/null
++++ b/src/main/java/com/destroystokyo/paper/io/PaperFileIOThread.java
+@@ -0,0 +1,596 @@
++package com.destroystokyo.paper.io;
++
++import com.mojang.logging.LogUtils;
++import net.minecraft.nbt.CompoundTag;
++import net.minecraft.server.level.ServerLevel;
++import net.minecraft.world.level.ChunkPos;
++import net.minecraft.world.level.chunk.storage.RegionFile;
++import org.slf4j.Logger;
++
++import java.io.IOException;
++import java.util.concurrent.CompletableFuture;
++import java.util.concurrent.ConcurrentHashMap;
++import java.util.concurrent.atomic.AtomicLong;
++import java.util.function.Consumer;
++import java.util.function.Function;
++
++/**
++ * Prioritized singleton thread responsible for all chunk IO that occurs in a Minecraft server.
++ *
++ * <p>
++ * Singleton access: {@link Holder#INSTANCE}
++ * </p>
++ *
++ * <p>
++ * All functions provided are MT-Safe, however certain ordering constraints are assumed (but not enforced):
++ * <li>
++ * Chunk saves may not occur for unloaded chunks.
++ * </li>
++ * <li>
++ * Tasks must be scheduled on the main thread.
++ * </li>
++ * </p>
++ *
++ * @see Holder#INSTANCE
++ * @see #scheduleSave(ServerLevel, int, int, CompoundTag, CompoundTag, int)
++ * @see #loadChunkDataAsync(ServerLevel, int, int, int, Consumer, boolean, boolean, boolean)
++ */
++public final class PaperFileIOThread extends QueueExecutorThread {
++
++ public static final Logger LOGGER = LogUtils.getLogger();
++ public static final CompoundTag FAILURE_VALUE = new CompoundTag();
++
++ public static final class Holder {
++
++ public static final PaperFileIOThread INSTANCE = new PaperFileIOThread();
++
++ static {
++ INSTANCE.start();
++ }
++ }
++
++ private final AtomicLong writeCounter = new AtomicLong();
++
++ private PaperFileIOThread() {
++ super(new PrioritizedTaskQueue<>(), (int)(1.0e6)); // 1.0ms spinwait time
++ this.setName("Paper RegionFile IO Thread");
++ this.setPriority(Thread.NORM_PRIORITY - 1); // we keep priority close to normal because threads can wait on us
++ this.setUncaughtExceptionHandler((final Thread unused, final Throwable thr) -> {
++ LOGGER.error("Uncaught exception thrown from IO thread, report this!", thr);
++ });
++ }
++
++ /* run() is implemented by superclass */
++
++ /*
++ *
++ * IO thread will perform reads before writes
++ *
++ * How reads/writes are scheduled:
++ *
++ * If read in progress while scheduling write, ignore read and schedule write
++ * If read in progress while scheduling read (no write in progress), chain the read task
++ *
++ *
++ * If write in progress while scheduling read, use the pending write data and ret immediately
++ * If write in progress while scheduling write (ignore read in progress), overwrite the write in progress data
++ *
++ * This allows the reads and writes to act as if they occur synchronously to the thread scheduling them, however
++ * it fails to properly propagate write failures. When writes fail the data is kept so future reads will actually
++ * read the failed write data. This should hopefully act as a way to prevent data loss for spurious fails for writing data.
++ *
++ */
++
++ /**
++ * Attempts to bump the priority of all IO tasks for the given chunk coordinates. This has no effect if no tasks are queued.
++ * @param world Chunk's world
++ * @param chunkX Chunk's x coordinate
++ * @param chunkZ Chunk's z coordinate
++ * @param priority Priority level to try to bump to
++ */
++ public void bumpPriority(final ServerLevel world, final int chunkX, final int chunkZ, final int priority) {
++ if (!PrioritizedTaskQueue.validPriority(priority)) {
++ throw new IllegalArgumentException("Invalid priority: " + priority);
++ }
++
++ final Long key = Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ));
++
++ final ChunkDataTask poiTask = world.poiDataController.tasks.get(key);
++ final ChunkDataTask chunkTask = world.chunkDataController.tasks.get(key);
++
++ if (poiTask != null) {
++ poiTask.raisePriority(priority);
++ }
++ if (chunkTask != null) {
++ chunkTask.raisePriority(priority);
++ }
++ }
++
++ public CompoundTag getPendingWrite(final ServerLevel world, final int chunkX, final int chunkZ, final boolean poiData) {
++ final ChunkDataController taskController = poiData ? world.poiDataController : world.chunkDataController;
++
++ final ChunkDataTask dataTask = taskController.tasks.get(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)));
++
++ if (dataTask == null) {
++ return null;
++ }
++
++ final ChunkDataController.InProgressWrite write = dataTask.inProgressWrite;
++
++ if (write == null) {
++ return null;
++ }
++
++ return write.data;
++ }
++
++ /**
++ * Sets the priority of all IO tasks for the given chunk coordinates. This has no effect if no tasks are queued.
++ * @param world Chunk's world
++ * @param chunkX Chunk's x coordinate
++ * @param chunkZ Chunk's z coordinate
++ * @param priority Priority level to set to
++ */
++ public void setPriority(final ServerLevel world, final int chunkX, final int chunkZ, final int priority) {
++ if (!PrioritizedTaskQueue.validPriority(priority)) {
++ throw new IllegalArgumentException("Invalid priority: " + priority);
++ }
++
++ final Long key = Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ));
++
++ final ChunkDataTask poiTask = world.poiDataController.tasks.get(key);
++ final ChunkDataTask chunkTask = world.chunkDataController.tasks.get(key);
++
++ if (poiTask != null) {
++ poiTask.updatePriority(priority);
++ }
++ if (chunkTask != null) {
++ chunkTask.updatePriority(priority);
++ }
++ }
++
++ /**
++ * Schedules the chunk data to be written asynchronously.
++ * <p>
++ * Impl notes:
++ * </p>
++ * <li>
++ * This function presumes a chunk load for the coordinates is not called during this function (anytime after is OK). This means
++ * saves must be scheduled before a chunk is unloaded.
++ * </li>
++ * <li>
++ * Writes may be called concurrently, although only the "later" write will go through.
++ * </li>
++ * @param world Chunk's world
++ * @param chunkX Chunk's x coordinate
++ * @param chunkZ Chunk's z coordinate
++ * @param poiData Chunk point of interest data. If {@code null}, then no poi data is saved.
++ * @param chunkData Chunk data. If {@code null}, then no chunk data is saved.
++ * @param priority Priority level for this task. See {@link PrioritizedTaskQueue}
++ * @throws IllegalArgumentException If both {@code poiData} and {@code chunkData} are {@code null}.
++ * @throws IllegalStateException If the file io thread has shutdown.
++ */
++ public void scheduleSave(final ServerLevel world, final int chunkX, final int chunkZ,
++ final CompoundTag poiData, final CompoundTag chunkData,
++ final int priority) throws IllegalArgumentException {
++ if (!PrioritizedTaskQueue.validPriority(priority)) {
++ throw new IllegalArgumentException("Invalid priority: " + priority);
++ }
++
++ final long writeCounter = this.writeCounter.getAndIncrement();
++
++ if (poiData != null) {
++ this.scheduleWrite(world.poiDataController, world, chunkX, chunkZ, poiData, priority, writeCounter);
++ }
++ if (chunkData != null) {
++ this.scheduleWrite(world.chunkDataController, world, chunkX, chunkZ, chunkData, priority, writeCounter);
++ }
++ }
++
++ private void scheduleWrite(final ChunkDataController dataController, final ServerLevel world,
++ final int chunkX, final int chunkZ, final CompoundTag data, final int priority, final long writeCounter) {
++ dataController.tasks.compute(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)), (final Long keyInMap, final ChunkDataTask taskRunning) -> {
++ if (taskRunning == null) {
++ // no task is scheduled
++
++ // create task
++ final ChunkDataTask newTask = new ChunkDataTask(priority, world, chunkX, chunkZ, dataController);
++ newTask.inProgressWrite = new ChunkDataController.InProgressWrite();
++ newTask.inProgressWrite.writeCounter = writeCounter;
++ newTask.inProgressWrite.data = data;
++
++ PaperFileIOThread.this.queueTask(newTask); // schedule
++ return newTask;
++ }
++
++ taskRunning.raisePriority(priority);
++
++ if (taskRunning.inProgressWrite == null) {
++ taskRunning.inProgressWrite = new ChunkDataController.InProgressWrite();
++ }
++
++ boolean reschedule = taskRunning.inProgressWrite.writeCounter == -1L;
++
++ // synchronize for readers
++ //noinspection SynchronizationOnLocalVariableOrMethodParameter
++ synchronized (taskRunning) {
++ taskRunning.inProgressWrite.data = data;
++ taskRunning.inProgressWrite.writeCounter = writeCounter;
++ }
++
++ if (reschedule) {
++ // We need to reschedule this task since the previous one is not currently scheduled since it failed
++ taskRunning.reschedule(priority);
++ }
++
++ return taskRunning;
++ });
++ }
++
++ /**
++ * Same as {@link #loadChunkDataAsync(ServerLevel, int, int, int, Consumer, boolean, boolean, boolean)}, except this function returns
++ * a {@link CompletableFuture} which is potentially completed <b>ASYNCHRONOUSLY ON THE FILE IO THREAD</b> when the load task
++ * has completed.
++ * <p>
++ * Note that if the chunk fails to load the returned future is completed with {@code null}.
++ * </p>
++ */
++ public CompletableFuture<ChunkData> loadChunkDataAsyncFuture(final ServerLevel world, final int chunkX, final int chunkZ,
++ final int priority, final boolean readPoiData, final boolean readChunkData,
++ final boolean intendingToBlock) {
++ final CompletableFuture<ChunkData> future = new CompletableFuture<>();
++ this.loadChunkDataAsync(world, chunkX, chunkZ, priority, future::complete, readPoiData, readChunkData, intendingToBlock);
++ return future;
++ }
++
++ /**
++ * Schedules a load to be executed asynchronously.
++ * <p>
++ * Impl notes:
++ * </p>
++ * <li>
++ * If a chunk fails to load, the {@code onComplete} parameter is completed with {@code null}.
++ * </li>
++ * <li>
++ * It is possible for the {@code onComplete} parameter to be given {@link ChunkData} containing data
++ * this call did not request.
++ * </li>
++ * <li>
++ * The {@code onComplete} parameter may be completed during the execution of this function synchronously or it may
++ * be completed asynchronously on this file io thread. Interacting with the file IO thread in the completion of
++ * data is undefined behaviour, and can cause deadlock.
++ * </li>
++ * @param world Chunk's world
++ * @param chunkX Chunk's x coordinate
++ * @param chunkZ Chunk's z coordinate
++ * @param priority Priority level for this task. See {@link PrioritizedTaskQueue}
++ * @param onComplete Consumer to execute once this task has completed
++ * @param readPoiData Whether to read point of interest data. If {@code false}, the {@code NBTTagCompound} will be {@code null}.
++ * @param readChunkData Whether to read chunk data. If {@code false}, the {@code NBTTagCompound} will be {@code null}.
++ * @param intendingToBlock Whether the calling thread intends to block until this load completes. If {@code true},
++ * the region file may be loaded synchronously to check whether the chunk exists on disk.
++ */
++ public void loadChunkDataAsync(final ServerLevel world, final int chunkX, final int chunkZ,
++ final int priority, final Consumer<ChunkData> onComplete,
++ final boolean readPoiData, final boolean readChunkData,
++ final boolean intendingToBlock) {
++ if (!PrioritizedTaskQueue.validPriority(priority)) {
++ throw new IllegalArgumentException("Invalid priority: " + priority);
++ }
++
++ if (!(readPoiData | readChunkData)) {
++ throw new IllegalArgumentException("Must read chunk data or poi data");
++ }
++
++ final ChunkData complete = new ChunkData();
++ final boolean[] requireCompletion = new boolean[] { readPoiData, readChunkData };
++
++ if (readPoiData) {
++ this.scheduleRead(world.poiDataController, world, chunkX, chunkZ, (final CompoundTag poiData) -> {
++ complete.poiData = poiData;
++
++ final boolean finished;
++
++ // avoid a race condition where the file io thread completes and we complete synchronously
++ // Note: Synchronization can be elided if both of the accesses are volatile
++ synchronized (requireCompletion) {
++ requireCompletion[0] = false; // 0 -> poi data
++ finished = !requireCompletion[1]; // 1 -> chunk data
++ }
++
++ if (finished) {
++ onComplete.accept(complete);
++ }
++ }, priority, intendingToBlock);
++ }
++
++ if (readChunkData) {
++ this.scheduleRead(world.chunkDataController, world, chunkX, chunkZ, (final CompoundTag chunkData) -> {
++ complete.chunkData = chunkData;
++
++ final boolean finished;
++
++ // avoid a race condition where the file io thread completes and we complete synchronously
++ // Note: Synchronization can be elided if both of the accesses are volatile
++ synchronized (requireCompletion) {
++ requireCompletion[1] = false; // 1 -> chunk data
++ finished = !requireCompletion[0]; // 0 -> poi data
++ }
++
++ if (finished) {
++ onComplete.accept(complete);
++ }
++ }, priority, intendingToBlock);
++ }
++
++ }
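++
++ /*
++ * Illustrative usage (hypothetical call site, not part of this patch):
++ *
++ * PaperFileIOThread.Holder.INSTANCE.loadChunkDataAsync(world, chunkX, chunkZ,
++ * PrioritizedTaskQueue.HIGH_PRIORITY,
++ * (final ChunkData data) -> handleData(data), // note: may run on the IO thread
++ * true, true, false);
++ */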
++
++ // Note: the onComplete may be called asynchronously or synchronously here.
++ private void scheduleRead(final ChunkDataController dataController, final ServerLevel world,
++ final int chunkX, final int chunkZ, final Consumer<CompoundTag> onComplete, final int priority,
++ final boolean intendingToBlock) {
++
++ Function<RegionFile, Boolean> tryLoadFunction = (final RegionFile file) -> {
++ if (file == null) {
++ return Boolean.TRUE;
++ }
++ return Boolean.valueOf(file.hasChunk(new ChunkPos(chunkX, chunkZ)));
++ };
++
++ dataController.tasks.compute(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)), (final Long keyInMap, final ChunkDataTask running) -> {
++ if (running == null) {
++ // not scheduled
++
++ final Boolean shouldSchedule = intendingToBlock ? dataController.computeForRegionFile(chunkX, chunkZ, tryLoadFunction) :
++ dataController.computeForRegionFileIfLoaded(chunkX, chunkZ, tryLoadFunction);
++
++ if (shouldSchedule == Boolean.FALSE) {
++ // not on disk
++ onComplete.accept(null);
++ return null;
++ }
++
++ // set up task
++ final ChunkDataTask newTask = new ChunkDataTask(priority, world, chunkX, chunkZ, dataController);
++ newTask.inProgressRead = new ChunkDataController.InProgressRead();
++ newTask.inProgressRead.readFuture.thenAccept(onComplete);
++
++ PaperFileIOThread.this.queueTask(newTask); // schedule task
++ return newTask;
++ }
++
++ running.raisePriority(priority);
++
++ if (running.inProgressWrite == null) {
++ // chain to the read future
++ running.inProgressRead.readFuture.thenAccept(onComplete);
++ return running;
++ }
++
++ // at this stage we have to use the in progress write's data to avoid an order issue
++ // we don't synchronize since all writes to data occur in the compute() call
++ onComplete.accept(running.inProgressWrite.data);
++ return running;
++ });
++ }
++
++ /**
++ * Same as {@link #loadChunkDataAsync(ServerLevel, int, int, int, Consumer, boolean, boolean, boolean)}, except this function returns
++ * the {@link ChunkData} associated with the specified chunk when the task is complete.
++ * @return The chunk data, or {@code null} if the chunk failed to load.
++ */
++ public ChunkData loadChunkData(final ServerLevel world, final int chunkX, final int chunkZ, final int priority,
++ final boolean readPoiData, final boolean readChunkData) {
++ return this.loadChunkDataAsyncFuture(world, chunkX, chunkZ, priority, readPoiData, readChunkData, true).join();
++ }
++
++ /**
++ * Schedules the given task at the specified priority to be executed on the IO thread.
++ * <p>
++ * Internal api. Do not use.
++ * </p>
++ */
++ public void runTask(final int priority, final Runnable runnable) {
++ this.queueTask(new GeneralTask(priority, runnable));
++ }
++
++ static final class GeneralTask extends PrioritizedTaskQueue.PrioritizedTask implements Runnable {
++
++ private final Runnable run;
++
++ public GeneralTask(final int priority, final Runnable run) {
++ super(priority);
++ this.run = IOUtil.notNull(run, "Task may not be null");
++ }
++
++ @Override
++ public void run() {
++ try {
++ this.run.run();
++ } catch (final Throwable throwable) {
++ if (throwable instanceof ThreadDeath) {
++ throw (ThreadDeath)throwable;
++ }
++ LOGGER.error("Failed to execute general task on IO thread " + IOUtil.genericToString(this.run), throwable);
++ }
++ }
++ }
++
++ public static final class ChunkData {
++
++ public CompoundTag poiData;
++ public CompoundTag chunkData;
++
++ public ChunkData() {}
++
++ public ChunkData(final CompoundTag poiData, final CompoundTag chunkData) {
++ this.poiData = poiData;
++ this.chunkData = chunkData;
++ }
++ }
++
++ public static abstract class ChunkDataController {
++
++ // ConcurrentHashMap synchronizes per chain, so reduce the chance of task's hashes colliding.
++ public final ConcurrentHashMap<Long, ChunkDataTask> tasks = new ConcurrentHashMap<>(64, 0.5f);
++
++ public abstract void writeData(final int x, final int z, final CompoundTag compound) throws IOException;
++ public abstract CompoundTag readData(final int x, final int z) throws IOException;
++
++ public abstract <T> T computeForRegionFile(final int chunkX, final int chunkZ, final Function<RegionFile, T> function);
++ public abstract <T> T computeForRegionFileIfLoaded(final int chunkX, final int chunkZ, final Function<RegionFile, T> function);
++
++ public static final class InProgressWrite {
++ public long writeCounter;
++ public CompoundTag data;
++ }
++
++ public static final class InProgressRead {
++ public final CompletableFuture<CompoundTag> readFuture = new CompletableFuture<>();
++ }
++ }
++
++ public static final class ChunkDataTask extends PrioritizedTaskQueue.PrioritizedTask implements Runnable {
++
++ public ChunkDataController.InProgressWrite inProgressWrite;
++ public ChunkDataController.InProgressRead inProgressRead;
++
++ private final ServerLevel world;
++ private final int x;
++ private final int z;
++ private final ChunkDataController taskController;
++
++ public ChunkDataTask(final int priority, final ServerLevel world, final int x, final int z, final ChunkDataController taskController) {
++ super(priority);
++ this.world = world;
++ this.x = x;
++ this.z = z;
++ this.taskController = taskController;
++ }
++
++ @Override
++ public String toString() {
++ return "Task for world: '" + this.world.getWorld().getName() + "' at " + this.x + "," + this.z +
++ " poi: " + (this.taskController == this.world.poiDataController) + ", hash: " + this.hashCode();
++ }
++
++ /*
++ *
++ * IO thread will perform reads before writes
++ *
++ * How reads/writes are scheduled:
++ *
++ * If read in progress while scheduling write, ignore read and schedule write
++ * If read in progress while scheduling read (no write in progress), chain the read task
++ *
++ *
++ * If write in progress while scheduling read, use the pending write data and ret immediately
++ * If write in progress while scheduling write (ignore read in progress), overwrite the write in progress data
++ *
++ * This allows the reads and writes to act as if they occur synchronously to the thread scheduling them, however
++ * it fails to properly propagate write failures
++ *
++ */
++
++ void reschedule(final int priority) {
++ // priority is checked before this stage // TODO what
++ this.queue.lazySet(null);
++ this.priority.lazySet(priority);
++ PaperFileIOThread.Holder.INSTANCE.queueTask(this);
++ }
++
++ @Override
++ public void run() {
++ ChunkDataController.InProgressRead read = this.inProgressRead;
++ if (read != null) {
++ CompoundTag compound = PaperFileIOThread.FAILURE_VALUE;
++ try {
++ compound = this.taskController.readData(this.x, this.z);
++ } catch (final Throwable thr) {
++ if (thr instanceof ThreadDeath) {
++ throw (ThreadDeath)thr;
++ }
++ LOGGER.error("Failed to read chunk data for task: " + this.toString(), thr);
++ // fall through to complete with FAILURE_VALUE
++ }
++ read.readFuture.complete(compound);
++ }
++
++ final Long chunkKey = Long.valueOf(IOUtil.getCoordinateKey(this.x, this.z));
++
++ ChunkDataController.InProgressWrite write = this.inProgressWrite;
++
++ if (write == null) {
++ // IntelliJ warns this is invalid, however it does not consider that writes to the task map & the inProgress field can occur concurrently.
++ ChunkDataTask inMap = this.taskController.tasks.compute(chunkKey, (final Long keyInMap, final ChunkDataTask valueInMap) -> {
++ if (valueInMap == null) {
++ throw new IllegalStateException("Write completed concurrently, expected this task: " + ChunkDataTask.this.toString() + ", report this!");
++ }
++ if (valueInMap != ChunkDataTask.this) {
++ throw new IllegalStateException("Chunk task mismatch, expected this task: " + ChunkDataTask.this.toString() + ", got: " + valueInMap.toString() + ", report this!");
++ }
++ return valueInMap.inProgressWrite == null ? null : valueInMap;
++ });
++
++ if (inMap == null) {
++ return; // set the task value to null, indicating we're done
++ }
++
++ // not null, which means there was a concurrent write
++ write = this.inProgressWrite;
++ }
++
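++ // Write retry loop: snapshot (writeCounter, data) under the task's monitor, perform
++ // the write, then atomically remove this task from the map only if no newer write
++ // was scheduled meanwhile (the counter still matches). On failure the counter is
++ // set to -1 so a later scheduleWrite() knows it must reschedule this task.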
++ for (;;) {
++ final long writeCounter;
++ final CompoundTag data;
++
++ //noinspection SynchronizationOnLocalVariableOrMethodParameter
++ synchronized (write) {
++ writeCounter = write.writeCounter;
++ data = write.data;
++ }
++
++ boolean failedWrite = false;
++
++ try {
++ this.taskController.writeData(this.x, this.z, data);
++ } catch (final Throwable thr) {
++ if (thr instanceof ThreadDeath) {
++ throw (ThreadDeath)thr;
++ }
++ LOGGER.error("Failed to write chunk data for task: " + this.toString(), thr);
++ failedWrite = true;
++ }
++
++ boolean finalFailWrite = failedWrite;
++
++ ChunkDataTask inMap = this.taskController.tasks.compute(chunkKey, (final Long keyInMap, final ChunkDataTask valueInMap) -> {
++ if (valueInMap == null) {
++ throw new IllegalStateException("Write completed concurrently, expected this task: " + ChunkDataTask.this.toString() + ", report this!");
++ }
++ if (valueInMap != ChunkDataTask.this) {
++ throw new IllegalStateException("Chunk task mismatch, expected this task: " + ChunkDataTask.this.toString() + ", got: " + valueInMap.toString() + ", report this!");
++ }
++ if (valueInMap.inProgressWrite.writeCounter == writeCounter) {
++ if (finalFailWrite) {
++ valueInMap.inProgressWrite.writeCounter = -1L;
++ }
++
++ return null;
++ }
++ return valueInMap;
++ });
++
++ if (inMap == null) {
++ // write counter matched, so we wrote the most up-to-date pending data, we're done here
++ // or we failed to write and successfully set the write counter to -1
++ return; // we're done here
++ }
++
++ // fetch & write new data
++ continue;
++ }
++ }
++ }
++}
+diff --git a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
+new file mode 100644
+index 0000000000000000000000000000000000000000..24fe40c14cc50f8357a9c7a7493140fdea016a3d
+--- /dev/null
++++ b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
+@@ -0,0 +1,298 @@
++package com.destroystokyo.paper.io;
++
++import java.util.concurrent.ConcurrentLinkedQueue;
++import java.util.concurrent.atomic.AtomicBoolean;
++import java.util.concurrent.atomic.AtomicInteger;
++import java.util.concurrent.atomic.AtomicReference;
++
++public class PrioritizedTaskQueue<T extends PrioritizedTaskQueue.PrioritizedTask> {
++
++ // lower numbers are a higher priority (except < 0)
++ // higher priorities are always executed before lower priorities
++
++ /**
++ * Priority value indicating the task has completed or is being completed.
++ */
++ public static final int COMPLETING_PRIORITY = -1;
++
++ /**
++ * Highest priority, should only be used for main thread tasks or tasks that are blocking the main thread.
++ */
++ public static final int HIGHEST_PRIORITY = 0;
++
++ /**
++ * Should be only used in an IO task so that chunk loads do not wait on other IO tasks.
++ * This only exists because IO tasks are scheduled before chunk load tasks to decrease IO waiting times.
++ */
++ public static final int HIGHER_PRIORITY = 1;
++
++ /**
++ * Should be used for scheduling chunk loads/generation that would increase response times to users.
++ */
++ public static final int HIGH_PRIORITY = 2;
++
++ /**
++ * Default priority.
++ */
++ public static final int NORMAL_PRIORITY = 3;
++
++ /**
++ * Use for tasks not at all critical and can potentially be delayed.
++ */
++ public static final int LOW_PRIORITY = 4;
++
++ /**
++ * Use for tasks that should "eventually" execute.
++ */
++ public static final int LOWEST_PRIORITY = 5;
++
++ private static final int TOTAL_PRIORITIES = 6;
++
++ final ConcurrentLinkedQueue<T>[] queues = (ConcurrentLinkedQueue<T>[])new ConcurrentLinkedQueue[TOTAL_PRIORITIES];
++
++ private final AtomicBoolean shutdown = new AtomicBoolean();
++
++ {
++ for (int i = 0; i < TOTAL_PRIORITIES; ++i) {
++ this.queues[i] = new ConcurrentLinkedQueue<>();
++ }
++ }
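++
++ // Example (illustrative): if a LOW_PRIORITY task is queued before a HIGHEST_PRIORITY
++ // task, poll() still returns the HIGHEST_PRIORITY task first, because the queues are
++ // scanned from index 0 (highest) to 5 (lowest).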
++
++ /**
++ * Returns whether the specified priority is valid
++ */
++ public static boolean validPriority(final int priority) {
++ return priority >= 0 && priority < TOTAL_PRIORITIES;
++ }
++
++ /**
++ * Queues a task.
++ * @throws IllegalStateException If the task has already been queued. Use {@link PrioritizedTask#raisePriority(int)} to
++ * raise a task's priority.
++ * This can also be thrown if the queue has shutdown.
++ */
++ public void add(final T task) throws IllegalStateException {
++ int priority = task.getPriority();
++ if (priority != COMPLETING_PRIORITY) {
++ task.setQueue(this);
++ this.queues[priority].add(task);
++ }
++ if (this.shutdown.get()) {
++ // note: we're not actually sure at this point if our task will go through
++ throw new IllegalStateException("Queue has shutdown, refusing to execute task " + IOUtil.genericToString(task));
++ }
++ }
++
++ /**
++ * Polls the highest priority task currently available. {@code null} if none.
++ */
++ public T poll() {
++ T task;
++ for (int i = 0; i < TOTAL_PRIORITIES; ++i) {
++ final ConcurrentLinkedQueue<T> queue = this.queues[i];
++
++ while ((task = queue.poll()) != null) {
++ final int prevPriority = task.tryComplete(i);
++ if (prevPriority != COMPLETING_PRIORITY && prevPriority <= i) {
++ // if the prev priority was greater-than or equal to our current priority
++ return task;
++ }
++ }
++ }
++
++ return null;
++ }
++
++ /**
++ * Polls the highest priority task currently available, ignoring tasks with a priority numerically greater than {@code lowestPriority}. Returns {@code null} if none.
++ */
++ public T poll(final int lowestPriority) {
++ T task;
++ final int max = Math.min(LOWEST_PRIORITY, lowestPriority);
++ for (int i = 0; i <= max; ++i) {
++ final ConcurrentLinkedQueue<T> queue = this.queues[i];
++
++ while ((task = queue.poll()) != null) {
++ final int prevPriority = task.tryComplete(i);
++ if (prevPriority != COMPLETING_PRIORITY && prevPriority <= i) {
++ // if the prev priority was greater-than or equal to our current priority
++ return task;
++ }
++ }
++ }
++
++ return null;
++ }
++
++ /**
++ * Returns whether this queue may have tasks queued.
++ * <p>
++ * This operation is not atomic, but is MT-Safe.
++ * </p>
++ * @return {@code true} if tasks may be queued, {@code false} otherwise
++ */
++ public boolean hasTasks() {
++ for (int i = 0; i < TOTAL_PRIORITIES; ++i) {
++ final ConcurrentLinkedQueue<T> queue = this.queues[i];
++
++ if (queue.peek() != null) {
++ return true;
++ }
++ }
++ return false;
++ }
++
++ /**
++ * Prevent further additions to this queue. Attempts to add after this call has completed (potentially during) will
++ * result in {@link IllegalStateException} being thrown.
++ * <p>
++ * This operation is atomic with respect to other shutdown calls
++ * </p>
++ * <p>
++ * After this call has completed, regardless of return value, this queue will be shutdown.
++ * </p>
++ * @return {@code true} if the queue was shutdown, {@code false} if it has shut down already
++ */
++ public boolean shutdown() {
++ return !this.shutdown.getAndSet(true);
++ }
++
++ public abstract static class PrioritizedTask {
++
++ protected final AtomicReference<PrioritizedTaskQueue> queue = new AtomicReference<>();
++
++ protected final AtomicInteger priority;
++
++ protected PrioritizedTask() {
++ this(PrioritizedTaskQueue.NORMAL_PRIORITY);
++ }
++
++ protected PrioritizedTask(final int priority) {
++ if (!PrioritizedTaskQueue.validPriority(priority)) {
++ throw new IllegalArgumentException("Invalid priority " + priority);
++ }
++ this.priority = new AtomicInteger(priority);
++ }
++
++ /**
++ * Returns the current priority. Note that {@link PrioritizedTaskQueue#COMPLETING_PRIORITY} will be returned
++ * if this task is completing or has completed.
++ */
++ public final int getPriority() {
++ return this.priority.get();
++ }
++
++ /**
++ * Returns whether this task is scheduled to execute, or has been already executed.
++ */
++ public boolean isScheduled() {
++ return this.queue.get() != null;
++ }
++
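++
++ // CAS loop: transition the task to COMPLETING_PRIORITY, but only if its current
++ // priority is at least as urgent (numerically <=) as minPriority. The idiom
++ // "curr == (curr = compareAndExchange...)" detects success and, on failure,
++ // reloads the witnessed value for the retry.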
++ final int tryComplete(final int minPriority) {
++ for (int curr = this.getPriorityVolatile();;) {
++ if (curr == COMPLETING_PRIORITY) {
++ return COMPLETING_PRIORITY;
++ }
++ if (curr > minPriority) {
++ // curr is lower priority
++ return curr;
++ }
++
++ if (curr == (curr = this.compareAndExchangePriorityVolatile(curr, COMPLETING_PRIORITY))) {
++ return curr;
++ }
++ continue;
++ }
++ }
++
++ /**
++ * Forces this task into the completing state so that it will not be executed.
++ * @return {@code true} if the task was cancelled, {@code false} if the task has already completed or is being completed.
++ */
++ public boolean cancel() {
++ return this.exchangePriorityVolatile(PrioritizedTaskQueue.COMPLETING_PRIORITY) != PrioritizedTaskQueue.COMPLETING_PRIORITY;
++ }
++
++ /**
++ * Attempts to raise the priority to the priority level specified.
++ * @param priority Priority specified
++ * @return {@code true} if successful, {@code false} otherwise.
++ */
++ public boolean raisePriority(final int priority) {
++ if (!PrioritizedTaskQueue.validPriority(priority)) {
++ throw new IllegalArgumentException("Invalid priority");
++ }
++
++ for (int curr = this.getPriorityVolatile();;) {
++ if (curr == COMPLETING_PRIORITY) {
++ return false;
++ }
++ if (priority >= curr) {
++ return true;
++ }
++
++ if (curr == (curr = this.compareAndExchangePriorityVolatile(curr, priority))) {
++ PrioritizedTaskQueue queue = this.queue.get();
++ if (queue != null) {
++ //noinspection unchecked
++ queue.queues[priority].add(this); // silently fail on shutdown
++ }
++ return true;
++ }
++ continue;
++ }
++ }
++
++ /**
++ * Attempts to set this task's priority level to the level specified.
++ * @param priority Specified priority level.
++ * @return {@code true} if successful, {@code false} if this task is completing or has completed.
++ */
++ public boolean updatePriority(final int priority) {
++ if (!PrioritizedTaskQueue.validPriority(priority)) {
++ throw new IllegalArgumentException("Invalid priority");
++ }
++
++ for (int curr = this.getPriorityVolatile();;) {
++ if (curr == COMPLETING_PRIORITY) {
++ return false;
++ }
++ if (curr == priority) {
++ return true;
++ }
++
++ if (curr == (curr = this.compareAndExchangePriorityVolatile(curr, priority))) {
++ PrioritizedTaskQueue queue = this.queue.get();
++ if (queue != null) {
++ //noinspection unchecked
++ queue.queues[priority].add(this); // silently fail on shutdown
++ }
++ return true;
++ }
++ continue;
++ }
++ }
++
++ void setQueue(final PrioritizedTaskQueue queue) {
++ this.queue.set(queue);
++ }
++
++ /* priority */
++
++ protected final int getPriorityVolatile() {
++ return this.priority.get();
++ }
++
++ protected final int compareAndExchangePriorityVolatile(final int expect, final int update) {
++ if (this.priority.compareAndSet(expect, update)) {
++ return expect;
++ }
++ return this.priority.get();
++ }
++
++ protected final int exchangePriorityVolatile(final int value) {
++ return this.priority.getAndSet(value);
++ }
++ }
++}
+diff --git a/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
+new file mode 100644
+index 0000000000000000000000000000000000000000..f1b940704400266e6df186139b57ec72ae314448
+--- /dev/null
++++ b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
+@@ -0,0 +1,254 @@
++package com.destroystokyo.paper.io;
++
++import com.mojang.logging.LogUtils;
++import org.slf4j.Logger;
++
++import java.util.concurrent.ConcurrentLinkedQueue;
++import java.util.concurrent.atomic.AtomicBoolean;
++import java.util.concurrent.locks.LockSupport;
++
++public class QueueExecutorThread<T extends PrioritizedTaskQueue.PrioritizedTask & Runnable> extends Thread {
++
++ private static final Logger LOGGER = LogUtils.getLogger();
++
++ protected final PrioritizedTaskQueue<T> queue;
++ protected final long spinWaitTime;
++
++ protected volatile boolean closed;
++
++ protected final AtomicBoolean parked = new AtomicBoolean();
++
++ protected volatile ConcurrentLinkedQueue<Thread> flushQueue = new ConcurrentLinkedQueue<>();
++ protected volatile long flushCycles;
++
++ protected int lowestPriorityToPoll = PrioritizedTaskQueue.LOWEST_PRIORITY;
++
++ public int getLowestPriorityToPoll() {
++ return this.lowestPriorityToPoll;
++ }
++
++ public void setLowestPriorityToPoll(final int lowestPriorityToPoll) {
++ if (this.isAlive()) {
++ throw new IllegalStateException("Cannot set after starting");
++ }
++ this.lowestPriorityToPoll = lowestPriorityToPoll;
++ }
++
++ public QueueExecutorThread(final PrioritizedTaskQueue<T> queue) {
++ this(queue, (int)(1.e6)); // 1.0ms
++ }
++
++ public QueueExecutorThread(final PrioritizedTaskQueue<T> queue, final long spinWaitTime) { // in ns
++ this.queue = queue;
++ this.spinWaitTime = spinWaitTime;
++ }
++
++ @Override
++ public void run() {
++ final long spinWaitTime = this.spinWaitTime;
++ main_loop:
++ for (;;) {
++ this.pollTasks(true);
++
++ // spinwait
++
++ final long start = System.nanoTime();
++
++ for (;;) {
++ // If we are interrupted for any reason, park() will always return immediately. Clear so that we don't needlessly use cpu in such an event.
++ Thread.interrupted();
++ LockSupport.parkNanos("Spinwaiting on tasks", 1000L); // 1us
++
++ if (this.pollTasks(true)) {
++ // restart loop, found tasks
++ continue main_loop;
++ }
++
++ if (this.handleClose()) {
++ return; // we're done
++ }
++
++ if ((System.nanoTime() - start) >= spinWaitTime) {
++ break;
++ }
++ }
++
++ if (this.handleClose()) {
++ return;
++ }
++
++ this.parked.set(true);
++
++ // We need to poll here to avoid a race condition where a thread queues a task before we set parked to true
++ // (i.e. it will not notify us)
++ if (this.pollTasks(true)) {
++ this.parked.set(false);
++ continue;
++ }
++
++ if (this.handleClose()) {
++ return;
++ }
++
++ // we don't need to check parked before sleeping, but we do need to check parked in a do-while loop
++ // LockSupport.park() can fail for any reason
++ do {
++ Thread.interrupted();
++ LockSupport.park("Waiting on tasks");
++ } while (this.parked.get());
++ }
++ }
++
++ protected boolean handleClose() {
++ if (this.closed) {
++ this.pollTasks(true); // this ensures we've emptied the queue
++ this.handleFlushThreads(true);
++ return true;
++ }
++ return false;
++ }
++
++ protected boolean pollTasks(boolean flushTasks) {
++ Runnable task;
++ boolean ret = false;
++
++ while ((task = this.queue.poll(this.lowestPriorityToPoll)) != null) {
++ ret = true;
++ try {
++ task.run();
++ } catch (final Throwable throwable) {
++ if (throwable instanceof ThreadDeath) {
++ throw (ThreadDeath)throwable;
++ }
++ LOGGER.error("Exception thrown from prioritized runnable task in thread '" + this.getName() + "': " + IOUtil.genericToString(task), throwable);
++ }
++ }
++
++ if (flushTasks) {
++ this.handleFlushThreads(false);
++ }
++
++ return ret;
++ }
++
++ protected void handleFlushThreads(final boolean shutdown) {
++ Thread parking;
++ ConcurrentLinkedQueue<Thread> flushQueue = this.flushQueue;
++ do {
++ ++flushCycles; // may be plain read opaque write
++ while ((parking = flushQueue.poll()) != null) {
++ LockSupport.unpark(parking);
++ }
++ } while (this.pollTasks(false));
++
++ if (shutdown) {
++ this.flushQueue = null;
++
++ // defend against a race condition where a flush thread double-checks right before we set to null
++ while ((parking = flushQueue.poll()) != null) {
++ LockSupport.unpark(parking);
++ }
++ }
++ }
++
++ /**
++ * Notifies this thread that a task has been added to its queue
++ * @return {@code true} if this thread was waiting for tasks, {@code false} if it is executing tasks
++ */
++ public boolean notifyTasks() {
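++ // cheap volatile read first; getAndSet ensures only one notifier actually unparks the thread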
++ if (this.parked.get() && this.parked.getAndSet(false)) {
++ LockSupport.unpark(this);
++ return true;
++ }
++ return false;
++ }
++
++ protected void queueTask(final T task) {
++ this.queue.add(task);
++ this.notifyTasks();
++ }
++
++ /**
++ * Waits until this thread's queue is empty.
++ *
++ * @throws IllegalStateException If the current thread is {@code this} thread.
++ */
++ public void flush() {
++ final Thread currentThread = Thread.currentThread();
++
++ if (currentThread == this) {
++ // avoid deadlock
++ throw new IllegalStateException("Cannot flush the queue executor thread while on the queue executor thread");
++ }
++
++ // order is important
++
++ int successes = 0;
++ long lastCycle = -1L;
++
++ do {
++ final ConcurrentLinkedQueue<Thread> flushQueue = this.flushQueue;
++ if (flushQueue == null) {
++ return;
++ }
++
++ flushQueue.add(currentThread);
++
++ // double check flush queue
++ if (this.flushQueue == null) {
++ return;
++ }
++
++ final long currentCycle = this.flushCycles; // may be opaque read
++
++ if (currentCycle == lastCycle) {
++ Thread.yield();
++ continue;
++ }
++
++ // force response
++ this.parked.set(false);
++ LockSupport.unpark(this);
++
++ LockSupport.park("flushing queue executor thread");
++
++ // hasTasks() returns whether there are tasks queued; it does not say whether tasks are executing
++ // this is why we cycle twice through flush (we know a pollTask call is made after a flush cycle)
++ // we really only need to guarantee that the tasks this thread has queued have gone through, and can leave
++ // tasks queued concurrently that are unsynchronized with this thread as undefined behavior
++ if (this.queue.hasTasks()) {
++ successes = 0;
++ } else {
++ ++successes;
++ }
++
++ } while (successes != 2);
++
++ }
++
++ /**
++ * Closes this queue executor's queue and optionally waits for it to empty.
++ * <p>
++ * If wait is {@code true}, then the queue will be empty by the time this call completes.
++ * </p>
++ * <p>
++ * This function is MT-Safe.
++ * </p>
++ * @param wait If this call is to wait until the queue is empty
++ * @param killQueue Whether to shutdown this thread's queue
++ * @return whether this thread shut down the queue
++ */
++ public boolean close(final boolean wait, final boolean killQueue) {
++ boolean ret = !killQueue ? false : this.queue.shutdown();
++ this.closed = true;
++
++ // force thread to respond to the shutdown
++ this.parked.set(false);
++ LockSupport.unpark(this);
++
++ if (wait) {
++ this.flush();
++ }
++ return ret;
++ }
++}
+diff --git a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTask.java b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTask.java
+new file mode 100644
+index 0000000000000000000000000000000000000000..e9070b6158e7e8c2dd33a9dcb20898a2f0d86e48
+--- /dev/null
++++ b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTask.java
+@@ -0,0 +1,148 @@
++package com.destroystokyo.paper.io.chunk;
++
++import co.aikar.timings.Timing;
++import com.destroystokyo.paper.io.PaperFileIOThread;
++import com.destroystokyo.paper.io.IOUtil;
++import java.util.ArrayDeque;
++import java.util.function.Consumer;
++import com.mojang.logging.LogUtils;
++import net.minecraft.server.level.ChunkMap;
++import net.minecraft.server.level.ServerLevel;
++import net.minecraft.world.level.ChunkPos;
++import net.minecraft.world.level.chunk.storage.ChunkSerializer;
++import org.slf4j.Logger;
++
++public final class ChunkLoadTask extends ChunkTask {
++
++ private static final Logger LOGGER = LogUtils.getLogger();
++
++ public boolean cancelled;
++
++ Consumer<ChunkSerializer.InProgressChunkHolder> onComplete;
++ public PaperFileIOThread.ChunkData chunkData;
++
++ private boolean hasCompleted;
++
++ public ChunkLoadTask(final ServerLevel world, final int chunkX, final int chunkZ, final int priority,
++ final ChunkTaskManager taskManager,
++ final Consumer<ChunkSerializer.InProgressChunkHolder> onComplete) {
++ super(world, chunkX, chunkZ, priority, taskManager);
++ this.onComplete = onComplete;
++ }
++
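++ // shared sentinel for "no main-thread tasks"; assumed never to be mutated by consumers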
++ private static final ArrayDeque<Runnable> EMPTY_QUEUE = new ArrayDeque<>();
++
++ private static ChunkSerializer.InProgressChunkHolder createEmptyHolder() {
++ return new ChunkSerializer.InProgressChunkHolder(null, EMPTY_QUEUE);
++ }
++
++ @Override
++ public void run() {
++ try {
++ this.executeTask();
++ } catch (final Throwable ex) {
++ LOGGER.error("Failed to execute chunk load task: " + this.toString(), ex);
++ if (!this.hasCompleted) {
++ this.complete(ChunkLoadTask.createEmptyHolder());
++ }
++ }
++ }
++
++ private boolean checkCancelled() {
++ if (this.cancelled) {
++ // IntelliJ does not understand that writes to cancelled may occur concurrently.
++ return this.taskManager.chunkLoadTasks.compute(Long.valueOf(IOUtil.getCoordinateKey(this.chunkX, this.chunkZ)), (final Long keyInMap, final ChunkLoadTask valueInMap) -> {
++ if (valueInMap != ChunkLoadTask.this) {
++ throw new IllegalStateException("Expected this task to be scheduled, but another was! Other: " + valueInMap + ", current: " + ChunkLoadTask.this);
++ }
++
++ if (valueInMap.cancelled) {
++ return null;
++ }
++ return valueInMap;
++ }) == null;
++ }
++ return false;
++ }
++
++ public void executeTask() {
++ if (this.checkCancelled()) {
++ return;
++ }
++
++ // either executed synchronously or asynchronously
++ final PaperFileIOThread.ChunkData chunkData = this.chunkData;
++
++ if (chunkData.poiData == PaperFileIOThread.FAILURE_VALUE || chunkData.chunkData == PaperFileIOThread.FAILURE_VALUE) {
++ LOGGER.error("Could not load chunk for task: " + this.toString() + ", file IO thread has dumped the relevant exception above");
++ this.complete(ChunkLoadTask.createEmptyHolder());
++ return;
++ }
++
++ if (chunkData.chunkData == null) {
++ // not on disk
++ this.complete(ChunkLoadTask.createEmptyHolder());
++ return;
++ }
++
++ final ChunkPos chunkPos = new ChunkPos(this.chunkX, this.chunkZ);
++
++ final ChunkMap chunkManager = this.world.getChunkSource().chunkMap;
++
++ try (Timing ignored = this.world.timings.chunkLoadLevelTimer.startTimingIfSync()) {
++ final ChunkSerializer.InProgressChunkHolder chunkHolder;
++
++ // apply fixes
++
++ try {
++ chunkData.chunkData = chunkManager.upgradeChunkTag(this.world.getTypeKey(),
++ chunkManager.overworldDataStorage, chunkData.chunkData, chunkManager.generator.getTypeNameForDataFixer(), chunkPos, this.world); // clone data for safety, file IO thread does not clone
++ } catch (final Throwable ex) {
++ LOGGER.error("Could not apply datafixers for chunk task: " + this.toString(), ex);
++ this.complete(ChunkLoadTask.createEmptyHolder());
++ return;
++ }
++
++ if (!ChunkMap.isChunkDataValid(chunkData.chunkData)) {
++ LOGGER.error("Chunk file at {} is missing level data, skipping", new ChunkPos(this.chunkX, this.chunkZ));
++ this.complete(ChunkLoadTask.createEmptyHolder());
++ return;
++ }
++
++ if (this.checkCancelled()) {
++ return;
++ }
++
++ try {
++ chunkHolder = ChunkSerializer.loadChunk(this.world, chunkManager.getPoiManager(), chunkPos,
++ chunkData.chunkData, true);
++ } catch (final Throwable ex) {
++ LOGGER.error("Could not de-serialize chunk data for task: " + this.toString(), ex);
++ this.complete(ChunkLoadTask.createEmptyHolder());
++ return;
++ }
++
++ this.complete(chunkHolder);
++ }
++ }
++
++ private void complete(final ChunkSerializer.InProgressChunkHolder holder) {
++ this.hasCompleted = true;
++ holder.poiData = this.chunkData == null ? null : this.chunkData.poiData;
++
++ this.taskManager.chunkLoadTasks.compute(Long.valueOf(IOUtil.getCoordinateKey(this.chunkX, this.chunkZ)), (final Long keyInMap, final ChunkLoadTask valueInMap) -> {
++ if (valueInMap != ChunkLoadTask.this) {
++ throw new IllegalStateException("Expected this task to be scheduled, but another was! Other: " + valueInMap + ", current: " + ChunkLoadTask.this);
++ }
++ if (valueInMap.cancelled) {
++ return null;
++ }
++ try {
++ ChunkLoadTask.this.onComplete.accept(holder);
++ } catch (final Throwable thr) {
++ LOGGER.error("Failed to complete chunk data for task: " + this.toString(), thr);
++ }
++ return null;
++ });
++ }
++}
+diff --git a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkSaveTask.java b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkSaveTask.java
+new file mode 100644
+index 0000000000000000000000000000000000000000..0d245ad7d19b11e946e0b5b43bf2181292297210
+--- /dev/null
++++ b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkSaveTask.java
+@@ -0,0 +1,111 @@
++package com.destroystokyo.paper.io.chunk;
++
++import co.aikar.timings.Timing;
++import com.destroystokyo.paper.io.PaperFileIOThread;
++import com.destroystokyo.paper.io.IOUtil;
++import com.destroystokyo.paper.io.PrioritizedTaskQueue;
++
++import java.util.concurrent.CompletableFuture;
++import java.util.concurrent.atomic.AtomicInteger;
++import net.minecraft.nbt.CompoundTag;
++import net.minecraft.server.level.ServerLevel;
++import net.minecraft.world.level.chunk.ChunkAccess;
++import net.minecraft.world.level.chunk.storage.ChunkSerializer;
++
++public final class ChunkSaveTask extends ChunkTask {
++
++ public final ChunkSerializer.AsyncSaveData asyncSaveData;
++ public final ChunkAccess chunk;
++ public final CompletableFuture<CompoundTag> onComplete = new CompletableFuture<>();
++
++ private final AtomicInteger attemptedPriority;
++
++ public ChunkSaveTask(final ServerLevel world, final int chunkX, final int chunkZ, final int priority,
++ final ChunkTaskManager taskManager, final ChunkSerializer.AsyncSaveData asyncSaveData,
++ final ChunkAccess chunk) {
++ super(world, chunkX, chunkZ, priority, taskManager);
++ this.chunk = chunk;
++ this.asyncSaveData = asyncSaveData;
++ this.attemptedPriority = new AtomicInteger(priority);
++ }
++
++ @Override
++ public void run() {
++ // can be executed asynchronously or synchronously
++ final CompoundTag compound;
++
++ try (Timing ignored = this.world.timings.chunkUnloadDataSave.startTimingIfSync()) {
++ compound = ChunkSerializer.saveChunk(this.world, this.chunk, this.asyncSaveData);
++ } catch (final Throwable ex) {
++ // did a plugin modify something it should not have, causing a CME?
++ PaperFileIOThread.LOGGER.error("Failed to serialize unloading chunk data for task: " + this.toString() + ", falling back to a synchronous execution", ex);
++
++ // Note: We add to the server thread queue here since this is what the server will drain tasks from
++ // when waiting for chunks
++ ChunkTaskManager.queueChunkWaitTask(() -> {
++ try (Timing ignored = this.world.timings.chunkUnloadDataSave.startTiming()) {
++ CompoundTag data = PaperFileIOThread.FAILURE_VALUE;
++
++ try {
++ data = ChunkSerializer.saveChunk(this.world, this.chunk, this.asyncSaveData);
++ PaperFileIOThread.LOGGER.info("Successfully serialized chunk data for task: " + this.toString() + " synchronously");
++ } catch (final Throwable ex1) {
++ PaperFileIOThread.LOGGER.error("Failed to synchronously serialize unloading chunk data for task: " + this.toString() + "! Chunk data will be lost", ex1);
++ }
++
++ ChunkSaveTask.this.complete(data);
++ }
++ });
++
++ return; // the main thread will now complete the data
++ }
++
++ this.complete(compound);
++ }
++
++ @Override
++ public boolean raisePriority(final int priority) {
++ if (!PrioritizedTaskQueue.validPriority(priority)) {
++ throw new IllegalStateException("Invalid priority: " + priority);
++ }
++
++ // we know priority is valid here
++ for (int curr = this.attemptedPriority.get();;) {
++ if (curr <= priority) {
++ break; // curr is higher/same priority
++ }
++ if (this.attemptedPriority.compareAndSet(curr, priority)) {
++ break;
++ }
++ curr = this.attemptedPriority.get();
++ }
++
++ return super.raisePriority(priority);
++ }
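++
++ // Note: attemptedPriority is consumed by complete() below, which schedules the resulting disk
++ // write at that priority - so a priority bump taken here carries over to the IO thread.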
++
++ @Override
++ public boolean updatePriority(final int priority) {
++ if (!PrioritizedTaskQueue.validPriority(priority)) {
++ throw new IllegalStateException("Invalid priority: " + priority);
++ }
++ this.attemptedPriority.set(priority);
++ return super.updatePriority(priority);
++ }
++
++ private void complete(final CompoundTag compound) {
++ try {
++ this.onComplete.complete(compound);
++ } catch (final Throwable thr) {
++ PaperFileIOThread.LOGGER.error("Failed to complete chunk data for task: " + this.toString(), thr);
++ }
++ if (compound != PaperFileIOThread.FAILURE_VALUE) {
++ PaperFileIOThread.Holder.INSTANCE.scheduleSave(this.world, this.chunkX, this.chunkZ, null, compound, this.attemptedPriority.get());
++ }
++ this.taskManager.chunkSaveTasks.compute(Long.valueOf(IOUtil.getCoordinateKey(this.chunkX, this.chunkZ)), (final Long keyInMap, final ChunkSaveTask valueInMap) -> {
++ if (valueInMap != ChunkSaveTask.this) {
++ throw new IllegalStateException("Expected this task to be scheduled, but another was! Other: " + valueInMap + ", this: " + ChunkSaveTask.this);
++ }
++ return null;
++ });
++ }
++}
+diff --git a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTask.java b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTask.java
+new file mode 100644
+index 0000000000000000000000000000000000000000..058fb5a41565e6ce2acbd1f4d071a1b8be449f5d
+--- /dev/null
++++ b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTask.java
+@@ -0,0 +1,40 @@
++package com.destroystokyo.paper.io.chunk;
++
++import com.destroystokyo.paper.io.PaperFileIOThread;
++import com.destroystokyo.paper.io.PrioritizedTaskQueue;
++import net.minecraft.server.level.ServerLevel;
++
++abstract class ChunkTask extends PrioritizedTaskQueue.PrioritizedTask implements Runnable {
++
++ public final ServerLevel world;
++ public final int chunkX;
++ public final int chunkZ;
++ public final ChunkTaskManager taskManager;
++
++ public ChunkTask(final ServerLevel world, final int chunkX, final int chunkZ, final int priority,
++ final ChunkTaskManager taskManager) {
++ super(priority);
++ this.world = world;
++ this.chunkX = chunkX;
++ this.chunkZ = chunkZ;
++ this.taskManager = taskManager;
++ }
++
++ @Override
++ public String toString() {
++ return "Chunk task: class:" + this.getClass().getName() + ", for world '" + this.world.getWorld().getName() +
++ "', (" + this.chunkX + "," + this.chunkZ + "), hashcode:" + this.hashCode() + ", priority: " + this.getPriority();
++ }
++
++ @Override
++ public boolean raisePriority(final int priority) {
++ PaperFileIOThread.Holder.INSTANCE.bumpPriority(this.world, this.chunkX, this.chunkZ, priority);
++ return super.raisePriority(priority);
++ }
++
++ @Override
++ public boolean updatePriority(final int priority) {
++ PaperFileIOThread.Holder.INSTANCE.setPriority(this.world, this.chunkX, this.chunkZ, priority);
++ return super.updatePriority(priority);
++ }
++}
+diff --git a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java
+new file mode 100644
+index 0000000000000000000000000000000000000000..af40e473521f408aa0e112953c43bdbce164a48b
+--- /dev/null
++++ b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java
+@@ -0,0 +1,535 @@
++package com.destroystokyo.paper.io.chunk;
++
++import com.destroystokyo.paper.io.PaperFileIOThread;
++import com.destroystokyo.paper.io.IOUtil;
++import com.destroystokyo.paper.io.PrioritizedTaskQueue;
++import com.destroystokyo.paper.io.QueueExecutorThread;
++import io.papermc.paper.configuration.GlobalConfiguration;
++import net.minecraft.nbt.CompoundTag;
++import net.minecraft.server.MinecraftServer;
++import net.minecraft.server.level.ChunkHolder;
++import net.minecraft.server.level.ServerChunkCache;
++import net.minecraft.server.level.ServerLevel;
++import net.minecraft.util.thread.BlockableEventLoop;
++import net.minecraft.world.level.chunk.ChunkAccess;
++import net.minecraft.world.level.chunk.ChunkStatus;
++import net.minecraft.world.level.chunk.storage.ChunkSerializer;
++import org.apache.commons.lang.StringUtils;
++import org.apache.logging.log4j.Level;
++import org.bukkit.Bukkit;
++import org.spigotmc.AsyncCatcher;
++
++import java.util.ArrayDeque;
++import java.util.HashSet;
++import java.util.Set;
++import java.util.concurrent.CompletableFuture;
++import java.util.concurrent.ConcurrentHashMap;
++import java.util.concurrent.ConcurrentLinkedQueue;
++import java.util.function.Consumer;
++
++public final class ChunkTaskManager {
++
++ private final QueueExecutorThread<ChunkTask>[] workers;
++ private final ServerLevel world;
++
++ private final PrioritizedTaskQueue<ChunkTask> queue;
++ private final boolean perWorldQueue;
++
++ final ConcurrentHashMap<Long, ChunkLoadTask> chunkLoadTasks = new ConcurrentHashMap<>(64, 0.5f);
++ final ConcurrentHashMap<Long, ChunkSaveTask> chunkSaveTasks = new ConcurrentHashMap<>(64, 0.5f);
++
++ private final PrioritizedTaskQueue<ChunkTask> chunkTasks = new PrioritizedTaskQueue<>(); // used if async chunks are disabled in config
++
++ protected static QueueExecutorThread<ChunkTask>[] globalWorkers;
++ protected static PrioritizedTaskQueue<ChunkTask> globalQueue;
++
++ protected static final ConcurrentLinkedQueue<Runnable> CHUNK_WAIT_QUEUE = new ConcurrentLinkedQueue<>();
++
++ public static final ArrayDeque<ChunkInfo> WAITING_CHUNKS = new ArrayDeque<>(); // stack
++
++ private static final class ChunkInfo {
++
++ public final int chunkX;
++ public final int chunkZ;
++ public final ServerLevel world;
++
++ public ChunkInfo(final int chunkX, final int chunkZ, final ServerLevel world) {
++ this.chunkX = chunkX;
++ this.chunkZ = chunkZ;
++ this.world = world;
++ }
++
++ @Override
++ public String toString() {
++ return "[( " + this.chunkX + "," + this.chunkZ + ") in '" + this.world.getWorld().getName() + "']";
++ }
++ }
++
++ public static void pushChunkWait(final ServerLevel world, final int chunkX, final int chunkZ) {
++ synchronized (WAITING_CHUNKS) {
++ WAITING_CHUNKS.push(new ChunkInfo(chunkX, chunkZ, world));
++ }
++ }
++
++ public static void popChunkWait() {
++ synchronized (WAITING_CHUNKS) {
++ WAITING_CHUNKS.pop();
++ }
++ }
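++
++ // Usage sketch (illustrative only; mainThreadProcessor and future are placeholder names): pair
++ // push/pop in try/finally so an exception thrown while waiting cannot leave a stale entry on the stack:
++ //
++ // ChunkTaskManager.pushChunkWait(world, chunkX, chunkZ);
++ // try {
++ //     mainThreadProcessor.managedBlock(future::isDone);
++ // } finally {
++ //     ChunkTaskManager.popChunkWait();
++ // }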
++
++ private static ChunkInfo[] getChunkInfos() {
++ ChunkInfo[] chunks;
++ synchronized (WAITING_CHUNKS) {
++ chunks = WAITING_CHUNKS.toArray(new ChunkInfo[0]);
++ }
++ return chunks;
++ }
++
++ public static void dumpAllChunkLoadInfo() {
++ ChunkInfo[] chunks = getChunkInfos();
++ if (chunks.length > 0) {
++ PaperFileIOThread.LOGGER.error("Chunk wait task info below: ");
++
++ for (final ChunkInfo chunkInfo : chunks) {
++ final long key = IOUtil.getCoordinateKey(chunkInfo.chunkX, chunkInfo.chunkZ);
++ final ChunkLoadTask loadTask = chunkInfo.world.asyncChunkTaskManager.chunkLoadTasks.get(key);
++ final ChunkSaveTask saveTask = chunkInfo.world.asyncChunkTaskManager.chunkSaveTasks.get(key);
++
++ PaperFileIOThread.LOGGER.error(chunkInfo.chunkX + "," + chunkInfo.chunkZ + " in '" + chunkInfo.world.getWorld().getName() + "':");
++ PaperFileIOThread.LOGGER.error("Load Task - " + (loadTask == null ? "none" : loadTask.toString()));
++ PaperFileIOThread.LOGGER.error("Save Task - " + (saveTask == null ? "none" : saveTask.toString()));
++ // log current status of chunk to indicate whether we're waiting on generation or loading
++ ChunkHolder chunkHolder = chunkInfo.world.getChunkSource().chunkMap.getVisibleChunkIfPresent(key);
++
++ dumpChunkInfo(new HashSet<>(), chunkHolder, chunkInfo.chunkX, chunkInfo.chunkZ);
++ }
++ }
++ }
++
++ static void dumpChunkInfo(Set<ChunkHolder> seenChunks, ChunkHolder chunkHolder, int x, int z) {
++ dumpChunkInfo(seenChunks, chunkHolder, x, z, 0, 1);
++ }
++
++ static void dumpChunkInfo(Set<ChunkHolder> seenChunks, ChunkHolder chunkHolder, int x, int z, int indent, int maxDepth) {
++ if (seenChunks.contains(chunkHolder)) {
++ return;
++ }
++ if (indent > maxDepth) {
++ return;
++ }
++ seenChunks.add(chunkHolder);
++ String indentStr = StringUtils.repeat(" ", indent);
++ if (chunkHolder == null) {
++ PaperFileIOThread.LOGGER.error(indentStr + "Chunk Holder - null for (" + x +"," + z +")");
++ } else {
++ ChunkAccess chunk = chunkHolder.getLastAvailable();
++ ChunkStatus holderStatus = chunkHolder.getChunkHolderStatus();
++ PaperFileIOThread.LOGGER.error(indentStr + "Chunk Holder - non-null");
++ PaperFileIOThread.LOGGER.error(indentStr + "Chunk Status - " + ((chunk == null) ? "null chunk" : chunk.getStatus().toString()));
++ PaperFileIOThread.LOGGER.error(indentStr + "Chunk Ticket Status - " + ChunkHolder.getStatus(chunkHolder.getTicketLevel()));
++ PaperFileIOThread.LOGGER.error(indentStr + "Chunk Holder Status - " + ((holderStatus == null) ? "null" : holderStatus.toString()));
++ }
++ }
++
++ public static void processConfiguration(GlobalConfiguration.AsyncChunks config) {
++ int threads = config.threads; // don't write back to config
++ int cpus = Runtime.getRuntime().availableProcessors() / 2;
++ if (threads <= 0) {
++ if (cpus <= 4) {
++ threads = cpus <= 2 ? 1 : 2;
++ } else {
++ threads = (int) Math.min(Integer.getInteger("paper.maxChunkThreads", 4), cpus / 2);
++ }
++ }
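++ // Worked example (assuming the default paper.maxChunkThreads of 4): 8 reported processors give
++ // cpus == 4 and therefore 2 worker threads; 16 reported processors give cpus == 8 and
++ // min(4, 8 / 2) == 4 worker threads.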
++ if (cpus == 1 && !Boolean.getBoolean("Paper.allowAsyncChunksSingleCore")) {
++ config.asyncChunks = false;
++ } else {
++ config.asyncChunks = true;
++ }
++
++ // Let Shared Host set some limits
++ String sharedHostThreads = System.getenv("PAPER_ASYNC_CHUNKS_SHARED_HOST_THREADS");
++ if (sharedHostThreads != null) {
++ try {
++ threads = Math.max(1, Math.min(threads, Integer.parseInt(sharedHostThreads)));
++ } catch (NumberFormatException ignored) {}
++ }
++
++ if (config.asyncChunks) {
++ ChunkTaskManager.initGlobalLoadThreads(threads);
++ }
++ }
++
++ public static void initGlobalLoadThreads(int threads) {
++ if (threads <= 0 || globalWorkers != null) {
++ return;
++ }
++ ++threads; // add one for urgent executor
++
++ globalWorkers = new QueueExecutorThread[threads];
++ globalQueue = new PrioritizedTaskQueue<>();
++
++ for (int i = 0; i < (threads - 1); ++i) {
++ globalWorkers[i] = new QueueExecutorThread<>(globalQueue, (long)0.10e6); //0.1ms
++ globalWorkers[i].setName("Paper Async Chunk Task Thread #" + i);
++ globalWorkers[i].setPriority(Thread.NORM_PRIORITY - 1);
++ globalWorkers[i].setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> {
++ PaperFileIOThread.LOGGER.error("Thread '" + thread.getName() + "' threw an uncaught exception!", throwable);
++ });
++
++ globalWorkers[i].start();
++ }
++
++ globalWorkers[threads - 1] = new QueueExecutorThread<>(globalQueue, (long)0.10e6); //0.1ms
++ globalWorkers[threads - 1].setName("Paper Async Chunk Urgent Task Thread");
++ globalWorkers[threads - 1].setPriority(Thread.NORM_PRIORITY + 1);
++ globalWorkers[threads - 1].setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> {
++ PaperFileIOThread.LOGGER.error("Thread '" + thread.getName() + "' threw an uncaught exception!", throwable);
++ });
++ globalWorkers[threads - 1].setLowestPriorityToPoll(PrioritizedTaskQueue.HIGHEST_PRIORITY);
++ globalWorkers[threads - 1].start();
++ }
++
++ /**
++ * Creates a chunk task manager that operates with the specified number of dedicated worker threads. If the
++ * specified number of threads is less than or equal to 0, then this chunk task manager will run its tasks on
++ * the world's chunk task queue instead.
++ * @param world The world that this task manager is responsible for.
++ * @param threads The number of worker threads to use.
++ * @see ServerChunkCache#mainThreadProcessor
++ */
++ public ChunkTaskManager(final ServerLevel world, final int threads) {
++ this.world = world;
++ this.workers = threads <= 0 ? null : new QueueExecutorThread[threads];
++ this.queue = new PrioritizedTaskQueue<>();
++ this.perWorldQueue = true;
++
++ for (int i = 0; i < threads; ++i) {
++ this.workers[i] = new QueueExecutorThread<>(this.queue, (long)0.10e6); //0.1ms
++ this.workers[i].setName("Async chunk loader thread #" + i + " for world: " + world.getWorld().getName());
++ this.workers[i].setPriority(Thread.NORM_PRIORITY - 1);
++ this.workers[i].setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> {
++ PaperFileIOThread.LOGGER.error("Thread '" + thread.getName() + "' threw an uncaught exception!", throwable);
++ });
++
++ this.workers[i].start();
++ }
++ }
++
++ /**
++ * Creates a chunk task manager that runs its tasks on the global workers. When {@link #close(boolean)} is
++ * invoked, the global queue is not shut down. If the global workers are disabled or configured to use 0
++ * threads, then this chunk task manager will run its tasks on the world's chunk task queue instead.
++ * @param world The world that this task manager is responsible for
++ * @see ServerChunkCache#mainThreadProcessor
++ */
++ public ChunkTaskManager(final ServerLevel world) {
++ this.world = world;
++ this.workers = globalWorkers;
++ this.queue = globalQueue;
++ this.perWorldQueue = false;
++ }
++
++ public boolean pollNextChunkTask() {
++ final ChunkTask task = this.chunkTasks.poll();
++
++ if (task != null) {
++ task.run();
++ return true;
++ }
++ return false;
++ }
++
++ /**
++ * Polls and runs the next available chunk wait queue task. This is to be used when the server is waiting on a
++ * chunk to load, since per-world queues can stall if all of the worker threads are blocked waiting for a
++ * response from the main thread.
++ */
++ public static boolean pollChunkWaitQueue() {
++ final Runnable run = CHUNK_WAIT_QUEUE.poll();
++ if (run != null) {
++ run.run();
++ return true;
++ }
++ return false;
++ }
++
++ /**
++ * Queues a chunk wait task. Note that this will execute out of order with respect to tasks scheduled on a world's
++ * chunk task queue, since this is the global chunk wait queue.
++ */
++ public static void queueChunkWaitTask(final Runnable runnable) {
++ CHUNK_WAIT_QUEUE.add(runnable);
++ }
++
++ private static void drainChunkWaitQueue() {
++ Runnable run;
++ while ((run = CHUNK_WAIT_QUEUE.poll()) != null) {
++ run.run();
++ }
++ }
++
++ /**
++ * The exact same as {@link #scheduleChunkLoad(int, int, int, Consumer, boolean)}, except that the chunk data is
++ * provided by the {@code dataFuture} parameter (disk is only read if the future completes with the failure value).
++ */
++ public ChunkLoadTask scheduleChunkLoad(final int chunkX, final int chunkZ, final int priority,
++ final Consumer<ChunkSerializer.InProgressChunkHolder> onComplete,
++ final boolean intendingToBlock, final CompletableFuture<CompoundTag> dataFuture) {
++ final ServerLevel world = this.world;
++
++ return this.chunkLoadTasks.compute(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)), (final Long keyInMap, final ChunkLoadTask valueInMap) -> {
++ if (valueInMap != null) {
++ if (!valueInMap.cancelled) {
++ throw new IllegalStateException("Double scheduling chunk load for task: " + valueInMap.toString());
++ }
++ valueInMap.cancelled = false;
++ valueInMap.onComplete = onComplete;
++ return valueInMap;
++ }
++
++ final ChunkLoadTask ret = new ChunkLoadTask(world, chunkX, chunkZ, priority, ChunkTaskManager.this, onComplete);
++
++ dataFuture.thenAccept((final CompoundTag data) -> {
++ final boolean failed = data == PaperFileIOThread.FAILURE_VALUE;
++ PaperFileIOThread.Holder.INSTANCE.loadChunkDataAsync(world, chunkX, chunkZ, priority, (final PaperFileIOThread.ChunkData chunkData) -> {
++ ret.chunkData = chunkData;
++ if (!failed) {
++ chunkData.chunkData = data;
++ }
++ ChunkTaskManager.this.internalSchedule(ret); // only schedule to the worker threads here
++ }, true, failed, intendingToBlock); // read data off disk if the future fails
++ });
++
++ return ret;
++ });
++ }
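++
++ // Illustrative flow (mirrors the caller in ChunkMap#scheduleChunkLoad further below; taskManager
++ // is a placeholder name): when an async save is still in flight, its un-copied data future is
++ // chained into the load so the loader does not read stale data from disk:
++ //
++ // CompletableFuture<CompoundTag> saveFuture = taskManager.getChunkSaveFuture(x, z);
++ // if (saveFuture != null) {
++ //     taskManager.scheduleChunkLoad(x, z, priority, onComplete, intendingToBlock, saveFuture);
++ // } else {
++ //     taskManager.scheduleChunkLoad(x, z, priority, onComplete, intendingToBlock);
++ // }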
++
++ public void cancelChunkLoad(final int chunkX, final int chunkZ) {
++ this.chunkLoadTasks.compute(IOUtil.getCoordinateKey(chunkX, chunkZ), (final Long keyInMap, final ChunkLoadTask valueInMap) -> {
++ if (valueInMap == null) {
++ return null;
++ }
++
++ if (valueInMap.cancelled) {
++ PaperFileIOThread.LOGGER.warn("Task " + valueInMap.toString() + " is already cancelled!");
++ }
++ valueInMap.cancelled = true;
++ if (valueInMap.cancel()) {
++ return null;
++ }
++
++ return valueInMap;
++ });
++ }
++
++ /**
++ * Schedules an asynchronous chunk load for the specified coordinates. The {@code onComplete} parameter may be
++ * invoked asynchronously on a worker thread or on the world's chunk executor queue, so the code it executes
++ * should be chosen carefully.
++ * @param chunkX Chunk's x coordinate
++ * @param chunkZ Chunk's z coordinate
++ * @param priority Priority for this task
++ * @param onComplete The consumer to invoke with the {@link ChunkSerializer.InProgressChunkHolder} object once this task is complete
++ * @param intendingToBlock Whether the caller intends to block on this task completing (this is a performance hint and has no adverse side-effects)
++ * @return The {@link ChunkLoadTask} associated with this chunk load
++ */
++ public ChunkLoadTask scheduleChunkLoad(final int chunkX, final int chunkZ, final int priority,
++ final Consumer<ChunkSerializer.InProgressChunkHolder> onComplete,
++ final boolean intendingToBlock) {
++ final ServerLevel world = this.world;
++
++ return this.chunkLoadTasks.compute(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)), (final Long keyInMap, final ChunkLoadTask valueInMap) -> {
++ if (valueInMap != null) {
++ if (!valueInMap.cancelled) {
++ throw new IllegalStateException("Double scheduling chunk load for task: " + valueInMap.toString());
++ }
++ valueInMap.cancelled = false;
++ valueInMap.onComplete = onComplete;
++ return valueInMap;
++ }
++
++ final ChunkLoadTask ret = new ChunkLoadTask(world, chunkX, chunkZ, priority, ChunkTaskManager.this, onComplete);
++
++ PaperFileIOThread.Holder.INSTANCE.loadChunkDataAsync(world, chunkX, chunkZ, priority, (final PaperFileIOThread.ChunkData chunkData) -> {
++ ret.chunkData = chunkData;
++ ChunkTaskManager.this.internalSchedule(ret); // only schedule to the worker threads here
++ }, true, true, intendingToBlock);
++
++ return ret;
++ });
++ }
++
++ /**
++ * Schedules an async save for the specified chunk. The chunk, at the beginning of this call, must be completely unloaded
++ * from the world.
++ * @param chunkX Chunk's x coordinate
++ * @param chunkZ Chunk's z coordinate
++ * @param priority Priority for this task
++ * @param asyncSaveData Async save data. See {@link ChunkSerializer#getAsyncSaveData(ServerLevel, ChunkAccess)}
++ * @param chunk Chunk to save
++ * @return The {@link ChunkSaveTask} associated with the save task.
++ */
++ public ChunkSaveTask scheduleChunkSave(final int chunkX, final int chunkZ, final int priority,
++ final ChunkSerializer.AsyncSaveData asyncSaveData,
++ final ChunkAccess chunk) {
++ AsyncCatcher.catchOp("chunk save schedule");
++
++ final ServerLevel world = this.world;
++
++ return this.chunkSaveTasks.compute(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)), (final Long keyInMap, final ChunkSaveTask valueInMap) -> {
++ if (valueInMap != null) {
++ throw new IllegalStateException("Double scheduling chunk save for task: " + valueInMap.toString());
++ }
++
++ final ChunkSaveTask ret = new ChunkSaveTask(world, chunkX, chunkZ, priority, ChunkTaskManager.this, asyncSaveData, chunk);
++
++ ChunkTaskManager.this.internalSchedule(ret);
++
++ return ret;
++ });
++ }
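++
++ // Usage sketch (illustrative; mirrors ChunkMap#asyncSave further below, with taskManager as a
++ // placeholder name): the chunk must already be fully unloaded from the world, and the prepared
++ // snapshot comes from ChunkSerializer#getAsyncSaveData:
++ //
++ // ChunkSerializer.AsyncSaveData data = ChunkSerializer.getAsyncSaveData(level, chunk);
++ // taskManager.scheduleChunkSave(pos.x, pos.z, PrioritizedTaskQueue.NORMAL_PRIORITY, data, chunk);
++ // chunk.setUnsaved(false);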
++
++ /**
++ * Returns a completable future which will be completed with the <b>un-copied</b> chunk data for an in progress async save.
++ * Returns {@code null} if no save is in progress.
++ * @param chunkX Chunk's x coordinate
++ * @param chunkZ Chunk's z coordinate
++ */
++ public CompletableFuture<CompoundTag> getChunkSaveFuture(final int chunkX, final int chunkZ) {
++ final ChunkSaveTask chunkSaveTask = this.chunkSaveTasks.get(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)));
++ if (chunkSaveTask == null) {
++ return null;
++ }
++ return chunkSaveTask.onComplete;
++ }
++
++ /**
++ * Returns the chunk object being used to serialize data async for an unloaded chunk. Note that modifying this chunk
++ * is not safe to do as another thread is handling its save. The chunk is also not loaded into the world.
++ * @param chunkX Chunk's x coordinate
++ * @param chunkZ Chunk's z coordinate
++ * @return Chunk object for an in-progress async save, or {@code null} if no save is in progress
++ */
++ public ChunkAccess getChunkInSaveProgress(final int chunkX, final int chunkZ) {
++ final ChunkSaveTask chunkSaveTask = this.chunkSaveTasks.get(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)));
++ if (chunkSaveTask == null) {
++ return null;
++ }
++ return chunkSaveTask.chunk;
++ }
++
++ public void flush() {
++ // flush here since we schedule tasks on the IO thread that can schedule tasks here
++ drainChunkWaitQueue();
++ PaperFileIOThread.Holder.INSTANCE.flush();
++ drainChunkWaitQueue();
++
++ if (this.workers == null) {
++ if (Bukkit.isPrimaryThread() || MinecraftServer.getServer().hasStopped()) {
++ ((BlockableEventLoop<Runnable>)this.world.getChunkSource().mainThreadProcessor).runAllTasks();
++ } else {
++ CompletableFuture<Void> wait = new CompletableFuture<>();
++ MinecraftServer.getServer().scheduleOnMain(() -> {
++ ((BlockableEventLoop<Runnable>)this.world.getChunkSource().mainThreadProcessor).runAllTasks();
++ wait.complete(null); // complete the future, otherwise the join below blocks forever
++ });
++ wait.join();
++ }
++ } else {
++ for (final QueueExecutorThread<ChunkTask> worker : this.workers) {
++ worker.flush();
++ }
++ }
++
++ // flush again since the tasks we executed may have scheduled async saves
++ drainChunkWaitQueue();
++ PaperFileIOThread.Holder.INSTANCE.flush();
++ }
++
++ public void close(final boolean wait) {
++ // flush here since we schedule tasks on the IO thread that can schedule tasks to this task manager
++ // we do this regardless of the wait param since after we invoke close no tasks can be queued
++ PaperFileIOThread.Holder.INSTANCE.flush();
++
++ if (this.workers == null) {
++ if (wait) {
++ this.flush();
++ }
++ return;
++ }
++
++ if (this.workers != globalWorkers) {
++ for (final QueueExecutorThread<ChunkTask> worker : this.workers) {
++ worker.close(false, this.perWorldQueue);
++ }
++ }
++
++ if (wait) {
++ this.flush();
++ }
++ }
++
++ public void raisePriority(final int chunkX, final int chunkZ, final int priority) {
++ final Long chunkKey = Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ));
++
++ ChunkTask chunkSaveTask = this.chunkSaveTasks.get(chunkKey);
++ if (chunkSaveTask != null) {
++ // don't bump save into urgent queue
++ raiseTaskPriority(chunkSaveTask, priority != PrioritizedTaskQueue.HIGHEST_PRIORITY ? priority : PrioritizedTaskQueue.HIGH_PRIORITY);
++ }
++
++ ChunkLoadTask chunkLoadTask = this.chunkLoadTasks.get(chunkKey);
++ if (chunkLoadTask != null) {
++ raiseTaskPriority(chunkLoadTask, priority);
++ }
++ }
++
++ private void raiseTaskPriority(ChunkTask task, int priority) {
++ final boolean raised = task.raisePriority(priority);
++ if (task.isScheduled() && raised && this.workers != null) {
++ // only notify if we're in queue to be executed
++ if (priority == PrioritizedTaskQueue.HIGHEST_PRIORITY) {
++ // notify urgent worker as well
++ this.internalScheduleNotifyUrgent();
++ }
++ this.internalScheduleNotify();
++ }
++ }
++
++ protected void internalSchedule(final ChunkTask task) {
++ if (this.workers == null) {
++ this.chunkTasks.add(task);
++ return;
++ }
++
++ // It's important the task is queued before we notify. This avoids a race condition where the worker thread
++ // wakes up and goes back to sleep before we actually schedule (or is just about to sleep).
++ this.queue.add(task);
++ this.internalScheduleNotify();
++ if (task.getPriority() == PrioritizedTaskQueue.HIGHEST_PRIORITY) {
++ // notify urgent too
++ this.internalScheduleNotifyUrgent();
++ }
++ }
++
++ protected void internalScheduleNotify() {
++ if (this.workers == null) {
++ return;
++ }
++ for (int i = 0, len = this.workers.length - 1; i < len; ++i) {
++ final QueueExecutorThread<ChunkTask> worker = this.workers[i];
++ if (worker.notifyTasks()) {
++ // break here since we only want to wake up one worker for scheduling one task
++ break;
++ }
++ }
++ }
++
++ protected void internalScheduleNotifyUrgent() {
++ if (this.workers == null) {
++ return;
++ }
++ this.workers[this.workers.length - 1].notifyTasks();
++ }
++
++}
+diff --git a/src/main/java/net/minecraft/network/protocol/game/ServerboundCommandSuggestionPacket.java b/src/main/java/net/minecraft/network/protocol/game/ServerboundCommandSuggestionPacket.java
+index a5e438a834826161c52ca9db57d234d9ff80a591..b8bc1b9b8e8a33df90a963f9f9769292bf595642 100644
+--- a/src/main/java/net/minecraft/network/protocol/game/ServerboundCommandSuggestionPacket.java
++++ b/src/main/java/net/minecraft/network/protocol/game/ServerboundCommandSuggestionPacket.java
+@@ -14,7 +14,7 @@ public class ServerboundCommandSuggestionPacket implements Packet<ServerGamePack
+
+ public ServerboundCommandSuggestionPacket(FriendlyByteBuf buf) {
+ this.id = buf.readVarInt();
+- this.command = buf.readUtf(32500);
++ this.command = buf.readUtf(2048);
+ }
+
+ @Override
+diff --git a/src/main/java/net/minecraft/server/Main.java b/src/main/java/net/minecraft/server/Main.java
+index a48a12a31a3d09a9373b688dcc093035f8f8a300..0c59ca1a22449893adcfa851198f057ce69bb7e3 100644
+--- a/src/main/java/net/minecraft/server/Main.java
++++ b/src/main/java/net/minecraft/server/Main.java
+@@ -245,6 +245,7 @@ public class Main {
+
+ convertable_conversionsession.saveDataTag(iregistrycustom_dimension, savedata);
+ */
+ Class.forName(net.minecraft.world.entity.npc.VillagerTrades.class.getName()); // Paper - load this sync so it won't fail later async
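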
+ final DedicatedServer dedicatedserver = (DedicatedServer) MinecraftServer.spin((thread) -> {
+ DedicatedServer dedicatedserver1 = new DedicatedServer(optionset, config.get(), ops.get(), thread, convertable_conversionsession, resourcepackrepository, worldstem, dedicatedserversettings, DataFixers.getDataFixer(), services, LoggerChunkProgressListener::new);
+
+diff --git a/src/main/java/net/minecraft/server/MinecraftServer.java b/src/main/java/net/minecraft/server/MinecraftServer.java
+index 53be6189d3fa6a65a09996683913fbbf5133dcb7..d53fb6bba90936c1182b0687d014964cef81694f 100644
+--- a/src/main/java/net/minecraft/server/MinecraftServer.java
++++ b/src/main/java/net/minecraft/server/MinecraftServer.java
+@@ -932,7 +932,7 @@ public abstract class MinecraftServer extends ReentrantBlockableEventLoop<TickTa
+ this.getProfileCache().save();
+ }
+ // Spigot end
+-
++ com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.close(true, true); // Paper
+ }
+
+ public String getLocalIp() {
+diff --git a/src/main/java/net/minecraft/server/level/ChunkMap.java b/src/main/java/net/minecraft/server/level/ChunkMap.java
+index 4da0cbe58dad0f66e0d056c71684120514dcac6a..c3bbaf32373a32417f8b83f386f8cf327c6e0893 100644
+--- a/src/main/java/net/minecraft/server/level/ChunkMap.java
++++ b/src/main/java/net/minecraft/server/level/ChunkMap.java
+@@ -545,6 +545,7 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
+ public void close() throws IOException {
+ try {
+ this.queueSorter.close();
++ this.level.asyncChunkTaskManager.close(true); // Paper - Required since we're closing regionfiles in the next line
+ this.poiManager.close();
+ } finally {
+ super.close();
+@@ -553,6 +554,16 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
+ }
+
+ protected void saveAllChunks(boolean flush) {
++ // Paper start - do not overload I/O threads with too much work when saving
++ int[] saved = new int[1];
++ int maxAsyncSaves = 50;
++ Runnable onChunkSave = () -> {
++ if (++saved[0] >= maxAsyncSaves) {
++ saved[0] = 0;
++ com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.flush();
++ }
++ };
++ // Paper end - do not overload I/O threads with too much work when saving
+ if (flush) {
+ List<ChunkHolder> list = (List) net.minecraft.server.ChunkSystem.getVisibleChunkHolders(this.level).stream().filter(ChunkHolder::wasAccessibleSinceLastSave).peek(ChunkHolder::refreshAccessibility).collect(Collectors.toList()); // Paper
+ MutableBoolean mutableboolean = new MutableBoolean();
+@@ -574,6 +585,7 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
+ }).filter((ichunkaccess) -> {
+ return ichunkaccess instanceof ImposterProtoChunk || ichunkaccess instanceof LevelChunk;
+ }).filter(this::save).forEach((ichunkaccess) -> {
++ onChunkSave.run(); // Paper - do not overload I/O threads with too much work when saving
+ mutableboolean.setTrue();
+ });
+ } while (mutableboolean.isTrue());
+@@ -581,7 +593,8 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
+ this.processUnloads(() -> {
+ return true;
+ });
+- this.flushWorker();
++ //this.flushWorker(); // Paper - nuke IOWorker
++ this.level.asyncChunkTaskManager.flush(); // Paper - flush to preserve compatibility with the pre-async save behaviour
+ } else {
+ net.minecraft.server.ChunkSystem.getVisibleChunkHolders(this.level).forEach(this::saveChunkIfNeeded);
+ }
+@@ -591,11 +604,15 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
+ protected void tick(BooleanSupplier shouldKeepTicking) {
+ ProfilerFiller gameprofilerfiller = this.level.getProfiler();
+
++ try (Timing ignored = this.level.timings.poiUnload.startTiming()) { // Paper
+ gameprofilerfiller.push("poi");
+ this.poiManager.tick(shouldKeepTicking);
++ } // Paper
+ gameprofilerfiller.popPush("chunk_unload");
+ if (!this.level.noSave()) {
++ try (Timing ignored = this.level.timings.chunkUnload.startTiming()) { // Paper
+ this.processUnloads(shouldKeepTicking);
++ } // Paper
+ }
+
+ gameprofilerfiller.pop();
+@@ -658,7 +675,16 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
+ ((LevelChunk) ichunkaccess).setLoaded(false);
+ }
+
+- this.save(ichunkaccess);
++ // Paper start - async chunk saving
++ try {
++ this.asyncSave(ichunkaccess);
++ } catch (ThreadDeath ex) {
++ throw ex; // bye
++ } catch (Throwable ex) {
++ LOGGER.error("Failed to prepare async save, attempting synchronous save", ex);
++ this.save(ichunkaccess);
++ }
++ // Paper end - async chunk saving
+ if (this.entitiesInLevel.remove(pos) && ichunkaccess instanceof LevelChunk) {
+ LevelChunk chunk = (LevelChunk) ichunkaccess;
+
+@@ -727,32 +753,54 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
+ }
+
+ private CompletableFuture<Either<ChunkAccess, ChunkHolder.ChunkLoadingFailure>> scheduleChunkLoad(ChunkPos pos) {
+- return this.readChunk(pos).thenApply((optional) -> {
+- return optional.filter((nbttagcompound) -> {
+- boolean flag = ChunkMap.isChunkDataValid(nbttagcompound);
++ // Paper start - Async chunk io
++ final java.util.function.BiFunction<ChunkSerializer.InProgressChunkHolder, Throwable, Either<ChunkAccess, ChunkHolder.ChunkLoadingFailure>> syncLoadComplete = (chunkHolder, ioThrowable) -> {
++ try (Timing ignored = this.level.timings.chunkLoad.startTimingIfSync()) { // Paper
++ this.level.getProfiler().incrementCounter("chunkLoad");
++ if (ioThrowable != null) {
++ return this.handleChunkLoadFailure(ioThrowable, pos);
++ }
++ this.poiManager.loadInData(pos, chunkHolder.poiData);
++ chunkHolder.tasks.forEach(Runnable::run);
+
+- if (!flag) {
+- ChunkMap.LOGGER.error("Chunk file at {} is missing level data, skipping", pos);
++ if (chunkHolder.protoChunk != null) {
++ ProtoChunk protochunk = chunkHolder.protoChunk;
++ this.markPosition(pos, protochunk.getStatus().getChunkType());
++ return Either.left(protochunk);
+ }
++ } catch (Exception ex) {
++ return this.handleChunkLoadFailure(ex, pos);
++ }
+
+- return flag;
++ return Either.left(this.createEmptyChunk(pos));
++ };
++ CompletableFuture<Either<ChunkAccess, ChunkHolder.ChunkLoadingFailure>> ret = new CompletableFuture<>();
++
++ Consumer<ChunkSerializer.InProgressChunkHolder> chunkHolderConsumer = (ChunkSerializer.InProgressChunkHolder holder) -> {
++ // Go into the chunk load queue and not server task queue so we can be popped out even faster.
++ com.destroystokyo.paper.io.chunk.ChunkTaskManager.queueChunkWaitTask(() -> {
++ try {
++ ret.complete(syncLoadComplete.apply(holder, null));
++ } catch (Exception e) {
++ ret.completeExceptionally(e);
++ }
+ });
+- }).thenApplyAsync((optional) -> {
+- this.level.getProfiler().incrementCounter("chunkLoad");
+- if (optional.isPresent()) {
+- ProtoChunk protochunk = ChunkSerializer.read(this.level, this.poiManager, pos, (CompoundTag) optional.get());
++ };
+
+- this.markPosition(pos, protochunk.getStatus().getChunkType());
+- return Either.<ChunkAccess, ChunkHolder.ChunkLoadingFailure>left(protochunk); // CraftBukkit - decompile error
+- } else {
+- return Either.<ChunkAccess, ChunkHolder.ChunkLoadingFailure>left(this.createEmptyChunk(pos)); // CraftBukkit - decompile error
+- }
+- }, this.mainThreadExecutor).exceptionallyAsync((throwable) -> {
+- return this.handleChunkLoadFailure(throwable, pos);
+- }, this.mainThreadExecutor);
++ CompletableFuture<CompoundTag> chunkSaveFuture = this.level.asyncChunkTaskManager.getChunkSaveFuture(pos.x, pos.z);
++ if (chunkSaveFuture != null) {
++ this.level.asyncChunkTaskManager.scheduleChunkLoad(pos.x, pos.z,
++ com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGH_PRIORITY, chunkHolderConsumer, false, chunkSaveFuture);
++ this.level.asyncChunkTaskManager.raisePriority(pos.x, pos.z, com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGH_PRIORITY);
++ } else {
++ this.level.asyncChunkTaskManager.scheduleChunkLoad(pos.x, pos.z,
++ com.destroystokyo.paper.io.PrioritizedTaskQueue.NORMAL_PRIORITY, chunkHolderConsumer, false);
++ }
++ return ret;
++ // Paper end - Async chunk io
+ }
+
+- private static boolean isChunkDataValid(CompoundTag nbt) {
++ public static boolean isChunkDataValid(CompoundTag nbt) { // Paper - async chunk loading
+ return nbt.contains("Status", 8);
+ }
+
+@@ -991,7 +1039,48 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
+ }
+ }
+
++ // Paper start - async chunk save for unload
++ // Note: This is very unsafe to call if the chunk is still in use.
++ // This is also modeled after PlayerChunkMap#save(IChunkAccess, boolean), with the intentional difference being
++ // that serializing the chunk is left to a worker thread.
++ private void asyncSave(ChunkAccess chunk) {
++ ChunkPos chunkPos = chunk.getPos();
++ CompoundTag poiData;
++ try (Timing ignored = this.level.timings.chunkUnloadPOISerialization.startTiming()) {
++ poiData = this.poiManager.getData(chunk.getPos());
++ }
++
++ com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.scheduleSave(this.level, chunkPos.x, chunkPos.z,
++ poiData, null, com.destroystokyo.paper.io.PrioritizedTaskQueue.NORMAL_PRIORITY);
++
++ if (!chunk.isUnsaved()) {
++ return;
++ }
++
++ ChunkStatus chunkstatus = chunk.getStatus();
++
++ // Copied from PlayerChunkMap#save(IChunkAccess, boolean)
++ if (chunkstatus.getChunkType() != ChunkStatus.ChunkType.LEVELCHUNK) {
++ // Paper start - Optimize save by using status cache
++ if (chunkstatus == ChunkStatus.EMPTY && chunk.getAllStarts().values().stream().noneMatch(StructureStart::isValid)) {
++ return;
++ } // Paper end - Optimize save by using status cache
++ }
++
++ ChunkSerializer.AsyncSaveData asyncSaveData;
++ try (Timing ignored = this.level.timings.chunkUnloadPrepareSave.startTiming()) {
++ asyncSaveData = ChunkSerializer.getAsyncSaveData(this.level, chunk);
++ }
++
++ this.level.asyncChunkTaskManager.scheduleChunkSave(chunkPos.x, chunkPos.z, com.destroystokyo.paper.io.PrioritizedTaskQueue.NORMAL_PRIORITY,
++ asyncSaveData, chunk);
++
++ chunk.setUnsaved(false);
++ }
++ // Paper end
++
+ public boolean save(ChunkAccess chunk) {
++ try (co.aikar.timings.Timing ignored = this.level.timings.chunkSave.startTiming()) { // Paper
+ this.poiManager.flush(chunk.getPos());
+ if (!chunk.isUnsaved()) {
+ return false;
+@@ -1003,7 +1092,7 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
+ ChunkStatus chunkstatus = chunk.getStatus();
+
+ if (chunkstatus.getChunkType() != ChunkStatus.ChunkType.LEVELCHUNK) {
+- if (this.isExistingChunkFull(chunkcoordintpair)) {
++ if (false && this.isExistingChunkFull(chunkcoordintpair)) { // Paper
+ return false;
+ }
+
+@@ -1013,9 +1102,15 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
+ }
+
+ this.level.getProfiler().incrementCounter("chunkSave");
+- CompoundTag nbttagcompound = ChunkSerializer.write(this.level, chunk);
++ CompoundTag nbttagcompound;
++ try (co.aikar.timings.Timing ignored1 = this.level.timings.chunkSaveDataSerialization.startTiming()) { // Paper
++ nbttagcompound = ChunkSerializer.write(this.level, chunk);
++ } // Paper
+
+- this.write(chunkcoordintpair, nbttagcompound);
++ // Paper start - async chunk io
++ com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.scheduleSave(this.level, chunkcoordintpair.x, chunkcoordintpair.z,
++ null, nbttagcompound, com.destroystokyo.paper.io.PrioritizedTaskQueue.NORMAL_PRIORITY);
++ // Paper end - async chunk io
+ this.markPosition(chunkcoordintpair, chunkstatus.getChunkType());
+ return true;
+ } catch (Exception exception) {
+@@ -1023,6 +1118,7 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
+ return false;
+ }
+ }
++ } // Paper
+ }
+
+ private boolean isExistingChunkFull(ChunkPos pos) {
+@@ -1156,6 +1252,35 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
+ }
+ }
+
++ // Paper start - Asynchronous chunk io
++ @Nullable
++ @Override
++ public CompoundTag readSync(ChunkPos chunkcoordintpair) throws IOException {
++ if (Thread.currentThread() != com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE) {
++ CompoundTag ret = com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE
++ .loadChunkDataAsyncFuture(this.level, chunkcoordintpair.x, chunkcoordintpair.z, com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread(),
++ false, true, true).join().chunkData;
++
++ if (ret == com.destroystokyo.paper.io.PaperFileIOThread.FAILURE_VALUE) {
++ throw new IOException("See logs for further detail");
++ }
++ return ret;
++ }
++ return super.readSync(chunkcoordintpair);
++ }
++
++ @Override
++ public void write(ChunkPos chunkcoordintpair, CompoundTag nbttagcompound) throws IOException {
++ if (Thread.currentThread() != com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE) {
++ com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.scheduleSave(
++ this.level, chunkcoordintpair.x, chunkcoordintpair.z, null, nbttagcompound,
++ com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread());
++ return;
++ }
++ super.write(chunkcoordintpair, nbttagcompound);
++ }
++ // Paper end
++
+ private CompletableFuture<Optional<CompoundTag>> readChunk(ChunkPos chunkPos) {
+ return this.read(chunkPos).thenApplyAsync((optional) -> {
+ return optional.map((nbttagcompound) -> this.upgradeChunkTag(nbttagcompound, chunkPos)); // CraftBukkit
+diff --git a/src/main/java/net/minecraft/server/level/DistanceManager.java b/src/main/java/net/minecraft/server/level/DistanceManager.java
+index aaf6344d3187ceada947ce6ee0fbba91ca0271a3..1d6ab658c48bb765f66624f276ec7b05cf33c1d5 100644
+--- a/src/main/java/net/minecraft/server/level/DistanceManager.java
++++ b/src/main/java/net/minecraft/server/level/DistanceManager.java
+@@ -402,7 +402,7 @@ public abstract class DistanceManager {
+ }
+
+ public void removeTicketsOnClosing() {
+- ImmutableSet<TicketType<?>> immutableset = ImmutableSet.of(TicketType.UNKNOWN, TicketType.POST_TELEPORT, TicketType.LIGHT, TicketType.FUTURE_AWAIT); // Paper - add additional tickets to preserve
++ ImmutableSet<TicketType<?>> immutableset = ImmutableSet.of(TicketType.UNKNOWN, TicketType.POST_TELEPORT, TicketType.LIGHT, TicketType.FUTURE_AWAIT, TicketType.ASYNC_LOAD); // Paper - add additional tickets to preserve
+ ObjectIterator objectiterator = this.tickets.long2ObjectEntrySet().fastIterator();
+
+ while (objectiterator.hasNext()) {
+diff --git a/src/main/java/net/minecraft/server/level/ServerChunkCache.java b/src/main/java/net/minecraft/server/level/ServerChunkCache.java
+index 514111ed7fe8aec0d5f15f7a8f157d5872a56e4f..3ff5e35e45a71dc03552dedb65c7338317e9d0a9 100644
+--- a/src/main/java/net/minecraft/server/level/ServerChunkCache.java
++++ b/src/main/java/net/minecraft/server/level/ServerChunkCache.java
+@@ -308,10 +308,111 @@ public class ServerChunkCache extends ChunkSource {
+ return ret;
+ }
+ // Paper end
++ // Paper start - async chunk io
++ private long asyncLoadSeqCounter;
++
++ public CompletableFuture<Either<ChunkAccess, ChunkHolder.ChunkLoadingFailure>> getChunkAtAsynchronously(int x, int z, boolean gen, boolean isUrgent) {
++ if (Thread.currentThread() != this.mainThread) {
++ CompletableFuture<Either<ChunkAccess, ChunkHolder.ChunkLoadingFailure>> future = new CompletableFuture<Either<ChunkAccess, ChunkHolder.ChunkLoadingFailure>>();
++ this.mainThreadProcessor.execute(() -> {
++ this.getChunkAtAsynchronously(x, z, gen, isUrgent).whenComplete((chunk, ex) -> {
++ if (ex != null) {
++ future.completeExceptionally(ex);
++ } else {
++ future.complete(chunk);
++ }
++ });
++ });
++ return future;
++ }
++
++ long k = ChunkPos.asLong(x, z);
++ ChunkPos chunkPos = new ChunkPos(x, z);
++
++ ChunkAccess ichunkaccess;
++
++ // try cache
++ for (int l = 0; l < 4; ++l) {
++ if (k == this.lastChunkPos[l] && ChunkStatus.FULL == this.lastChunkStatus[l]) {
++ ichunkaccess = this.lastChunk[l];
++ if (ichunkaccess != null) { // CraftBukkit - the chunk can become accessible in the meantime TODO for non-null chunks it might also make sense to check that the chunk's state hasn't changed in the meantime
++
++ // move to first in cache
++
++ for (int i1 = 3; i1 > 0; --i1) {
++ this.lastChunkPos[i1] = this.lastChunkPos[i1 - 1];
++ this.lastChunkStatus[i1] = this.lastChunkStatus[i1 - 1];
++ this.lastChunk[i1] = this.lastChunk[i1 - 1];
++ }
++
++ this.lastChunkPos[0] = k;
++ this.lastChunkStatus[0] = ChunkStatus.FULL;
++ this.lastChunk[0] = ichunkaccess;
++
++ return CompletableFuture.completedFuture(Either.left(ichunkaccess));
++ }
++ }
++ }
++
++ if (gen) {
++ return this.bringToFullStatusAsync(x, z, chunkPos, isUrgent);
++ }
++
++ ChunkAccess current = this.getChunkAtImmediately(x, z); // we want to bypass ticket restrictions
++ if (current != null) {
++ if (!(current instanceof net.minecraft.world.level.chunk.ImposterProtoChunk) && !(current instanceof LevelChunk)) {
++ return CompletableFuture.completedFuture(ChunkHolder.UNLOADED_CHUNK);
++ }
++ // we know the chunk is at full status here (either in read-only mode or the real thing)
++ return this.bringToFullStatusAsync(x, z, chunkPos, isUrgent);
++ }
++
++ // here we don't know what status it is and we're not supposed to generate
++ // so we asynchronously load empty status
++ return this.bringToStatusAsync(x, z, chunkPos, ChunkStatus.EMPTY, isUrgent).thenCompose((either) -> {
++ ChunkAccess chunk = either.left().orElse(null);
++ if (!(chunk instanceof net.minecraft.world.level.chunk.ImposterProtoChunk) && !(chunk instanceof LevelChunk)) {
++ // the chunk on disk was not a full status chunk
++ return CompletableFuture.completedFuture(ChunkHolder.UNLOADED_CHUNK);
++ }
++ // bring to full status if required
++ return this.bringToFullStatusAsync(x, z, chunkPos, isUrgent);
++ });
++ }
++
++ private CompletableFuture<Either<ChunkAccess, ChunkHolder.ChunkLoadingFailure>> bringToFullStatusAsync(int x, int z, ChunkPos chunkPos, boolean isUrgent) {
++ return this.bringToStatusAsync(x, z, chunkPos, ChunkStatus.FULL, isUrgent);
++ }
++
++ private CompletableFuture<Either<ChunkAccess, ChunkHolder.ChunkLoadingFailure>> bringToStatusAsync(int x, int z, ChunkPos chunkPos, ChunkStatus status, boolean isUrgent) {
++ CompletableFuture<Either<ChunkAccess, ChunkHolder.ChunkLoadingFailure>> future = this.getChunkFutureMainThread(x, z, status, true, isUrgent);
++ Long identifier = Long.valueOf(this.asyncLoadSeqCounter++);
++ int ticketLevel = net.minecraft.server.MCUtil.getTicketLevelFor(status);
++ this.addTicketAtLevel(TicketType.ASYNC_LOAD, chunkPos, ticketLevel, identifier);
++
++ return future.thenComposeAsync((Either<ChunkAccess, ChunkHolder.ChunkLoadingFailure> either) -> {
++ // either left -> success
++ // either right -> failure
++
++ this.removeTicketAtLevel(TicketType.ASYNC_LOAD, chunkPos, ticketLevel, identifier);
++ this.addTicketAtLevel(TicketType.UNKNOWN, chunkPos, ticketLevel, chunkPos); // allow unloading
++
++ Optional<ChunkHolder.ChunkLoadingFailure> failure = either.right();
++
++ if (failure.isPresent()) {
++ // failure
++ throw new IllegalStateException("Chunk failed to load: " + failure.get().toString());
++ }
++
++ return CompletableFuture.completedFuture(either);
++ }, this.mainThreadProcessor);
++ }
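++
++ // Ticket lifecycle note: the ASYNC_LOAD ticket pins the chunk while the future is pending; on
++ // completion it is swapped for an UNKNOWN ticket at the same level so the chunk may unload
++ // normally afterwards.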
++ // Paper end - async chunk io
+
+ @Nullable
+ @Override
+ public ChunkAccess getChunk(int x, int z, ChunkStatus leastStatus, boolean create) {
++ final int x1 = x; final int z1 = z; // Paper - conflict on variable change
+ if (Thread.currentThread() != this.mainThread) {
+ return (ChunkAccess) CompletableFuture.supplyAsync(() -> {
+ return this.getChunk(x, z, leastStatus, create);
+@@ -334,13 +435,18 @@ public class ServerChunkCache extends ChunkSource {
+ }
+
+ gameprofilerfiller.incrementCounter("getChunkCacheMiss");
+- CompletableFuture<Either<ChunkAccess, ChunkHolder.ChunkLoadingFailure>> completablefuture = this.getChunkFutureMainThread(x, z, leastStatus, create);
++ CompletableFuture<Either<ChunkAccess, ChunkHolder.ChunkLoadingFailure>> completablefuture = this.getChunkFutureMainThread(x, z, leastStatus, create, true); // Paper
+ ServerChunkCache.MainThreadExecutor chunkproviderserver_b = this.mainThreadProcessor;
+
+ Objects.requireNonNull(completablefuture);
+ if (!completablefuture.isDone()) { // Paper
++ // Paper start - async chunk io/loading
++ this.level.asyncChunkTaskManager.raisePriority(x1, z1, com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGHEST_PRIORITY);
++ com.destroystokyo.paper.io.chunk.ChunkTaskManager.pushChunkWait(this.level, x1, z1);
++ // Paper end
+ this.level.timings.syncChunkLoad.startTiming(); // Paper
+ chunkproviderserver_b.managedBlock(completablefuture::isDone);
++ com.destroystokyo.paper.io.chunk.ChunkTaskManager.popChunkWait(); // Paper - async chunk debug
+ this.level.timings.syncChunkLoad.stopTiming(); // Paper
+ } // Paper
+ ichunkaccess = (ChunkAccess) ((Either) completablefuture.join()).map((ichunkaccess1) -> {
+@@ -427,6 +533,11 @@ public class ServerChunkCache extends ChunkSource {
+ }
+
+ private CompletableFuture<Either<ChunkAccess, ChunkHolder.ChunkLoadingFailure>> getChunkFutureMainThread(int chunkX, int chunkZ, ChunkStatus leastStatus, boolean create) {
++ // Paper start - add isUrgent - old sig left in place for dirty nms plugins
++ return getChunkFutureMainThread(chunkX, chunkZ, leastStatus, create, false);
++ }
++ private CompletableFuture<Either<ChunkAccess, ChunkHolder.ChunkLoadingFailure>> getChunkFutureMainThread(int chunkX, int chunkZ, ChunkStatus leastStatus, boolean create, boolean isUrgent) {
++ // Paper end
+ ChunkPos chunkcoordintpair = new ChunkPos(chunkX, chunkZ);
+ long k = chunkcoordintpair.toLong();
+ int l = 33 + ChunkStatus.getDistance(leastStatus);
+@@ -841,11 +952,12 @@ public class ServerChunkCache extends ChunkSource {
+ // CraftBukkit start - process pending Chunk loadCallback() and unloadCallback() after each run task
+ public boolean pollTask() {
+ try {
++ boolean execChunkTask = com.destroystokyo.paper.io.chunk.ChunkTaskManager.pollChunkWaitQueue() || ServerChunkCache.this.level.asyncChunkTaskManager.pollNextChunkTask(); // Paper
+ if (ServerChunkCache.this.runDistanceManagerUpdates()) {
+ return true;
+ } else {
+ ServerChunkCache.this.lightEngine.tryScheduleUpdate();
+- return super.pollTask();
++ return super.pollTask() || execChunkTask; // Paper
+ }
+ } finally {
+ chunkMap.callbackExecutor.run();
+diff --git a/src/main/java/net/minecraft/server/level/ServerLevel.java b/src/main/java/net/minecraft/server/level/ServerLevel.java
+index 99d44faab5b5da244fdc170c73d73723c174c8fd..2f7646e2bcc9622d8579eec25b56615da5a84d06 100644
+--- a/src/main/java/net/minecraft/server/level/ServerLevel.java
++++ b/src/main/java/net/minecraft/server/level/ServerLevel.java
+@@ -315,6 +315,78 @@ public class ServerLevel extends Level implements WorldGenLevel {
+ }
+ }
+ }
++
++ // Paper start - Asynchronous IO
++ public final com.destroystokyo.paper.io.PaperFileIOThread.ChunkDataController poiDataController = new com.destroystokyo.paper.io.PaperFileIOThread.ChunkDataController() {
++ @Override
++ public void writeData(int x, int z, net.minecraft.nbt.CompoundTag compound) throws java.io.IOException {
++ ServerLevel.this.getChunkSource().chunkMap.getPoiManager().write(new ChunkPos(x, z), compound);
++ }
++
++ @Override
++ public net.minecraft.nbt.CompoundTag readData(int x, int z) throws java.io.IOException {
++ return ServerLevel.this.getChunkSource().chunkMap.getPoiManager().read(new ChunkPos(x, z));
++ }
++
++ @Override
++ public <T> T computeForRegionFile(int chunkX, int chunkZ, java.util.function.Function<net.minecraft.world.level.chunk.storage.RegionFile, T> function) {
++ synchronized (ServerLevel.this.getChunkSource().chunkMap.getPoiManager()) {
++ net.minecraft.world.level.chunk.storage.RegionFile file;
++
++ try {
++ file = ServerLevel.this.getChunkSource().chunkMap.getPoiManager().getRegionFile(new ChunkPos(chunkX, chunkZ), false);
++ } catch (java.io.IOException ex) {
++ throw new RuntimeException(ex);
++ }
++
++ return function.apply(file);
++ }
++ }
++
++ @Override
++ public <T> T computeForRegionFileIfLoaded(int chunkX, int chunkZ, java.util.function.Function<net.minecraft.world.level.chunk.storage.RegionFile, T> function) {
++ synchronized (ServerLevel.this.getChunkSource().chunkMap.getPoiManager()) {
++ net.minecraft.world.level.chunk.storage.RegionFile file = ServerLevel.this.getChunkSource().chunkMap.getPoiManager().getRegionFileIfLoaded(new ChunkPos(chunkX, chunkZ));
++ return function.apply(file);
++ }
++ }
++ };
++
++ public final com.destroystokyo.paper.io.PaperFileIOThread.ChunkDataController chunkDataController = new com.destroystokyo.paper.io.PaperFileIOThread.ChunkDataController() {
++ @Override
++ public void writeData(int x, int z, net.minecraft.nbt.CompoundTag compound) throws java.io.IOException {
++ ServerLevel.this.getChunkSource().chunkMap.write(new ChunkPos(x, z), compound);
++ }
++
++ @Override
++ public net.minecraft.nbt.CompoundTag readData(int x, int z) throws java.io.IOException {
++ return ServerLevel.this.getChunkSource().chunkMap.readSync(new ChunkPos(x, z));
++ }
++
++ @Override
++ public <T> T computeForRegionFile(int chunkX, int chunkZ, java.util.function.Function<net.minecraft.world.level.chunk.storage.RegionFile, T> function) {
++ synchronized (ServerLevel.this.getChunkSource().chunkMap) {
++ net.minecraft.world.level.chunk.storage.RegionFile file;
++
++ try {
++ file = ServerLevel.this.getChunkSource().chunkMap.regionFileCache.getRegionFile(new ChunkPos(chunkX, chunkZ), false);
++ } catch (java.io.IOException ex) {
++ throw new RuntimeException(ex);
++ }
++
++ return function.apply(file);
++ }
++ }
++
++ @Override
++ public <T> T computeForRegionFileIfLoaded(int chunkX, int chunkZ, java.util.function.Function<net.minecraft.world.level.chunk.storage.RegionFile, T> function) {
++ synchronized (ServerLevel.this.getChunkSource().chunkMap) {
++ net.minecraft.world.level.chunk.storage.RegionFile file = ServerLevel.this.getChunkSource().chunkMap.regionFileCache.getRegionFileIfLoaded(new ChunkPos(chunkX, chunkZ));
++ return function.apply(file);
++ }
++ }
++ };
++ public final com.destroystokyo.paper.io.chunk.ChunkTaskManager asyncChunkTaskManager;
+ // Paper end
+
+ // Add env and gen to constructor, IWorldDataServer -> WorldDataServer
+@@ -397,6 +469,8 @@ public class ServerLevel extends Level implements WorldGenLevel {
+
+ this.sleepStatus = new SleepStatus();
+ this.getCraftServer().addWorld(this.getWorld()); // CraftBukkit
++
++ this.asyncChunkTaskManager = new com.destroystokyo.paper.io.chunk.ChunkTaskManager(this); // Paper
+ }
+
+ public void setWeatherParameters(int clearDuration, int rainDuration, boolean raining, boolean thundering) {
+diff --git a/src/main/java/net/minecraft/server/level/TicketType.java b/src/main/java/net/minecraft/server/level/TicketType.java
+index dfa08dbf025ed702a864280a540e0169b9f33cbd..10fa6cec911950f72407ae7f45c8cf48caa9421a 100644
+--- a/src/main/java/net/minecraft/server/level/TicketType.java
++++ b/src/main/java/net/minecraft/server/level/TicketType.java
+@@ -8,6 +8,7 @@ import net.minecraft.world.level.ChunkPos;
+
+ public class TicketType<T> {
+ public static final TicketType<Long> FUTURE_AWAIT = create("future_await", Long::compareTo); // Paper
++ public static final TicketType<Long> ASYNC_LOAD = create("async_load", Long::compareTo); // Paper
+
+ private final String name;
+ private final Comparator<T> comparator;
+diff --git a/src/main/java/net/minecraft/server/network/ServerGamePacketListenerImpl.java b/src/main/java/net/minecraft/server/network/ServerGamePacketListenerImpl.java
+index cdc24defe649644ceade1c6cfcfe20c29ca936c1..5072d4dc1f7f77c61e3cc72c1101cb95f6596ce7 100644
+--- a/src/main/java/net/minecraft/server/network/ServerGamePacketListenerImpl.java
++++ b/src/main/java/net/minecraft/server/network/ServerGamePacketListenerImpl.java
+@@ -784,6 +784,13 @@ public class ServerGamePacketListenerImpl implements ServerPlayerConnection, Tic
+ this.disconnect(Component.translatable("disconnect.spam"));
+ return;
+ }
++ // Paper start
++ String str = packet.getCommand(); int index = -1;
++ if (str.length() > 64 && ((index = str.indexOf(' ')) == -1 || index >= 64)) {
++ server.scheduleOnMain(() -> this.disconnect(Component.translatable("disconnect.spam", new Object[0]))); // Paper
++ return;
++ }
++ // Paper end
+ // CraftBukkit end
+ StringReader stringreader = new StringReader(packet.getCommand());
+
+diff --git a/src/main/java/net/minecraft/util/worldupdate/WorldUpgrader.java b/src/main/java/net/minecraft/util/worldupdate/WorldUpgrader.java
+index 821052e93ee753db6aaf499bbf39dc30598fe72f..2955c1ee153c410ea45fe367bac8597621c9bbd0 100644
+--- a/src/main/java/net/minecraft/util/worldupdate/WorldUpgrader.java
++++ b/src/main/java/net/minecraft/util/worldupdate/WorldUpgrader.java
+@@ -182,7 +182,11 @@ public class WorldUpgrader {
+ }
+
+ WorldUpgrader.LOGGER.error("Error upgrading chunk {}", chunkcoordintpair, throwable);
++ // Paper start
++ } catch (IOException e) {
++ WorldUpgrader.LOGGER.error("Error upgrading chunk {}", chunkcoordintpair, e);
+ }
++ // Paper end
+
+ if (flag1) {
+ ++this.converted;
+diff --git a/src/main/java/net/minecraft/world/entity/ai/village/poi/PoiManager.java b/src/main/java/net/minecraft/world/entity/ai/village/poi/PoiManager.java
+index db4fa7355b1f834d0f8a0710c1c583dded184613..ab9bb440c8e91ecb49c1e14a427d35087a87ac80 100644
+--- a/src/main/java/net/minecraft/world/entity/ai/village/poi/PoiManager.java
++++ b/src/main/java/net/minecraft/world/entity/ai/village/poi/PoiManager.java
+@@ -40,9 +40,11 @@ public class PoiManager extends SectionStorage<PoiSection> {
+ public static final int VILLAGE_SECTION_SIZE = 1;
+ private final PoiManager.DistanceTracker distanceTracker;
+ private final LongSet loadedChunks = new LongOpenHashSet();
++ private final net.minecraft.server.level.ServerLevel world; // Paper
+
+ public PoiManager(Path path, DataFixer dataFixer, boolean dsync, RegistryAccess registryManager, LevelHeightAccessor world) {
+ super(path, PoiSection::codec, PoiSection::new, dataFixer, DataFixTypes.POI_CHUNK, dsync, registryManager, world);
++ this.world = (net.minecraft.server.level.ServerLevel)world; // Paper
+ this.distanceTracker = new PoiManager.DistanceTracker();
+ }
+
+@@ -195,7 +197,18 @@ public class PoiManager extends SectionStorage<PoiSection> {
+
+ @Override
+ public void tick(BooleanSupplier shouldKeepTicking) {
+- super.tick(shouldKeepTicking);
++ // Paper start - async chunk io
++ while (!this.dirty.isEmpty() && shouldKeepTicking.getAsBoolean()) {
++ ChunkPos chunkcoordintpair = SectionPos.of(this.dirty.firstLong()).chunk();
++
++ net.minecraft.nbt.CompoundTag data;
++ try (co.aikar.timings.Timing ignored1 = this.world.timings.poiSaveDataSerialization.startTiming()) {
++ data = this.getData(chunkcoordintpair);
++ }
++ com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.scheduleSave(this.world,
++ chunkcoordintpair.x, chunkcoordintpair.z, data, null, com.destroystokyo.paper.io.PrioritizedTaskQueue.NORMAL_PRIORITY);
++ }
++ // Paper end
+ this.distanceTracker.runAllUpdates();
+ }
+
+@@ -288,6 +301,35 @@ public class PoiManager extends SectionStorage<PoiSection> {
+ }
+ }
+
++ // Paper start - Asynchronous chunk io
++ @javax.annotation.Nullable
++ @Override
++ public net.minecraft.nbt.CompoundTag read(ChunkPos chunkcoordintpair) throws java.io.IOException {
++ if (this.world != null && Thread.currentThread() != com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE) {
++ net.minecraft.nbt.CompoundTag ret = com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE
++ .loadChunkDataAsyncFuture(this.world, chunkcoordintpair.x, chunkcoordintpair.z, com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread(),
++ true, false, true).join().poiData;
++
++ if (ret == com.destroystokyo.paper.io.PaperFileIOThread.FAILURE_VALUE) {
++ throw new java.io.IOException("See logs for further detail");
++ }
++ return ret;
++ }
++ return super.read(chunkcoordintpair);
++ }
++
++ @Override
++ public void write(ChunkPos chunkcoordintpair, net.minecraft.nbt.CompoundTag nbttagcompound) throws java.io.IOException {
++ if (this.world != null && Thread.currentThread() != com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE) {
++ com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.scheduleSave(
++ this.world, chunkcoordintpair.x, chunkcoordintpair.z, nbttagcompound, null,
++ com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread());
++ return;
++ }
++ super.write(chunkcoordintpair, nbttagcompound);
++ }
++ // Paper end
++
+ public static enum Occupancy {
+ HAS_SPACE(PoiRecord::hasSpace),
+ IS_OCCUPIED(PoiRecord::isOccupied),
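
Editor's note: the PoiManager overrides above (and the reworked tick) all follow one dispatch rule: if the caller is not the file IO thread, reads block on a future scheduled to it and writes are handed off at the caller's priority; if the caller is the IO thread itself, fall through to direct IO, since the IO thread joining on its own queue would never complete. A minimal sketch of that rule (IoRoutedStorage and its members are illustrative, not Paper API):

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;

    // Illustrative dispatch rule used by the read()/write() overrides above.
    abstract class IoRoutedStorage {
        private final Thread ioThread; // the single prioritized file IO thread

        IoRoutedStorage(Thread ioThread) {
            this.ioThread = ioThread;
        }

        byte[] read(long pos) throws IOException {
            if (Thread.currentThread() != this.ioThread) {
                // Block on a task scheduled to the IO thread; looks synchronous here.
                return this.scheduleRead(pos).join();
            }
            // Already on the IO thread: perform the IO directly.
            return this.readDirect(pos);
        }

        abstract CompletableFuture<byte[]> scheduleRead(long pos);

        abstract byte[] readDirect(long pos) throws IOException;
    }
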
+diff --git a/src/main/java/net/minecraft/world/level/chunk/storage/ChunkSerializer.java b/src/main/java/net/minecraft/world/level/chunk/storage/ChunkSerializer.java
+index 864e2e0355a5fb8c1d4a5b0896ba299faf9ea534..8cc2a2c026eb44461cd94faeb64fb2151d2d3898 100644
+--- a/src/main/java/net/minecraft/world/level/chunk/storage/ChunkSerializer.java
++++ b/src/main/java/net/minecraft/world/level/chunk/storage/ChunkSerializer.java
+@@ -84,7 +84,31 @@ public class ChunkSerializer {
+
+ public ChunkSerializer() {}
+
++ // Paper start
++ public static final class InProgressChunkHolder {
++
++ public final ProtoChunk protoChunk;
++ public final java.util.ArrayDeque<Runnable> tasks;
++
++ public CompoundTag poiData;
++
++ public InProgressChunkHolder(final ProtoChunk protoChunk, final java.util.ArrayDeque<Runnable> tasks) {
++ this.protoChunk = protoChunk;
++ this.tasks = tasks;
++ }
++ }
++ // Paper end
++
+ public static ProtoChunk read(ServerLevel world, PoiManager poiStorage, ChunkPos chunkPos, CompoundTag nbt) {
++ // Paper start - add variant for async calls
++ InProgressChunkHolder holder = loadChunk(world, poiStorage, chunkPos, nbt, true);
++ holder.tasks.forEach(Runnable::run);
++ return holder.protoChunk;
++ }
++
++ public static InProgressChunkHolder loadChunk(ServerLevel world, PoiManager poiStorage, ChunkPos chunkPos, CompoundTag nbt, boolean distinguish) {
++ java.util.ArrayDeque<Runnable> tasksToExecuteOnMain = new java.util.ArrayDeque<>();
++ // Paper end
+ ChunkPos chunkcoordintpair1 = new ChunkPos(nbt.getInt("xPos"), nbt.getInt("zPos"));
+
+ if (!Objects.equals(chunkPos, chunkcoordintpair1)) {
+@@ -141,7 +165,9 @@ public class ChunkSerializer {
+ LevelChunkSection chunksection = new LevelChunkSection(b0, datapaletteblock, (PalettedContainer) object); // CraftBukkit - read/write
+
+ achunksection[k] = chunksection;
++ tasksToExecuteOnMain.add(() -> { // Paper - delay this task since we're executing off-main
+ poiStorage.checkConsistencyWithBlocks(chunkPos, chunksection);
++ }); // Paper - delay this task since we're executing off-main
+ }
+
+ boolean flag3 = nbttagcompound1.contains("BlockLight", 7);
+@@ -149,16 +175,28 @@ public class ChunkSerializer {
+
+ if (flag3 || flag4) {
+ if (!flag2) {
++ tasksToExecuteOnMain.add(() -> { // Paper - delay this task since we're executing off-main
+ lightengine.retainData(chunkPos, true);
++ }); // Paper - delay this task since we're executing off-main
+ flag2 = true;
+ }
+
+ if (flag3) {
+- lightengine.queueSectionData(LightLayer.BLOCK, SectionPos.of(chunkPos, b0), new DataLayer(nbttagcompound1.getByteArray("BlockLight")), true);
++ // Paper start - delay this task since we're executing off-main
++ DataLayer blockLight = new DataLayer(nbttagcompound1.getByteArray("BlockLight").clone());
++ tasksToExecuteOnMain.add(() -> {
++ lightengine.queueSectionData(LightLayer.BLOCK, SectionPos.of(chunkPos, b0), blockLight, true);
++ });
++ // Paper end - delay this task since we're executing off-main
+ }
+
+ if (flag4) {
+- lightengine.queueSectionData(LightLayer.SKY, SectionPos.of(chunkPos, b0), new DataLayer(nbttagcompound1.getByteArray("SkyLight")), true);
++ // Paper start - delay this task since we're executing off-main
++ DataLayer skyLight = new DataLayer(nbttagcompound1.getByteArray("SkyLight").clone());
++ tasksToExecuteOnMain.add(() -> {
++ lightengine.queueSectionData(LightLayer.SKY, SectionPos.of(chunkPos, b0), skyLight, true);
++ });
++ // Paper end - delay this task since we're executing off-main
+ }
+ }
+ }
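
Editor's note: the pattern running through this hunk is that parsing NBT is safe off-main, but the light engine and POI consistency checks are main-thread-only, so those calls are captured into tasksToExecuteOnMain and replayed later — inline by the synchronous read() path, or on the main thread in the async path. Note the light byte arrays are cloned before capture so the lambdas do not alias the NBT buffer. A condensed sketch of the holder (names illustrative):

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Illustrative shape of loadChunk()'s return value: the parsed result plus
    // the deferred main-thread work, drained inline by the synchronous path.
    final class InProgressLoad<T> {
        final T result;
        final Queue<Runnable> mainThreadTasks = new ArrayDeque<>();

        InProgressLoad(T result) {
            this.result = result;
        }

        // Equivalent of read() above: run the deferred tasks immediately.
        T completeSynchronously() {
            Runnable task;
            while ((task = this.mainThreadTasks.poll()) != null) {
                task.run();
            }
            return this.result;
        }
    }
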
+@@ -278,7 +316,7 @@ public class ChunkSerializer {
+ }
+
+ if (chunkstatus_type == ChunkStatus.ChunkType.LEVELCHUNK) {
+- return new ImposterProtoChunk((LevelChunk) object1, false);
++ return new InProgressChunkHolder(new ImposterProtoChunk((LevelChunk) object1, false), tasksToExecuteOnMain); // Paper - Async chunk loading
+ } else {
+ ProtoChunk protochunk1 = (ProtoChunk) object1;
+
+@@ -317,9 +355,67 @@ public class ChunkSerializer {
+ protochunk1.setCarvingMask(worldgenstage_features, new CarvingMask(nbttagcompound4.getLongArray(s1), ((ChunkAccess) object1).getMinBuildHeight()));
+ }
+
+- return protochunk1;
++ return new InProgressChunkHolder(protochunk1, tasksToExecuteOnMain); // Paper - Async chunk loading
++ }
++ }
++
++ // Paper start - async chunk save for unload
++ public record AsyncSaveData(
++ DataLayer[] blockLight,
++ DataLayer[] skyLight,
++ Tag blockTickList, // non-null if we had to go to the server's tick list
++ Tag fluidTickList, // non-null if we had to go to the server's tick list
++ ListTag blockEntities,
++ long worldTime
++ ) {}
++
++ // must be called sync
++ public static AsyncSaveData getAsyncSaveData(ServerLevel world, ChunkAccess chunk) {
++ org.spigotmc.AsyncCatcher.catchOp("preparation of chunk data for async save");
++ ChunkPos chunkPos = chunk.getPos();
++
++ ThreadedLevelLightEngine lightenginethreaded = world.getChunkSource().getLightEngine();
++
++ DataLayer[] blockLight = new DataLayer[lightenginethreaded.getMaxLightSection() - lightenginethreaded.getMinLightSection()];
++ DataLayer[] skyLight = new DataLayer[lightenginethreaded.getMaxLightSection() - lightenginethreaded.getMinLightSection()];
++
++ for (int i = lightenginethreaded.getMinLightSection(); i < lightenginethreaded.getMaxLightSection(); ++i) {
++ DataLayer blockArray = lightenginethreaded.getLayerListener(LightLayer.BLOCK).getDataLayerData(SectionPos.of(chunkPos, i));
++ DataLayer skyArray = lightenginethreaded.getLayerListener(LightLayer.SKY).getDataLayerData(SectionPos.of(chunkPos, i));
++
++ // copy data for safety
++ if (blockArray != null) {
++ blockArray = blockArray.copy();
++ }
++ if (skyArray != null) {
++ skyArray = skyArray.copy();
++ }
++
++ blockLight[i - lightenginethreaded.getMinLightSection()] = blockArray;
++ skyLight[i - lightenginethreaded.getMinLightSection()] = skyArray;
++ }
++
++ final CompoundTag tickLists = new CompoundTag();
++ ChunkSerializer.saveTicks(world, tickLists, chunk.getTicksForSerialization());
++
++ ListTag blockEntitiesSerialized = new ListTag();
++ for (final BlockPos blockPos : chunk.getBlockEntitiesPos()) {
++ final CompoundTag blockEntityNbt = chunk.getBlockEntityNbtForSaving(blockPos);
++ if (blockEntityNbt != null) {
++ blockEntitiesSerialized.add(blockEntityNbt);
++ }
+ }
++
++ return new AsyncSaveData(
++ blockLight,
++ skyLight,
++ tickLists.get(BLOCK_TICKS_TAG),
++ tickLists.get(FLUID_TICKS_TAG),
++ blockEntitiesSerialized,
++ world.getGameTime()
++ );
+ }
++ // Paper end
+
+ private static void logErrors(ChunkPos chunkPos, int y, String message) {
+ ChunkSerializer.LOGGER.error("Recoverable errors when loading section [" + chunkPos.x + ", " + y + ", " + chunkPos.z + "]: " + message);
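
Editor's note: getAsyncSaveData captures everything the serializer would otherwise read from live world state — light arrays (copied), the tick lists, serialized block entities, and the current game time. The invariant is snapshot-on-main, serialize-off-main. A reduced sketch of the capture step, with byte[][] standing in for the real DataLayer arrays:

    // Reduced sketch of the capture step; byte[][] stands in for DataLayer[].
    record ChunkSnapshot(byte[][] blockLight, byte[][] skyLight, long worldTime) {

        // Must be called on the main thread; only copies, so it stays cheap.
        static ChunkSnapshot capture(byte[][] liveBlockLight, byte[][] liveSkyLight, long gameTime) {
            return new ChunkSnapshot(deepCopy(liveBlockLight), deepCopy(liveSkyLight), gameTime);
        }

        private static byte[][] deepCopy(byte[][] layers) {
            byte[][] copy = new byte[layers.length][];
            for (int i = 0; i < layers.length; ++i) {
                copy[i] = layers[i] == null ? null : layers[i].clone();
            }
            return copy;
        }
    }
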
+@@ -336,6 +432,11 @@ public class ChunkSerializer {
+ // CraftBukkit end
+
+ public static CompoundTag write(ServerLevel world, ChunkAccess chunk) {
++ // Paper start
++ return saveChunk(world, chunk, null);
++ }
++ public static CompoundTag saveChunk(ServerLevel world, ChunkAccess chunk, @org.checkerframework.checker.nullness.qual.Nullable AsyncSaveData asyncsavedata) {
++ // Paper end
+ ChunkPos chunkcoordintpair = chunk.getPos();
+ CompoundTag nbttagcompound = new CompoundTag();
+
+@@ -343,7 +444,7 @@ public class ChunkSerializer {
+ nbttagcompound.putInt("xPos", chunkcoordintpair.x);
+ nbttagcompound.putInt("yPos", chunk.getMinSection());
+ nbttagcompound.putInt("zPos", chunkcoordintpair.z);
+- nbttagcompound.putLong("LastUpdate", world.getGameTime());
++ nbttagcompound.putLong("LastUpdate", asyncsavedata != null ? asyncsavedata.worldTime : world.getGameTime()); // Paper - async chunk unloading
+ nbttagcompound.putLong("InhabitedTime", chunk.getInhabitedTime());
+ nbttagcompound.putString("Status", chunk.getStatus().getName());
+ BlendingData blendingdata = chunk.getBlendingData();
+@@ -386,8 +487,17 @@ public class ChunkSerializer {
+ for (int i = lightenginethreaded.getMinLightSection(); i < lightenginethreaded.getMaxLightSection(); ++i) {
+ int j = chunk.getSectionIndexFromSectionY(i);
+ boolean flag1 = j >= 0 && j < achunksection.length;
+- DataLayer nibblearray = lightenginethreaded.getLayerListener(LightLayer.BLOCK).getDataLayerData(SectionPos.of(chunkcoordintpair, i));
+- DataLayer nibblearray1 = lightenginethreaded.getLayerListener(LightLayer.SKY).getDataLayerData(SectionPos.of(chunkcoordintpair, i));
++ // Paper start - async chunk save for unload
++ DataLayer nibblearray; // block light
++ DataLayer nibblearray1; // sky light
++ if (asyncsavedata == null) {
++ nibblearray = lightenginethreaded.getLayerListener(LightLayer.BLOCK).getDataLayerData(SectionPos.of(chunkcoordintpair, i)); // Paper - diff on method change (see getAsyncSaveData)
++ nibblearray1 = lightenginethreaded.getLayerListener(LightLayer.SKY).getDataLayerData(SectionPos.of(chunkcoordintpair, i)); // Paper - diff on method change (see getAsyncSaveData)
++ } else {
++ nibblearray = asyncsavedata.blockLight[i - lightenginethreaded.getMinLightSection()];
++ nibblearray1 = asyncsavedata.skyLight[i - lightenginethreaded.getMinLightSection()];
++ }
++ // Paper end
+
+ if (flag1 || nibblearray != null || nibblearray1 != null) {
+ CompoundTag nbttagcompound1 = new CompoundTag();
+@@ -425,8 +535,17 @@ public class ChunkSerializer {
+ nbttagcompound.putBoolean("isLightOn", true);
+ }
+
+- ListTag nbttaglist1 = new ListTag();
+- Iterator iterator = chunk.getBlockEntitiesPos().iterator();
++ // Paper start
++ ListTag nbttaglist1;
++ Iterator<BlockPos> iterator;
++ if (asyncsavedata != null) {
++ nbttaglist1 = asyncsavedata.blockEntities;
++ iterator = java.util.Collections.emptyIterator();
++ } else {
++ nbttaglist1 = new ListTag();
++ iterator = chunk.getBlockEntitiesPos().iterator();
++ }
++ // Paper end
+
+ CompoundTag nbttagcompound2;
+
+@@ -463,7 +582,14 @@ public class ChunkSerializer {
+ nbttagcompound.put("CarvingMasks", nbttagcompound2);
+ }
+
++ // Paper start
++ if (asyncsavedata != null) {
++ nbttagcompound.put(BLOCK_TICKS_TAG, asyncsavedata.blockTickList);
++ nbttagcompound.put(FLUID_TICKS_TAG, asyncsavedata.fluidTickList);
++ } else {
+ ChunkSerializer.saveTicks(world, nbttagcompound, chunk.getTicksForSerialization());
++ }
++ // Paper end
+ nbttagcompound.put("PostProcessing", ChunkSerializer.packOffsets(chunk.getPostProcessing()));
+ CompoundTag nbttagcompound3 = new CompoundTag();
+ Iterator iterator1 = chunk.getHeightmaps().iterator();
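
Editor's note: putting the two halves together, an unload save would look roughly like the following. getAsyncSaveData and saveChunk are the real methods added above; the worker executor and the final hand-off to the file IO thread are sketched, and the snippet assumes the patched server classes on the classpath:

    // Hypothetical glue code, not part of the patch.
    void unloadChunkAsync(net.minecraft.server.level.ServerLevel world,
                          net.minecraft.world.level.chunk.ChunkAccess chunk,
                          java.util.concurrent.Executor worker) {
        // 1) main thread: snapshot light, ticks, block entities, world time
        ChunkSerializer.AsyncSaveData snapshot = ChunkSerializer.getAsyncSaveData(world, chunk);
        // 2) worker thread: serialize against the snapshot, not live state
        worker.execute(() -> {
            net.minecraft.nbt.CompoundTag nbt = ChunkSerializer.saveChunk(world, chunk, snapshot);
            // 3) hand nbt to the file IO thread for the prioritized write
        });
    }
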
+diff --git a/src/main/java/net/minecraft/world/level/chunk/storage/ChunkStorage.java b/src/main/java/net/minecraft/world/level/chunk/storage/ChunkStorage.java
+index c56946f86565ad1ac41bb7b655c113f648d2f539..694778b5c23dbe9c8603c3483476b5252aa079bc 100644
+--- a/src/main/java/net/minecraft/world/level/chunk/storage/ChunkStorage.java
++++ b/src/main/java/net/minecraft/world/level/chunk/storage/ChunkStorage.java
+@@ -28,26 +28,33 @@ import net.minecraft.world.level.storage.DimensionDataStorage;
+ public class ChunkStorage implements AutoCloseable {
+
+ public static final int LAST_MONOLYTH_STRUCTURE_DATA_VERSION = 1493;
+- private final IOWorker worker;
++ // Paper - nuke IO worker
+ protected final DataFixer fixerUpper;
+ @Nullable
+ private volatile LegacyStructureDataHandler legacyStructureHandler;
++ // Paper start - async chunk loading
++ private final Object persistentDataLock = new Object(); // Paper
++ public final RegionFileStorage regionFileCache;
++ // Paper end - async chunk loading
+
+ public ChunkStorage(Path directory, DataFixer dataFixer, boolean dsync) {
+ this.fixerUpper = dataFixer;
+- this.worker = new IOWorker(directory, dsync, "chunk");
++ // Paper start - async chunk io
++ // remove IO worker
++ this.regionFileCache = new RegionFileStorage(directory, dsync); // Paper - nuke IOWorker
++ // Paper end - async chunk io
+ }
+
+ public boolean isOldChunkAround(ChunkPos chunkPos, int checkRadius) {
+- return this.worker.isOldChunkAround(chunkPos, checkRadius);
++ return true; // Paper - (for now, old unoptimised behavior) TODO implement later? the chunk status that blender uses SHOULD already have this radius loaded, no need to go back for it...
+ }
+
+ // CraftBukkit start
+ private boolean check(ServerChunkCache cps, int x, int z) {
+ ChunkPos pos = new ChunkPos(x, z);
+ if (cps != null) {
+- com.google.common.base.Preconditions.checkState(org.bukkit.Bukkit.isPrimaryThread(), "primary thread");
+- if (cps.hasChunk(x, z)) {
++ //com.google.common.base.Preconditions.checkState(org.bukkit.Bukkit.isPrimaryThread(), "primary thread"); // Paper - this function is now MT-Safe
++ if (cps.getChunkAtIfCachedImmediately(x, z) != null) { // Paper - isLoaded is a ticket level check, not a chunk loaded check!
+ return true;
+ }
+ }
+@@ -75,6 +82,7 @@ public class ChunkStorage implements AutoCloseable {
+
+ public CompoundTag upgradeChunkTag(ResourceKey<LevelStem> resourcekey, Supplier<DimensionDataStorage> supplier, CompoundTag nbttagcompound, Optional<ResourceKey<Codec<? extends ChunkGenerator>>> optional, ChunkPos pos, @Nullable LevelAccessor generatoraccess) {
+ // CraftBukkit end
++ nbttagcompound = nbttagcompound.copy(); // Paper - defensive copy, another thread might modify this
+ int i = ChunkStorage.getVersion(nbttagcompound);
+
+ // CraftBukkit start
+@@ -92,9 +100,11 @@ public class ChunkStorage implements AutoCloseable {
+ if (i < 1493) {
+ nbttagcompound = NbtUtils.update(this.fixerUpper, DataFixTypes.CHUNK, nbttagcompound, i, 1493);
+ if (nbttagcompound.getCompound("Level").getBoolean("hasLegacyStructureData")) {
++ synchronized (this.persistentDataLock) { // Paper - Async chunk loading
+ LegacyStructureDataHandler persistentstructurelegacy = this.getLegacyStructureHandler(resourcekey, supplier);
+
+ nbttagcompound = persistentstructurelegacy.updateFromLegacy(nbttagcompound);
++ } // Paper - Async chunk loading
+ }
+ }
+
+@@ -127,7 +137,7 @@ public class ChunkStorage implements AutoCloseable {
+ LegacyStructureDataHandler persistentstructurelegacy = this.legacyStructureHandler;
+
+ if (persistentstructurelegacy == null) {
+- synchronized (this) {
++ synchronized (this.persistentDataLock) { // Paper - async chunk loading
+ persistentstructurelegacy = this.legacyStructureHandler;
+ if (persistentstructurelegacy == null) {
+ this.legacyStructureHandler = persistentstructurelegacy = LegacyStructureDataHandler.getLegacyStructureHandler(resourcekey, (DimensionDataStorage) supplier.get());
+@@ -153,26 +163,49 @@ public class ChunkStorage implements AutoCloseable {
+ }
+
+ public CompletableFuture<Optional<CompoundTag>> read(ChunkPos chunkPos) {
+- return this.worker.loadAsync(chunkPos);
++ // Paper start - async chunk io
++ try {
++ return CompletableFuture.completedFuture(Optional.ofNullable(this.readSync(chunkPos)));
++ } catch (Throwable thr) {
++ return CompletableFuture.failedFuture(thr);
++ }
++ }
++ @Nullable
++ public CompoundTag readSync(ChunkPos chunkPos) throws IOException {
++ return this.regionFileCache.read(chunkPos);
+ }
++ // Paper end - async chunk io
+
+- public void write(ChunkPos chunkPos, CompoundTag nbt) {
+- this.worker.store(chunkPos, nbt);
++ // Paper start - async chunk io
++ public void write(ChunkPos chunkPos, CompoundTag nbt) throws IOException {
++ this.regionFileCache.write(chunkPos, nbt);
++ // Paper end - Async chunk loading
+ if (this.legacyStructureHandler != null) {
++ synchronized (this.persistentDataLock) { // Paper - Async chunk loading
+ this.legacyStructureHandler.removeIndex(chunkPos.toLong());
++ } // Paper - Async chunk loading
+ }
+
+ }
+
+ public void flushWorker() {
+- this.worker.synchronize(true).join();
++ com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.flush(); // Paper - nuke IO worker
+ }
+
+ public void close() throws IOException {
+- this.worker.close();
++ this.regionFileCache.close(); // Paper - nuke IO worker
+ }
+
+ public ChunkScanAccess chunkScanner() {
+- return this.worker;
++ // Paper start - nuke IO worker
++ return ((chunkPos, streamTagVisitor) -> {
++ try {
++ this.regionFileCache.scanChunk(chunkPos, streamTagVisitor);
++ return java.util.concurrent.CompletableFuture.completedFuture(null);
++ } catch (IOException e) {
++ throw new RuntimeException(e);
++ }
++ });
++ // Paper end
+ }
+ }
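
Editor's note: with the IOWorker gone, read() keeps its asynchronous signature for API compatibility but completes immediately — the region-file read happens inline, and any IOException becomes a failed future rather than an unchecked throw. The same shape in isolation, using plain file IO (SyncBackedReader is an illustrative name):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Optional;
    import java.util.concurrent.CompletableFuture;

    final class SyncBackedReader {
        // Same shape as read() above: async signature, synchronous completion,
        // with IO errors surfaced as a failed future instead of a thrown exception.
        static CompletableFuture<Optional<byte[]>> read(Path file) {
            try {
                byte[] data = Files.exists(file) ? Files.readAllBytes(file) : null;
                return CompletableFuture.completedFuture(Optional.ofNullable(data));
            } catch (IOException thr) {
                return CompletableFuture.failedFuture(thr);
            }
        }
    }
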
+diff --git a/src/main/java/net/minecraft/world/level/chunk/storage/RegionFile.java b/src/main/java/net/minecraft/world/level/chunk/storage/RegionFile.java
+index d51bafd2f5a763b8a49c835ab74a7cf60caa1ab6..7412da51c2eae70f17f4883f7223303d570c8402 100644
+--- a/src/main/java/net/minecraft/world/level/chunk/storage/RegionFile.java
++++ b/src/main/java/net/minecraft/world/level/chunk/storage/RegionFile.java
+@@ -44,6 +44,7 @@ public class RegionFile implements AutoCloseable {
+ private final IntBuffer timestamps;
+ @VisibleForTesting
+ protected final RegionBitmap usedSectors;
++ public final java.util.concurrent.locks.ReentrantLock fileLock = new java.util.concurrent.locks.ReentrantLock(true); // Paper
+
+ public RegionFile(Path file, Path directory, boolean dsync) throws IOException {
+ this(file, directory, RegionFileVersion.VERSION_DEFLATE, dsync);
+@@ -228,7 +229,7 @@ public class RegionFile implements AutoCloseable {
+ return (byteCount + 4096 - 1) / 4096;
+ }
+
+- public boolean doesChunkExist(ChunkPos pos) {
++ public synchronized boolean doesChunkExist(ChunkPos pos) { // Paper - synchronized
+ int i = this.getOffset(pos);
+
+ if (i == 0) {
+@@ -393,6 +394,11 @@ public class RegionFile implements AutoCloseable {
+ }
+
+ public void close() throws IOException {
++ // Paper start - Prevent regionfiles from being closed during use
++ this.fileLock.lock();
++ synchronized (this) {
++ try {
++ // Paper end
+ try {
+ this.padToFullSector();
+ } finally {
+@@ -402,6 +408,10 @@ public class RegionFile implements AutoCloseable {
+ this.file.close();
+ }
+ }
++ } finally { // Paper start - Prevent regionfiles from being closed during use
++ this.fileLock.unlock();
++ }
++ } // Paper end
+
+ }
+
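
Editor's note: the fair ReentrantLock added to RegionFile exists so close() can exclude in-flight reads and writes — users hold fileLock for the duration of their IO, and close() takes the same lock before tearing the file down. A self-contained sketch of the guard (GuardedFile is illustrative):

    import java.util.concurrent.locks.ReentrantLock;

    // Illustrative close-vs-use guard; mirrors the fileLock added above.
    final class GuardedFile implements AutoCloseable {
        final ReentrantLock fileLock = new ReentrantLock(true); // fair, as in the patch
        private boolean closed;

        void write(byte[] data) {
            this.fileLock.lock();
            try {
                if (this.closed) throw new IllegalStateException("file closed");
                // ... actual IO here ...
            } finally {
                this.fileLock.unlock();
            }
        }

        @Override
        public void close() {
            this.fileLock.lock();
            try {
                this.closed = true;
                // ... flush and close the channel ...
            } finally {
                this.fileLock.unlock();
            }
        }
    }
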
+diff --git a/src/main/java/net/minecraft/world/level/chunk/storage/RegionFileStorage.java b/src/main/java/net/minecraft/world/level/chunk/storage/RegionFileStorage.java
+index 8ba1c073387fa21a20bd42a873ec3cc314eae64e..6fa0bc18ab05b9fb05521f46c5dadb695f1ec05b 100644
+--- a/src/main/java/net/minecraft/world/level/chunk/storage/RegionFileStorage.java
++++ b/src/main/java/net/minecraft/world/level/chunk/storage/RegionFileStorage.java
+@@ -29,11 +29,32 @@ public class RegionFileStorage implements AutoCloseable {
+ this.sync = dsync;
+ }
+
+- private RegionFile getRegionFile(ChunkPos chunkcoordintpair, boolean existingOnly) throws IOException { // CraftBukkit
++ // Paper start
++ public synchronized RegionFile getRegionFileIfLoaded(ChunkPos chunkcoordintpair) {
++ return this.regionCache.getAndMoveToFirst(ChunkPos.asLong(chunkcoordintpair.getRegionX(), chunkcoordintpair.getRegionZ()));
++ }
++
++ public synchronized boolean chunkExists(ChunkPos pos) throws IOException {
++ RegionFile regionfile = getRegionFile(pos, true);
++
++ return regionfile != null ? regionfile.hasChunk(pos) : false;
++ }
++
++ public synchronized RegionFile getRegionFile(ChunkPos chunkcoordintpair, boolean existingOnly) throws IOException { // CraftBukkit
++ return this.getRegionFile(chunkcoordintpair, existingOnly, false);
++ }
++ public synchronized RegionFile getRegionFile(ChunkPos chunkcoordintpair, boolean existingOnly, boolean lock) throws IOException {
++ // Paper end
+ long i = ChunkPos.asLong(chunkcoordintpair.getRegionX(), chunkcoordintpair.getRegionZ());
+ RegionFile regionfile = (RegionFile) this.regionCache.getAndMoveToFirst(i);
+
+ if (regionfile != null) {
++ // Paper start
++ if (lock) {
++ // must be in this synchronized block
++ regionfile.fileLock.lock();
++ }
++ // Paper end
+ return regionfile;
+ } else {
+ if (this.regionCache.size() >= 256) {
+@@ -48,6 +69,12 @@ public class RegionFileStorage implements AutoCloseable {
+ RegionFile regionfile1 = new RegionFile(path1, this.folder, this.sync);
+
+ this.regionCache.putAndMoveToFirst(i, regionfile1);
++ // Paper start
++ if (lock) {
++ // must be in this synchronized block
++ regionfile1.fileLock.lock();
++ }
++ // Paper end
+ return regionfile1;
+ }
+ }
+@@ -55,11 +82,12 @@ public class RegionFileStorage implements AutoCloseable {
+ @Nullable
+ public CompoundTag read(ChunkPos pos) throws IOException {
+ // CraftBukkit start - SPIGOT-5680: There's no good reason to preemptively create files on read, save that for writing
+- RegionFile regionfile = this.getRegionFile(pos, true);
++ RegionFile regionfile = this.getRegionFile(pos, true, true); // Paper
+ if (regionfile == null) {
+ return null;
+ }
+ // CraftBukkit end
++ try { // Paper
+ DataInputStream datainputstream = regionfile.getChunkDataInputStream(pos);
+
+ CompoundTag nbttagcompound;
+@@ -96,6 +124,9 @@ public class RegionFileStorage implements AutoCloseable {
+ }
+
+ return nbttagcompound;
++ } finally { // Paper start
++ regionfile.fileLock.unlock();
++ } // Paper end
+ }
+
+ public void scanChunk(ChunkPos chunkcoordintpair, StreamTagVisitor streamtagvisitor) throws IOException {
+@@ -130,7 +161,8 @@ public class RegionFileStorage implements AutoCloseable {
+ }
+
+ protected void write(ChunkPos pos, @Nullable CompoundTag nbt) throws IOException {
+- RegionFile regionfile = this.getRegionFile(pos, false); // CraftBukkit
++ RegionFile regionfile = this.getRegionFile(pos, false, true); // CraftBukkit // Paper
++ try { // Paper
+
+ if (nbt == null) {
+ regionfile.clear(pos);
+@@ -156,9 +188,12 @@ public class RegionFileStorage implements AutoCloseable {
+ }
+ }
+
++ } finally { // Paper start
++ regionfile.fileLock.unlock();
++ } // Paper end
+ }
+
+- public void close() throws IOException {
++ public synchronized void close() throws IOException { // Paper - synchronized
+ ExceptionCollector<IOException> exceptionsuppressor = new ExceptionCollector<>();
+ ObjectIterator objectiterator = this.regionCache.values().iterator();
+
+@@ -175,7 +210,7 @@ public class RegionFileStorage implements AutoCloseable {
+ exceptionsuppressor.throwIfPresent();
+ }
+
+- public void flush() throws IOException {
++ public synchronized void flush() throws IOException { // Paper - synchronized
+ ObjectIterator objectiterator = this.regionCache.values().iterator();
+
+ while (objectiterator.hasNext()) {
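
Editor's note: the crucial detail in getRegionFile(pos, existingOnly, lock) is that the per-file lock is acquired while still inside the synchronized cache lookup; taking it after leaving the monitor would let close() slip in between the cache hit and the lock acquisition. A minimal sketch of that ordering (LockingFileCache is illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantLock;

    // Illustrative lock-inside-the-monitor ordering from getRegionFile above.
    final class LockingFileCache {
        private final Map<Long, ReentrantLock> cache = new HashMap<>();

        synchronized ReentrantLock acquire(long key) {
            ReentrantLock lock = this.cache.computeIfAbsent(key, k -> new ReentrantLock(true));
            lock.lock(); // must happen inside this synchronized block
            return lock; // caller releases in a finally block, as read()/write() do above
        }
    }

A caller then wraps its IO as: ReentrantLock lock = cache.acquire(key); try { ... } finally { lock.unlock(); } — matching the try/finally pattern added to read() and write() above.
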
+diff --git a/src/main/java/net/minecraft/world/level/chunk/storage/SectionStorage.java b/src/main/java/net/minecraft/world/level/chunk/storage/SectionStorage.java
+index 8a4750dd8f604062c4ea452f7b97b05a0c8d583a..678bd36581ead3a225e3a6e24b78e5db4e42657b 100644
+--- a/src/main/java/net/minecraft/world/level/chunk/storage/SectionStorage.java
++++ b/src/main/java/net/minecraft/world/level/chunk/storage/SectionStorage.java
+@@ -34,10 +34,10 @@ import net.minecraft.world.level.ChunkPos;
+ import net.minecraft.world.level.LevelHeightAccessor;
+ import org.slf4j.Logger;
+
+-public class SectionStorage<R> implements AutoCloseable {
++public class SectionStorage<R> extends RegionFileStorage implements AutoCloseable { // Paper - nuke IOWorker
+ private static final Logger LOGGER = LogUtils.getLogger();
+ private static final String SECTIONS_TAG = "Sections";
+- private final IOWorker worker;
++ // Paper - remove mojang I/O thread
+ private final Long2ObjectMap<Optional<R>> storage = new Long2ObjectOpenHashMap<>();
+ public final LongLinkedOpenHashSet dirty = new LongLinkedOpenHashSet();
+ private final Function<Runnable, Codec<R>> codec;
+@@ -48,13 +48,14 @@ public class SectionStorage<R> implements AutoCloseable {
+ protected final LevelHeightAccessor levelHeightAccessor;
+
+ public SectionStorage(Path path, Function<Runnable, Codec<R>> codecFactory, Function<Runnable, R> factory, DataFixer dataFixer, DataFixTypes dataFixTypes, boolean dsync, RegistryAccess dynamicRegistryManager, LevelHeightAccessor world) {
++ super(path, dsync); // Paper - remove mojang I/O thread
+ this.codec = codecFactory;
+ this.factory = factory;
+ this.fixerUpper = dataFixer;
+ this.type = dataFixTypes;
+ this.registryAccess = dynamicRegistryManager;
+ this.levelHeightAccessor = world;
+- this.worker = new IOWorker(path, dsync, path.getFileName().toString());
++ // Paper - remove mojang I/O thread
+ }
+
+ protected void tick(BooleanSupplier shouldKeepTicking) {
+@@ -122,15 +123,20 @@ public class SectionStorage<R> implements AutoCloseable {
+ }
+
+ private CompletableFuture<Optional<CompoundTag>> tryRead(ChunkPos pos) {
+- return this.worker.loadAsync(pos).exceptionally((throwable) -> {
+- if (throwable instanceof IOException iOException) {
+- LOGGER.error("Error reading chunk {} data from disk", pos, iOException);
+- return Optional.empty();
+- } else {
+- throw new CompletionException(throwable);
+- }
+- });
++ // Paper start - async chunk io
++ try {
++ return CompletableFuture.completedFuture(Optional.ofNullable(this.read(pos)));
++ } catch (Throwable thr) {
++ return CompletableFuture.failedFuture(thr);
++ }
++ // Paper end - async chunk io
++ }
++
++ // Paper start - async chunk io
++ public void loadInData(ChunkPos chunkPos, CompoundTag compound) {
++ this.readColumn(chunkPos, RegistryOps.create(NbtOps.INSTANCE, this.registryAccess), compound);
+ }
++ // Paper end - async chunk io
+
+ private <T> void readColumn(ChunkPos pos, DynamicOps<T> ops, @Nullable T data) {
+ if (data == null) {
+@@ -170,7 +176,7 @@ public class SectionStorage<R> implements AutoCloseable {
+ Dynamic<Tag> dynamic = this.writeColumn(pos, registryOps);
+ Tag tag = dynamic.getValue();
+ if (tag instanceof CompoundTag) {
+- this.worker.store(pos, (CompoundTag)tag);
++ try { this.write(pos, (CompoundTag)tag); } catch (IOException ioexception) { SectionStorage.LOGGER.error("Error writing data to disk", ioexception); } // Paper - nuke IOWorker
+ } else {
+ LOGGER.error("Expected compound tag, got {}", (Object)tag);
+ }
+@@ -198,6 +204,21 @@ public class SectionStorage<R> implements AutoCloseable {
+ return new Dynamic<>(ops, ops.createMap(ImmutableMap.of(ops.createString("Sections"), ops.createMap(map), ops.createString("DataVersion"), ops.createInt(SharedConstants.getCurrentVersion().getWorldVersion()))));
+ }
+
++ // Paper start - internal get data function, copied from above
++ private CompoundTag getDataInternal(ChunkPos pos) {
++ RegistryOps<Tag> registryOps = RegistryOps.create(NbtOps.INSTANCE, this.registryAccess);
++ Dynamic<Tag> dynamic = this.writeColumn(pos, registryOps);
++ Tag nbtbase = (Tag) dynamic.getValue();
++
++ if (nbtbase instanceof CompoundTag) {
++ return (CompoundTag)nbtbase;
++ } else {
++ SectionStorage.LOGGER.error("Expected compound tag, got {}", nbtbase);
++ }
++ return null;
++ }
++ // Paper end
++
+ private static long getKey(ChunkPos chunkPos, int y) {
+ return SectionPos.asLong(chunkPos.x, y, chunkPos.z);
+ }
+@@ -233,6 +254,23 @@ public class SectionStorage<R> implements AutoCloseable {
+
+ @Override
+ public void close() throws IOException {
+- this.worker.close();
++ //this.worker.close(); // Paper - nuke I/O worker
++ }
++
++ // Paper start - get data function
++ public CompoundTag getData(ChunkPos chunkcoordintpair) {
++ // Note: Copied from above
++ // Checks whether any of this chunk's sections are marked dirty; if so, the data is built in getDataInternal(ChunkCoordIntPair)
++ if (!this.dirty.isEmpty()) {
++ for (int i = this.levelHeightAccessor.getMinSection(); i < this.levelHeightAccessor.getMaxSection(); ++i) {
++ long j = SectionPos.of(chunkcoordintpair, i).asLong();
++
++ if (this.dirty.contains(j)) {
++ return this.getDataInternal(chunkcoordintpair);
++ }
++ }
++ }
++ return null;
+ }
++ // Paper end
+ }
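
Editor's note: getData only pays the serialization cost when at least one section of the requested column is dirty, returning null so callers can skip the write entirely. The check reduces to this shape (DirtyColumnCheck and its parameters are illustrative):

    import java.util.Set;
    import java.util.function.Supplier;

    final class DirtyColumnCheck {
        // Serialize the column only if at least one of its section keys is dirty;
        // null means "nothing to save" and the caller skips the write.
        static byte[] buildDataIfDirty(long[] columnSectionKeys, Set<Long> dirty, Supplier<byte[]> serializeColumn) {
            if (!dirty.isEmpty()) {
                for (long key : columnSectionKeys) {
                    if (dirty.contains(key)) {
                        return serializeColumn.get();
                    }
                }
            }
            return null;
        }
    }
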
+diff --git a/src/main/java/org/spigotmc/WatchdogThread.java b/src/main/java/org/spigotmc/WatchdogThread.java
+index 335120afc88a8fc1543c2e6df516fd728e3ab032..581cde7a74e00bee1ce69086132d5f871d206399 100644
+--- a/src/main/java/org/spigotmc/WatchdogThread.java
++++ b/src/main/java/org/spigotmc/WatchdogThread.java
+@@ -83,6 +83,7 @@ public class WatchdogThread extends Thread
+ //
+ log.log( Level.SEVERE, "------------------------------" );
+ log.log( Level.SEVERE, "Server thread dump (Look for plugins here before reporting to Spigot!):" );
++ com.destroystokyo.paper.io.chunk.ChunkTaskManager.dumpAllChunkLoadInfo(); // Paper
+ WatchdogThread.dumpThread( ManagementFactory.getThreadMXBean().getThreadInfo( MinecraftServer.getServer().serverThread.getId(), Integer.MAX_VALUE ), log );
+ log.log( Level.SEVERE, "------------------------------" );
+ //