From 5a362b875938eb0764f6e8f5bf1352722aa15a98 Mon Sep 17 00:00:00 2001
From: Jason Penilla <11360596+jpenilla@users.noreply.github.com>
Date: Tue, 3 Dec 2024 20:04:41 -0700
Subject: Use ConcurrentUtil from Paper repo
---
patches/server/0007-ConcurrentUtil.patch | 10553 +------------------
...lConsoleAppender-for-console-improvements.patch | 8 +-
...plugin-prefixes-using-Log4J-configuration.patch | 4 +-
...pender-to-keep-logging-IO-off-main-thread.patch | 4 +-
...-stacktraces-in-log-messages-crash-report.patch | 4 +-
patches/server/0020-Plugin-remapping.patch | 6 +-
...flection-calls-in-plugins-using-internals.patch | 4 +-
.../0033-Expose-server-build-information.patch | 4 +-
patches/server/0348-Implement-Mob-Goal-API.patch | 4 +-
.../0691-Add-support-for-Proxy-Protocol.patch | 4 +-
...e-Velocity-compression-and-cipher-natives.patch | 4 +-
patches/server/1044-Bundle-spark.patch | 4 +-
12 files changed, 37 insertions(+), 10566 deletions(-)
diff --git a/patches/server/0007-ConcurrentUtil.patch b/patches/server/0007-ConcurrentUtil.patch
index b285b3c6e3..8e147a77db 100644
--- a/patches/server/0007-ConcurrentUtil.patch
+++ b/patches/server/0007-ConcurrentUtil.patch
@@ -4,10544 +4,15 @@ Date: Sun, 23 Jan 2022 22:58:11 -0800
Subject: [PATCH] ConcurrentUtil
-diff --git a/src/main/java/ca/spottedleaf/concurrentutil/collection/MultiThreadedQueue.java b/src/main/java/ca/spottedleaf/concurrentutil/collection/MultiThreadedQueue.java
-new file mode 100644
-index 0000000000000000000000000000000000000000..f84a622dc29750139ac280f480b7cd132b036287
---- /dev/null
-+++ b/src/main/java/ca/spottedleaf/concurrentutil/collection/MultiThreadedQueue.java
-@@ -0,0 +1,1421 @@
-+package ca.spottedleaf.concurrentutil.collection;
-+
-+import ca.spottedleaf.concurrentutil.util.ConcurrentUtil;
-+import ca.spottedleaf.concurrentutil.util.Validate;
-+
-+import java.lang.invoke.VarHandle;
-+import java.util.ArrayList;
-+import java.util.Collection;
-+import java.util.Iterator;
-+import java.util.List;
-+import java.util.NoSuchElementException;
-+import java.util.Queue;
-+import java.util.Spliterator;
-+import java.util.Spliterators;
-+import java.util.function.Consumer;
-+import java.util.function.IntFunction;
-+import java.util.function.Predicate;
-+
-+/**
-+ * MT-Safe linked first in first out ordered queue.
-+ *
-+ * This queue should out-perform {@link java.util.concurrent.ConcurrentLinkedQueue} in high-contention reads/writes, and is
-+ * not any slower in lower contention reads/writes.
-+ *
-+ * Note that this queue breaks the specification laid out by {@link Collection}, see {@link #preventAdds()} and {@link Collection#add(Object)}.
-+ *
-+ *
-+ * This queue will only unlink linked nodes through the {@link #peek()} and {@link #poll()} methods, and this is only if
-+ * they are at the head of the queue.
-+ *
-+ * @param Type of element in this queue.
-+ */
-+public class MultiThreadedQueue implements Queue {
-+
-+ protected volatile LinkedNode head; /* Always non-null, high chance of being the actual head */
-+
-+ protected volatile LinkedNode tail; /* Always non-null, high chance of being the actual tail */
-+
-+ /* Note that it is possible to reach head from tail. */
-+
-+ /* IMPL NOTE: Leave hashCode and equals to their defaults */
-+
-+ protected static final VarHandle HEAD_HANDLE = ConcurrentUtil.getVarHandle(MultiThreadedQueue.class, "head", LinkedNode.class);
-+ protected static final VarHandle TAIL_HANDLE = ConcurrentUtil.getVarHandle(MultiThreadedQueue.class, "tail", LinkedNode.class);
-+
-+ /* head */
-+
-+ protected final void setHeadPlain(final LinkedNode newHead) {
-+ HEAD_HANDLE.set(this, newHead);
-+ }
-+
-+ protected final void setHeadOpaque(final LinkedNode newHead) {
-+ HEAD_HANDLE.setOpaque(this, newHead);
-+ }
-+
-+ @SuppressWarnings("unchecked")
-+ protected final LinkedNode getHeadPlain() {
-+ return (LinkedNode)HEAD_HANDLE.get(this);
-+ }
-+
-+ @SuppressWarnings("unchecked")
-+ protected final LinkedNode getHeadOpaque() {
-+ return (LinkedNode)HEAD_HANDLE.getOpaque(this);
-+ }
-+
-+ @SuppressWarnings("unchecked")
-+ protected final LinkedNode getHeadAcquire() {
-+ return (LinkedNode)HEAD_HANDLE.getAcquire(this);
-+ }
-+
-+ /* tail */
-+
-+ protected final void setTailPlain(final LinkedNode newTail) {
-+ TAIL_HANDLE.set(this, newTail);
-+ }
-+
-+ protected final void setTailOpaque(final LinkedNode newTail) {
-+ TAIL_HANDLE.setOpaque(this, newTail);
-+ }
-+
-+ @SuppressWarnings("unchecked")
-+ protected final LinkedNode getTailPlain() {
-+ return (LinkedNode)TAIL_HANDLE.get(this);
-+ }
-+
-+ @SuppressWarnings("unchecked")
-+ protected final LinkedNode getTailOpaque() {
-+ return (LinkedNode)TAIL_HANDLE.getOpaque(this);
-+ }
-+
-+ /**
-+ * Constructs a {@code MultiThreadedQueue}, initially empty.
-+ *
-+ * The returned object may not be published without synchronization.
-+ *
-+ */
-+ public MultiThreadedQueue() {
-+ final LinkedNode value = new LinkedNode<>(null, null);
-+ this.setHeadPlain(value);
-+ this.setTailPlain(value);
-+ }
-+
-+ /**
-+ * Constructs a {@code MultiThreadedQueue}, initially containing all elements in the specified {@code collection}.
-+ *
-+ * The returned object may not be published without synchronization.
-+ *
-+ * @param collection The specified collection.
-+ * @throws NullPointerException If {@code collection} is {@code null} or contains {@code null} elements.
-+ */
-+ public MultiThreadedQueue(final Iterable extends E> collection) {
-+ final Iterator extends E> elements = collection.iterator();
-+
-+ if (!elements.hasNext()) {
-+ final LinkedNode value = new LinkedNode<>(null, null);
-+ this.setHeadPlain(value);
-+ this.setTailPlain(value);
-+ return;
-+ }
-+
-+ final LinkedNode head = new LinkedNode<>(Validate.notNull(elements.next(), "Null element"), null);
-+ LinkedNode tail = head;
-+
-+ while (elements.hasNext()) {
-+ final LinkedNode next = new LinkedNode<>(Validate.notNull(elements.next(), "Null element"), null);
-+ tail.setNextPlain(next);
-+ tail = next;
-+ }
-+
-+ this.setHeadPlain(head);
-+ this.setTailPlain(tail);
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public E remove() throws NoSuchElementException {
-+ final E ret = this.poll();
-+
-+ if (ret == null) {
-+ throw new NoSuchElementException();
-+ }
-+
-+ return ret;
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ *
-+ * Contrary to the specification of {@link Collection#add}, this method will fail to add the element to this queue
-+ * and return {@code false} if this queue is add-blocked.
-+ *
-+ */
-+ @Override
-+ public boolean add(final E element) {
-+ return this.offer(element);
-+ }
-+
-+ /**
-+ * Adds the specified element to the tail of this queue. If this queue is currently add-locked, then the queue is
-+ * released from that lock and this element is added. The unlock operation and addition of the specified
-+ * element is atomic.
-+ * @param element The specified element.
-+ * @return {@code true} if this queue previously allowed additions
-+ */
-+ public boolean forceAdd(final E element) {
-+ final LinkedNode node = new LinkedNode<>(element, null);
-+
-+ return !this.forceAppendList(node, node);
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public E element() throws NoSuchElementException {
-+ final E ret = this.peek();
-+
-+ if (ret == null) {
-+ throw new NoSuchElementException();
-+ }
-+
-+ return ret;
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ *
-+ * This method may also return {@code false} to indicate an element was not added if this queue is add-blocked.
-+ *
-+ */
-+ @Override
-+ public boolean offer(final E element) {
-+ Validate.notNull(element, "Null element");
-+
-+ final LinkedNode node = new LinkedNode<>(element, null);
-+
-+ return this.appendList(node, node);
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public E peek() {
-+ for (LinkedNode head = this.getHeadOpaque(), curr = head;;) {
-+ final LinkedNode next = curr.getNextVolatile();
-+ final E element = curr.getElementPlain(); /* Likely in sync */
-+
-+ if (element != null) {
-+ if (this.getHeadOpaque() == head && curr != head) {
-+ this.setHeadOpaque(curr);
-+ }
-+ return element;
-+ }
-+
-+ if (next == null || curr == next) {
-+ return null;
-+ }
-+ curr = next;
-+ }
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public E poll() {
-+ return this.removeHead();
-+ }
-+
-+ /**
-+ * Retrieves and removes the head of this queue if it matches the specified predicate. If this queue is empty
-+ * or the head does not match the predicate, this function returns {@code null}.
-+ *
-+ * The predicate may be invoked multiple or no times in this call.
-+ *
-+ * @param predicate The specified predicate.
-+ * @return The head if it matches the predicate, or {@code null} if it did not or this queue is empty.
-+ */
-+ public E pollIf(final Predicate predicate) {
-+ return this.removeHead(Validate.notNull(predicate, "Null predicate"));
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public void clear() {
-+ //noinspection StatementWithEmptyBody
-+ while (this.poll() != null);
-+ }
-+
-+ /**
-+ * Prevents elements from being added to this queue. Once this is called, any attempt to add to this queue will fail.
-+ *
-+ * This function is MT-Safe.
-+ *
-+ * @return {@code true} if the queue was modified to prevent additions, {@code false} if it already prevented additions.
-+ */
-+ public boolean preventAdds() {
-+ final LinkedNode deadEnd = new LinkedNode<>(null, null);
-+ deadEnd.setNextPlain(deadEnd);
-+
-+ if (!this.appendList(deadEnd, deadEnd)) {
-+ return false;
-+ }
-+
-+ this.setTailPlain(deadEnd); /* (try to) Ensure tail is set for the following #allowAdds call */
-+ return true;
-+ }
-+
-+ /**
-+ * Allows elements to be added to this queue once again. Note that this function has undefined behaviour if
-+ * {@link #preventAdds()} is not called beforehand. The benefit of this function over {@link #tryAllowAdds()}
-+ * is that this function might perform better.
-+ *
-+ * This function is not MT-Safe.
-+ *
-+ */
-+ public void allowAdds() {
-+ LinkedNode tail = this.getTailPlain();
-+
-+ /* We need to find the tail given the cas on tail isn't atomic (nor volatile) in this.appendList */
-+ /* Thus it is possible for an outdated tail to be set */
-+ while (tail != (tail = tail.getNextPlain())) {}
-+
-+ tail.setNextVolatile(null);
-+ }
-+
-+ /**
-+ * Tries to allow elements to be added to this queue. Returns {@code true} if the queue was previous add-locked,
-+ * {@code false} otherwise.
-+ *
-+ * This function is MT-Safe, however it should not be used with {@link #allowAdds()}.
-+ *
-+ * @return {@code true} if the queue was previously add-locked, {@code false} otherwise.
-+ */
-+ public boolean tryAllowAdds() {
-+ LinkedNode tail = this.getTailPlain();
-+
-+ for (int failures = 0;;) {
-+ /* We need to find the tail given the cas on tail isn't atomic (nor volatile) in this.appendList */
-+ /* Thus it is possible for an outdated tail to be set */
-+ while (tail != (tail = tail.getNextAcquire())) {
-+ if (tail == null) {
-+ return false;
-+ }
-+ }
-+
-+ for (int i = 0; i < failures; ++i) {
-+ ConcurrentUtil.backoff();
-+ }
-+
-+ if (tail == (tail = tail.compareExchangeNextVolatile(tail, null))) {
-+ return true;
-+ }
-+
-+ if (tail == null) {
-+ return false;
-+ }
-+ ++failures;
-+ }
-+ }
-+
-+ /**
-+ * Atomically adds the specified element to this queue or allows additions to the queue. If additions
-+ * are not allowed, the element is not added.
-+ *
-+ * This function is MT-Safe.
-+ *
-+ * @param element The specified element.
-+ * @return {@code true} if the queue now allows additions, {@code false} if the element was added.
-+ */
-+ public boolean addOrAllowAdds(final E element) {
-+ Validate.notNull(element, "Null element");
-+ int failures = 0;
-+
-+ final LinkedNode append = new LinkedNode<>(element, null);
-+
-+ for (LinkedNode currTail = this.getTailOpaque(), curr = currTail;;) {
-+ /* It has been experimentally shown that placing the read before the backoff results in significantly greater performance */
-+ /* It is likely due to a cache miss caused by another write to the next field */
-+ final LinkedNode next = curr.getNextVolatile();
-+
-+ for (int i = 0; i < failures; ++i) {
-+ ConcurrentUtil.backoff();
-+ }
-+
-+ if (next == null) {
-+ final LinkedNode compared = curr.compareExchangeNextVolatile(null, append);
-+
-+ if (compared == null) {
-+ /* Added */
-+ /* Avoid CASing on tail more than we need to */
-+ /* CAS to avoid setting an out-of-date tail */
-+ if (this.getTailOpaque() == currTail) {
-+ this.setTailOpaque(append);
-+ }
-+ return false; // we added
-+ }
-+
-+ ++failures;
-+ curr = compared;
-+ continue;
-+ } else if (next == curr) {
-+ final LinkedNode compared = curr.compareExchangeNextVolatile(curr, null);
-+
-+ if (compared == curr) {
-+ return true; // we let additions through
-+ }
-+
-+ ++failures;
-+
-+ if (compared != null) {
-+ curr = compared;
-+ }
-+ continue;
-+ }
-+
-+ if (curr == currTail) {
-+ /* Tail is likely not up-to-date */
-+ curr = next;
-+ } else {
-+ /* Try to update to tail */
-+ if (currTail == (currTail = this.getTailOpaque())) {
-+ curr = next;
-+ } else {
-+ curr = currTail;
-+ }
-+ }
-+ }
-+ }
-+
-+ /**
-+ * Returns whether this queue is currently add-blocked. That is, whether {@link #add(Object)} and friends will return {@code false}.
-+ */
-+ public boolean isAddBlocked() {
-+ for (LinkedNode tail = this.getTailOpaque();;) {
-+ LinkedNode next = tail.getNextVolatile();
-+ if (next == null) {
-+ return false;
-+ }
-+
-+ if (next == tail) {
-+ return true;
-+ }
-+
-+ tail = next;
-+ }
-+ }
-+
-+ /**
-+ * Atomically removes the head from this queue if it exists, otherwise prevents additions to this queue if no
-+ * head is removed.
-+ *
-+ * This function is MT-Safe.
-+ *
-+ * If the queue is already add-blocked and empty then no operation is performed.
-+ * @return {@code null} if the queue is now add-blocked or was previously add-blocked, else returns
-+ * an non-null value which was the previous head of queue.
-+ */
-+ public E pollOrBlockAdds() {
-+ int failures = 0;
-+ for (LinkedNode head = this.getHeadOpaque(), curr = head;;) {
-+ final E currentVal = curr.getElementVolatile();
-+ final LinkedNode next = curr.getNextOpaque();
-+
-+ if (next == curr) {
-+ return null; /* Additions are already blocked */
-+ }
-+
-+ for (int i = 0; i < failures; ++i) {
-+ ConcurrentUtil.backoff();
-+ }
-+
-+ if (currentVal != null) {
-+ if (curr.getAndSetElementVolatile(null) == null) {
-+ ++failures;
-+ continue;
-+ }
-+
-+ /* "CAS" to avoid setting an out-of-date head */
-+ if (this.getHeadOpaque() == head) {
-+ this.setHeadOpaque(next != null ? next : curr);
-+ }
-+
-+ return currentVal;
-+ }
-+
-+ if (next == null) {
-+ /* Try to update stale head */
-+ if (curr != head && this.getHeadOpaque() == head) {
-+ this.setHeadOpaque(curr);
-+ }
-+
-+ final LinkedNode compared = curr.compareExchangeNextVolatile(null, curr);
-+
-+ if (compared != null) {
-+ // failed to block additions
-+ curr = compared;
-+ ++failures;
-+ continue;
-+ }
-+
-+ return null; /* We blocked additions */
-+ }
-+
-+ if (head == curr) {
-+ /* head is likely not up-to-date */
-+ curr = next;
-+ } else {
-+ /* Try to update to head */
-+ if (head == (head = this.getHeadOpaque())) {
-+ curr = next;
-+ } else {
-+ curr = head;
-+ }
-+ }
-+ }
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public boolean remove(final Object object) {
-+ Validate.notNull(object, "Null object to remove");
-+
-+ for (LinkedNode curr = this.getHeadOpaque();;) {
-+ final LinkedNode next = curr.getNextVolatile();
-+ final E element = curr.getElementPlain(); /* Likely in sync */
-+
-+ if (element != null) {
-+ if ((element == object || element.equals(object)) && curr.getAndSetElementVolatile(null) == element) {
-+ return true;
-+ }
-+ }
-+
-+ if (next == curr || next == null) {
-+ break;
-+ }
-+ curr = next;
-+ }
-+
-+ return false;
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public boolean removeIf(final Predicate super E> filter) {
-+ Validate.notNull(filter, "Null filter");
-+
-+ boolean ret = false;
-+
-+ for (LinkedNode curr = this.getHeadOpaque();;) {
-+ final LinkedNode next = curr.getNextVolatile();
-+ final E element = curr.getElementPlain(); /* Likely in sync */
-+
-+ if (element != null) {
-+ ret |= filter.test(element) && curr.getAndSetElementVolatile(null) == element;
-+ }
-+
-+ if (next == null || next == curr) {
-+ break;
-+ }
-+ curr = next;
-+ }
-+
-+ return ret;
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public boolean removeAll(final Collection> collection) {
-+ Validate.notNull(collection, "Null collection");
-+
-+ boolean ret = false;
-+
-+ /* Volatile is required to synchronize with the write to the first element */
-+ for (LinkedNode curr = this.getHeadOpaque();;) {
-+ final LinkedNode next = curr.getNextVolatile();
-+ final E element = curr.getElementPlain(); /* Likely in sync */
-+
-+ if (element != null) {
-+ ret |= collection.contains(element) && curr.getAndSetElementVolatile(null) == element;
-+ }
-+
-+ if (next == null || next == curr) {
-+ break;
-+ }
-+ curr = next;
-+ }
-+
-+ return ret;
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public boolean retainAll(final Collection> collection) {
-+ Validate.notNull(collection, "Null collection");
-+
-+ boolean ret = false;
-+
-+ for (LinkedNode curr = this.getHeadOpaque();;) {
-+ final LinkedNode next = curr.getNextVolatile();
-+ final E element = curr.getElementPlain(); /* Likely in sync */
-+
-+ if (element != null) {
-+ ret |= !collection.contains(element) && curr.getAndSetElementVolatile(null) == element;
-+ }
-+
-+ if (next == null || next == curr) {
-+ break;
-+ }
-+ curr = next;
-+ }
-+
-+ return ret;
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public Object[] toArray() {
-+ final List ret = new ArrayList<>();
-+
-+ for (LinkedNode curr = this.getHeadOpaque();;) {
-+ final LinkedNode next = curr.getNextVolatile();
-+ final E element = curr.getElementPlain(); /* Likely in sync */
-+
-+ if (element != null) {
-+ ret.add(element);
-+ }
-+
-+ if (next == null || next == curr) {
-+ break;
-+ }
-+ curr = next;
-+ }
-+
-+ return ret.toArray();
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public T[] toArray(final T[] array) {
-+ final List ret = new ArrayList<>();
-+
-+ for (LinkedNode curr = this.getHeadOpaque();;) {
-+ final LinkedNode next = curr.getNextVolatile();
-+ final E element = curr.getElementPlain(); /* Likely in sync */
-+
-+ if (element != null) {
-+ //noinspection unchecked
-+ ret.add((T)element);
-+ }
-+
-+ if (next == null || next == curr) {
-+ break;
-+ }
-+ curr = next;
-+ }
-+
-+ return ret.toArray(array);
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public T[] toArray(final IntFunction generator) {
-+ Validate.notNull(generator, "Null generator");
-+
-+ final List ret = new ArrayList<>();
-+
-+ for (LinkedNode curr = this.getHeadOpaque();;) {
-+ final LinkedNode next = curr.getNextVolatile();
-+ final E element = curr.getElementPlain(); /* Likely in sync */
-+
-+ if (element != null) {
-+ //noinspection unchecked
-+ ret.add((T)element);
-+ }
-+
-+ if (next == null || next == curr) {
-+ break;
-+ }
-+ curr = next;
-+ }
-+
-+ return ret.toArray(generator);
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public String toString() {
-+ final StringBuilder builder = new StringBuilder();
-+
-+ builder.append("MultiThreadedQueue: {elements: {");
-+
-+ int deadEntries = 0;
-+ int totalEntries = 0;
-+ int aliveEntries = 0;
-+
-+ boolean addLocked = false;
-+
-+ for (LinkedNode curr = this.getHeadOpaque();; ++totalEntries) {
-+ final LinkedNode next = curr.getNextVolatile();
-+ final E element = curr.getElementPlain(); /* Likely in sync */
-+
-+ if (element == null) {
-+ ++deadEntries;
-+ } else {
-+ ++aliveEntries;
-+ }
-+
-+ if (totalEntries != 0) {
-+ builder.append(", ");
-+ }
-+
-+ builder.append(totalEntries).append(": \"").append(element).append('"');
-+
-+ if (next == null) {
-+ break;
-+ }
-+ if (curr == next) {
-+ addLocked = true;
-+ break;
-+ }
-+ curr = next;
-+ }
-+
-+ builder.append("}, total_entries: \"").append(totalEntries).append("\", alive_entries: \"").append(aliveEntries)
-+ .append("\", dead_entries:").append(deadEntries).append("\", add_locked: \"").append(addLocked)
-+ .append("\"}");
-+
-+ return builder.toString();
-+ }
-+
-+ /**
-+ * Adds all elements from the specified collection to this queue. The addition is atomic.
-+ * @param collection The specified collection.
-+ * @return {@code true} if all elements were added successfully, or {@code false} if this queue is add-blocked, or
-+ * {@code false} if the specified collection contains no elements.
-+ */
-+ @Override
-+ public boolean addAll(final Collection extends E> collection) {
-+ return this.addAll((Iterable extends E>)collection);
-+ }
-+
-+ /**
-+ * Adds all elements from the specified iterable object to this queue. The addition is atomic.
-+ * @param iterable The specified iterable object.
-+ * @return {@code true} if all elements were added successfully, or {@code false} if this queue is add-blocked, or
-+ * {@code false} if the specified iterable contains no elements.
-+ */
-+ public boolean addAll(final Iterable extends E> iterable) {
-+ Validate.notNull(iterable, "Null iterable");
-+
-+ final Iterator extends E> elements = iterable.iterator();
-+ if (!elements.hasNext()) {
-+ return false;
-+ }
-+
-+ /* Build a list of nodes to append */
-+ /* This is an much faster due to the fact that zero additional synchronization is performed */
-+
-+ final LinkedNode head = new LinkedNode<>(Validate.notNull(elements.next(), "Null element"), null);
-+ LinkedNode tail = head;
-+
-+ while (elements.hasNext()) {
-+ final LinkedNode next = new LinkedNode<>(Validate.notNull(elements.next(), "Null element"), null);
-+ tail.setNextPlain(next);
-+ tail = next;
-+ }
-+
-+ return this.appendList(head, tail);
-+ }
-+
-+ /**
-+ * Adds all of the elements from the specified array to this queue.
-+ * @param items The specified array.
-+ * @return {@code true} if all elements were added successfully, or {@code false} if this queue is add-blocked, or
-+ * {@code false} if the specified array has a length of 0.
-+ */
-+ public boolean addAll(final E[] items) {
-+ return this.addAll(items, 0, items.length);
-+ }
-+
-+ /**
-+ * Adds all of the elements from the specified array to this queue.
-+ * @param items The specified array.
-+ * @param off The offset in the array.
-+ * @param len The number of items.
-+ * @return {@code true} if all elements were added successfully, or {@code false} if this queue is add-blocked, or
-+ * {@code false} if the specified array has a length of 0.
-+ */
-+ public boolean addAll(final E[] items, final int off, final int len) {
-+ Validate.notNull(items, "Items may not be null");
-+ Validate.arrayBounds(off, len, items.length, "Items array indices out of bounds");
-+
-+ if (len == 0) {
-+ return false;
-+ }
-+
-+ final LinkedNode head = new LinkedNode<>(Validate.notNull(items[off], "Null element"), null);
-+ LinkedNode tail = head;
-+
-+ for (int i = 1; i < len; ++i) {
-+ final LinkedNode next = new LinkedNode<>(Validate.notNull(items[off + i], "Null element"), null);
-+ tail.setNextPlain(next);
-+ tail = next;
-+ }
-+
-+ return this.appendList(head, tail);
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public boolean containsAll(final Collection> collection) {
-+ Validate.notNull(collection, "Null collection");
-+
-+ for (final Object element : collection) {
-+ if (!this.contains(element)) {
-+ return false;
-+ }
-+ }
-+ return false;
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public Iterator iterator() {
-+ return new LinkedIterator<>(this.getHeadOpaque());
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ *
-+ * Note that this function is computed non-atomically and in O(n) time. The value returned may not be representative of
-+ * the queue in its current state.
-+ *
-+ */
-+ @Override
-+ public int size() {
-+ int size = 0;
-+
-+ /* Volatile is required to synchronize with the write to the first element */
-+ for (LinkedNode curr = this.getHeadOpaque();;) {
-+ final LinkedNode next = curr.getNextVolatile();
-+ final E element = curr.getElementPlain(); /* Likely in sync */
-+
-+ if (element != null) {
-+ ++size;
-+ }
-+
-+ if (next == null || next == curr) {
-+ break;
-+ }
-+ curr = next;
-+ }
-+
-+ return size;
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public boolean isEmpty() {
-+ return this.peek() == null;
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public boolean contains(final Object object) {
-+ Validate.notNull(object, "Null object");
-+
-+ for (LinkedNode curr = this.getHeadOpaque();;) {
-+ final LinkedNode next = curr.getNextVolatile();
-+ final E element = curr.getElementPlain(); /* Likely in sync */
-+
-+ if (element != null && (element == object || element.equals(object))) {
-+ return true;
-+ }
-+
-+ if (next == null || next == curr) {
-+ break;
-+ }
-+ curr = next;
-+ }
-+
-+ return false;
-+ }
-+
-+ /**
-+ * Finds the first element in this queue that matches the predicate.
-+ * @param predicate The predicate to test elements against.
-+ * @return The first element that matched the predicate, {@code null} if none matched.
-+ */
-+ public E find(final Predicate predicate) {
-+ Validate.notNull(predicate, "Null predicate");
-+
-+ for (LinkedNode curr = this.getHeadOpaque();;) {
-+ final LinkedNode next = curr.getNextVolatile();
-+ final E element = curr.getElementPlain(); /* Likely in sync */
-+
-+ if (element != null && predicate.test(element)) {
-+ return element;
-+ }
-+
-+ if (next == null || next == curr) {
-+ break;
-+ }
-+ curr = next;
-+ }
-+
-+ return null;
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public void forEach(final Consumer super E> action) {
-+ Validate.notNull(action, "Null action");
-+
-+ for (LinkedNode curr = this.getHeadOpaque();;) {
-+ final LinkedNode next = curr.getNextVolatile();
-+ final E element = curr.getElementPlain(); /* Likely in sync */
-+
-+ if (element != null) {
-+ action.accept(element);
-+ }
-+
-+ if (next == null || next == curr) {
-+ break;
-+ }
-+ curr = next;
-+ }
-+ }
-+
-+ // return true if normal addition, false if the queue previously disallowed additions
-+ protected final boolean forceAppendList(final LinkedNode head, final LinkedNode tail) {
-+ int failures = 0;
-+
-+ for (LinkedNode currTail = this.getTailOpaque(), curr = currTail;;) {
-+ /* It has been experimentally shown that placing the read before the backoff results in significantly greater performance */
-+ /* It is likely due to a cache miss caused by another write to the next field */
-+ final LinkedNode next = curr.getNextVolatile();
-+
-+ for (int i = 0; i < failures; ++i) {
-+ ConcurrentUtil.backoff();
-+ }
-+
-+ if (next == null || next == curr) {
-+ final LinkedNode compared = curr.compareExchangeNextVolatile(next, head);
-+
-+ if (compared == next) {
-+ /* Added */
-+ /* Avoid CASing on tail more than we need to */
-+ /* "CAS" to avoid setting an out-of-date tail */
-+ if (this.getTailOpaque() == currTail) {
-+ this.setTailOpaque(tail);
-+ }
-+ return next != curr;
-+ }
-+
-+ ++failures;
-+ curr = compared;
-+ continue;
-+ }
-+
-+ if (curr == currTail) {
-+ /* Tail is likely not up-to-date */
-+ curr = next;
-+ } else {
-+ /* Try to update to tail */
-+ if (currTail == (currTail = this.getTailOpaque())) {
-+ curr = next;
-+ } else {
-+ curr = currTail;
-+ }
-+ }
-+ }
-+ }
-+
-+ // return true if successful, false otherwise
-+ protected final boolean appendList(final LinkedNode head, final LinkedNode tail) {
-+ int failures = 0;
-+
-+ for (LinkedNode currTail = this.getTailOpaque(), curr = currTail;;) {
-+ /* It has been experimentally shown that placing the read before the backoff results in significantly greater performance */
-+ /* It is likely due to a cache miss caused by another write to the next field */
-+ final LinkedNode next = curr.getNextVolatile();
-+
-+ if (next == curr) {
-+ /* Additions are stopped */
-+ return false;
-+ }
-+
-+ for (int i = 0; i < failures; ++i) {
-+ ConcurrentUtil.backoff();
-+ }
-+
-+ if (next == null) {
-+ final LinkedNode compared = curr.compareExchangeNextVolatile(null, head);
-+
-+ if (compared == null) {
-+ /* Added */
-+ /* Avoid CASing on tail more than we need to */
-+ /* CAS to avoid setting an out-of-date tail */
-+ if (this.getTailOpaque() == currTail) {
-+ this.setTailOpaque(tail);
-+ }
-+ return true;
-+ }
-+
-+ ++failures;
-+ curr = compared;
-+ continue;
-+ }
-+
-+ if (curr == currTail) {
-+ /* Tail is likely not up-to-date */
-+ curr = next;
-+ } else {
-+ /* Try to update to tail */
-+ if (currTail == (currTail = this.getTailOpaque())) {
-+ curr = next;
-+ } else {
-+ curr = currTail;
-+ }
-+ }
-+ }
-+ }
-+
-+ protected final E removeHead(final Predicate predicate) {
-+ int failures = 0;
-+ for (LinkedNode head = this.getHeadOpaque(), curr = head;;) {
-+ // volatile here synchronizes-with writes to element
-+ final LinkedNode next = curr.getNextVolatile();
-+ final E currentVal = curr.getElementPlain();
-+
-+ for (int i = 0; i < failures; ++i) {
-+ ConcurrentUtil.backoff();
-+ }
-+
-+ if (currentVal != null) {
-+ if (!predicate.test(currentVal)) {
-+ /* Try to update stale head */
-+ if (curr != head && this.getHeadOpaque() == head) {
-+ this.setHeadOpaque(curr);
-+ }
-+ return null;
-+ }
-+ if (curr.getAndSetElementVolatile(null) == null) {
-+ /* Failed to get head */
-+ if (curr == (curr = next) || next == null) {
-+ return null;
-+ }
-+ ++failures;
-+ continue;
-+ }
-+
-+ /* "CAS" to avoid setting an out-of-date head */
-+ if (this.getHeadOpaque() == head) {
-+ this.setHeadOpaque(next != null ? next : curr);
-+ }
-+
-+ return currentVal;
-+ }
-+
-+ if (curr == next || next == null) {
-+ /* Try to update stale head */
-+ if (curr != head && this.getHeadOpaque() == head) {
-+ this.setHeadOpaque(curr);
-+ }
-+ return null; /* End of queue */
-+ }
-+
-+ if (head == curr) {
-+ /* head is likely not up-to-date */
-+ curr = next;
-+ } else {
-+ /* Try to update to head */
-+ if (head == (head = this.getHeadOpaque())) {
-+ curr = next;
-+ } else {
-+ curr = head;
-+ }
-+ }
-+ }
-+ }
-+
-+ protected final E removeHead() {
-+ int failures = 0;
-+ for (LinkedNode head = this.getHeadOpaque(), curr = head;;) {
-+ final LinkedNode next = curr.getNextVolatile();
-+ final E currentVal = curr.getElementPlain();
-+
-+ for (int i = 0; i < failures; ++i) {
-+ ConcurrentUtil.backoff();
-+ }
-+
-+ if (currentVal != null) {
-+ if (curr.getAndSetElementVolatile(null) == null) {
-+ /* Failed to get head */
-+ if (curr == (curr = next) || next == null) {
-+ return null;
-+ }
-+ ++failures;
-+ continue;
-+ }
-+
-+ /* "CAS" to avoid setting an out-of-date head */
-+ if (this.getHeadOpaque() == head) {
-+ this.setHeadOpaque(next != null ? next : curr);
-+ }
-+
-+ return currentVal;
-+ }
-+
-+ if (curr == next || next == null) {
-+ /* Try to update stale head */
-+ if (curr != head && this.getHeadOpaque() == head) {
-+ this.setHeadOpaque(curr);
-+ }
-+ return null; /* End of queue */
-+ }
-+
-+ if (head == curr) {
-+ /* head is likely not up-to-date */
-+ curr = next;
-+ } else {
-+ /* Try to update to head */
-+ if (head == (head = this.getHeadOpaque())) {
-+ curr = next;
-+ } else {
-+ curr = head;
-+ }
-+ }
-+ }
-+ }
-+
-+ /**
-+ * Empties the queue into the specified consumer. This function is optimized for single-threaded reads, and should
-+ * be faster than a loop on {@link #poll()}.
-+ *
-+ * This function is not MT-Safe. This function cannot be called with other read operations ({@link #peek()}, {@link #poll()},
-+ * {@link #clear()}, etc).
-+ * Write operations are safe to be called concurrently.
-+ *
-+ * @param consumer The consumer to accept the elements.
-+ * @return The total number of elements drained.
-+ */
-+ public int drain(final Consumer consumer) {
-+ return this.drain(consumer, false, ConcurrentUtil::rethrow);
-+ }
-+
-+ /**
-+ * Empties the queue into the specified consumer. This function is optimized for single-threaded reads, and should
-+ * be faster than a loop on {@link #poll()}.
-+ *
-+ * If {@code preventAdds} is {@code true}, then after this function returns the queue is guaranteed to be empty and
-+ * additions to the queue will fail.
-+ *
-+ *
-+ * This function is not MT-Safe. This function cannot be called with other read operations ({@link #peek()}, {@link #poll()},
-+ * {@link #clear()}, etc).
-+ * Write operations are safe to be called concurrently.
-+ *
-+ * @param consumer The consumer to accept the elements.
-+ * @param preventAdds Whether to prevent additions to this queue after draining.
-+ * @return The total number of elements drained.
-+ */
-+ public int drain(final Consumer consumer, final boolean preventAdds) {
-+ return this.drain(consumer, preventAdds, ConcurrentUtil::rethrow);
-+ }
-+
-+ /**
-+ * Empties the queue into the specified consumer. This function is optimized for single-threaded reads, and should
-+ * be faster than a loop on {@link #poll()}.
-+ *
-+ * If {@code preventAdds} is {@code true}, then after this function returns the queue is guaranteed to be empty and
-+ * additions to the queue will fail.
-+ *
-+ *
-+ * This function is not MT-Safe. This function cannot be called with other read operations ({@link #peek()}, {@link #poll()},
-+ * {@link #clear()}, {@link #remove(Object)} etc).
-+ * Only write operations are safe to be called concurrently.
-+ *
-+ * @param consumer The consumer to accept the elements.
-+ * @param preventAdds Whether to prevent additions to this queue after draining.
-+ * @param exceptionHandler Invoked when the consumer raises an exception.
-+ * @return The total number of elements drained.
-+ */
-+ public int drain(final Consumer consumer, final boolean preventAdds, final Consumer exceptionHandler) {
-+ Validate.notNull(consumer, "Null consumer");
-+ Validate.notNull(exceptionHandler, "Null exception handler");
-+
-+ /* This function assumes proper synchronization is made to ensure drain and no other read function are called concurrently */
-+ /* This allows plain write usages instead of opaque or higher */
-+ int total = 0;
-+
-+ final LinkedNode head = this.getHeadAcquire(); /* Required to synchronize with the write to the first element field */
-+ LinkedNode curr = head;
-+
-+ for (;;) {
-+ /* Volatile acquires with the write to the element field */
-+ final E currentVal = curr.getElementPlain();
-+ LinkedNode next = curr.getNextVolatile();
-+
-+ if (next == curr) {
-+ /* Add-locked nodes always have a null value */
-+ break;
-+ }
-+
-+ if (currentVal == null) {
-+ if (next == null) {
-+ if (preventAdds && (next = curr.compareExchangeNextVolatile(null, curr)) != null) {
-+ // failed to prevent adds, continue
-+ curr = next;
-+ continue;
-+ } else {
-+ // we're done here
-+ break;
-+ }
-+ }
-+ curr = next;
-+ continue;
-+ }
-+
-+ try {
-+ consumer.accept(currentVal);
-+ } catch (final Exception ex) {
-+ this.setHeadOpaque(next != null ? next : curr); /* Avoid perf penalty (of reiterating) if the exception handler decides to re-throw */
-+ curr.setElementOpaque(null); /* set here, we might re-throw */
-+
-+ exceptionHandler.accept(ex);
-+ }
-+
-+ curr.setElementOpaque(null);
-+
-+ ++total;
-+
-+ if (next == null) {
-+ if (preventAdds && (next = curr.compareExchangeNextVolatile(null, curr)) != null) {
-+ /* Retry with next value */
-+ curr = next;
-+ continue;
-+ }
-+ break;
-+ }
-+
-+ curr = next;
-+ }
-+ if (curr != head) {
-+ this.setHeadOpaque(curr); /* While this may be a plain write, eventually publish it for methods such as find. */
-+ }
-+ return total;
-+ }
-+
-+ @Override
-+ public Spliterator spliterator() { // TODO implement
-+ return Spliterators.spliterator(this, Spliterator.CONCURRENT |
-+ Spliterator.NONNULL | Spliterator.ORDERED);
-+ }
-+
-+ protected static final class LinkedNode {
-+
-+ protected volatile Object element;
-+ protected volatile LinkedNode next;
-+
-+ protected static final VarHandle ELEMENT_HANDLE = ConcurrentUtil.getVarHandle(LinkedNode.class, "element", Object.class);
-+ protected static final VarHandle NEXT_HANDLE = ConcurrentUtil.getVarHandle(LinkedNode.class, "next", LinkedNode.class);
-+
-+ protected LinkedNode(final Object element, final LinkedNode next) {
-+ ELEMENT_HANDLE.set(this, element);
-+ NEXT_HANDLE.set(this, next);
-+ }
-+
-+ /* element */
-+
-+ @SuppressWarnings("unchecked")
-+ protected final E getElementPlain() {
-+ return (E)ELEMENT_HANDLE.get(this);
-+ }
-+
-+ @SuppressWarnings("unchecked")
-+ protected final E getElementVolatile() {
-+ return (E)ELEMENT_HANDLE.getVolatile(this);
-+ }
-+
-+ protected final void setElementPlain(final E update) {
-+ ELEMENT_HANDLE.set(this, (Object)update);
-+ }
-+
-+ protected final void setElementOpaque(final E update) {
-+ ELEMENT_HANDLE.setOpaque(this, (Object)update);
-+ }
-+
-+ protected final void setElementVolatile(final E update) {
-+ ELEMENT_HANDLE.setVolatile(this, (Object)update);
-+ }
-+
-+ @SuppressWarnings("unchecked")
-+ protected final E getAndSetElementVolatile(final E update) {
-+ return (E)ELEMENT_HANDLE.getAndSet(this, update);
-+ }
-+
-+ @SuppressWarnings("unchecked")
-+ protected final E compareExchangeElementVolatile(final E expect, final E update) {
-+ return (E)ELEMENT_HANDLE.compareAndExchange(this, expect, update);
-+ }
-+
-+ /* next */
-+
-+ @SuppressWarnings("unchecked")
-+ protected final LinkedNode getNextPlain() {
-+ return (LinkedNode)NEXT_HANDLE.get(this);
-+ }
-+
-+ @SuppressWarnings("unchecked")
-+ protected final LinkedNode getNextOpaque() {
-+ return (LinkedNode)NEXT_HANDLE.getOpaque(this);
-+ }
-+
-+ @SuppressWarnings("unchecked")
-+ protected final LinkedNode getNextAcquire() {
-+ return (LinkedNode)NEXT_HANDLE.getAcquire(this);
-+ }
-+
-+ @SuppressWarnings("unchecked")
-+ protected final LinkedNode getNextVolatile() {
-+ return (LinkedNode)NEXT_HANDLE.getVolatile(this);
-+ }
-+
-+ protected final void setNextPlain(final LinkedNode next) {
-+ NEXT_HANDLE.set(this, next);
-+ }
-+
-+ protected final void setNextVolatile(final LinkedNode next) {
-+ NEXT_HANDLE.setVolatile(this, next);
-+ }
-+
-+ @SuppressWarnings("unchecked")
-+ protected final LinkedNode compareExchangeNextVolatile(final LinkedNode expect, final LinkedNode set) {
-+ return (LinkedNode)NEXT_HANDLE.compareAndExchange(this, expect, set);
-+ }
-+ }
-+
-+ protected static final class LinkedIterator implements Iterator {
-+
-+ protected LinkedNode curr; /* last returned by next() */
-+ protected LinkedNode next; /* next to return from next() */
-+ protected E nextElement; /* cached to avoid a race condition with removing or polling */
-+
-+ protected LinkedIterator(final LinkedNode start) {
-+ /* setup nextElement and next */
-+ for (LinkedNode curr = start;;) {
-+ final LinkedNode next = curr.getNextVolatile();
-+
-+ final E element = curr.getElementPlain();
-+
-+ if (element != null) {
-+ this.nextElement = element;
-+ this.next = curr;
-+ break;
-+ }
-+
-+ if (next == null || next == curr) {
-+ break;
-+ }
-+ curr = next;
-+ }
-+ }
-+
-+ protected final void findNext() {
-+ /* only called if this.nextElement != null, which means this.next != null */
-+ for (LinkedNode curr = this.next;;) {
-+ final LinkedNode next = curr.getNextVolatile();
-+
-+ if (next == null || next == curr) {
-+ break;
-+ }
-+
-+ final E element = next.getElementPlain();
-+
-+ if (element != null) {
-+ this.nextElement = element;
-+ this.curr = this.next; /* this.next will be the value returned from next(), set this.curr for remove() */
-+ this.next = next;
-+ return;
-+ }
-+ curr = next;
-+ }
-+
-+ /* out of nodes to iterate */
-+ /* keep curr for remove() calls */
-+ this.next = null;
-+ this.nextElement = null;
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public boolean hasNext() {
-+ return this.nextElement != null;
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public E next() {
-+ final E element = this.nextElement;
-+
-+ if (element == null) {
-+ throw new NoSuchElementException();
-+ }
-+
-+ this.findNext();
-+
-+ return element;
-+ }
-+
-+ /**
-+ * {@inheritDoc}
-+ */
-+ @Override
-+ public void remove() {
-+ if (this.curr == null) {
-+ throw new IllegalStateException();
-+ }
-+
-+ this.curr.setElementVolatile(null);
-+ this.curr = null;
-+ }
-+ }
-+}
-diff --git a/src/main/java/ca/spottedleaf/concurrentutil/completable/CallbackCompletable.java b/src/main/java/ca/spottedleaf/concurrentutil/completable/CallbackCompletable.java
-new file mode 100644
-index 0000000000000000000000000000000000000000..6bad6f8ecc0944d2f406924c7de7e227ff1e70fa
---- /dev/null
-+++ b/src/main/java/ca/spottedleaf/concurrentutil/completable/CallbackCompletable.java
-@@ -0,0 +1,110 @@
-+package ca.spottedleaf.concurrentutil.completable;
-+
-+import ca.spottedleaf.concurrentutil.collection.MultiThreadedQueue;
-+import ca.spottedleaf.concurrentutil.executor.Cancellable;
-+import ca.spottedleaf.concurrentutil.util.ConcurrentUtil;
-+import org.slf4j.Logger;
-+import org.slf4j.LoggerFactory;
-+import java.util.function.BiConsumer;
-+
-+public final class CallbackCompletable {
-+
-+ private static final Logger LOGGER = LoggerFactory.getLogger(CallbackCompletable.class);
-+
-+ private final MultiThreadedQueue> waiters = new MultiThreadedQueue<>();
-+ private T result;
-+ private Throwable throwable;
-+ private volatile boolean completed;
-+
-+ public boolean isCompleted() {
-+ return this.completed;
-+ }
-+
-+ /**
-+ * Note: Can only use after calling {@link #addAsynchronousWaiter(BiConsumer)}, as this function performs zero
-+ * synchronisation
-+ */
-+ public T getResult() {
-+ return this.result;
-+ }
-+
-+ /**
-+ * Note: Can only use after calling {@link #addAsynchronousWaiter(BiConsumer)}, as this function performs zero
-+ * synchronisation
-+ */
-+ public Throwable getThrowable() {
-+ return this.throwable;
-+ }
-+
-+ /**
-+ * Adds a waiter that should only be completed asynchronously by the complete() calls. If complete()
-+ * has already been called, returns {@code null} and does not invoke the specified consumer.
-+ * @param consumer Consumer to be executed on completion
-+ * @throws NullPointerException If consumer is null
-+ * @return A cancellable which will control the execution of the specified consumer
-+ */
-+ public Cancellable addAsynchronousWaiter(final BiConsumer consumer) {
-+ if (this.waiters.add(consumer)) {
-+ return new CancellableImpl(consumer);
-+ }
-+ return null;
-+ }
-+
-+ private void completeAllWaiters(final T result, final Throwable throwable) {
-+ this.completed = true;
-+ BiConsumer waiter;
-+ while ((waiter = this.waiters.pollOrBlockAdds()) != null) {
-+ this.completeWaiter(waiter, result, throwable);
-+ }
-+ }
-+
-+ private void completeWaiter(final BiConsumer consumer, final T result, final Throwable throwable) {
-+ try {
-+ consumer.accept(result, throwable);
-+ } catch (final Throwable throwable2) {
-+ LOGGER.error("Failed to complete callback " + ConcurrentUtil.genericToString(consumer), throwable2);
-+ }
-+ }
-+
-+ /**
-+ * Adds a waiter that will be completed asynchronously by the complete() calls. If complete()
-+ * has already been called, then invokes the consumer synchronously with the completed result.
-+ * @param consumer Consumer to be executed on completion
-+ * @throws NullPointerException If consumer is null
-+ * @return A cancellable which will control the execution of the specified consumer
-+ */
-+ public Cancellable addWaiter(final BiConsumer consumer) {
-+ if (this.waiters.add(consumer)) {
-+ return new CancellableImpl(consumer);
-+ }
-+ this.completeWaiter(consumer, this.result, this.throwable);
-+ return new CancellableImpl(consumer);
-+ }
-+
-+ public void complete(final T result) {
-+ this.result = result;
-+ this.completeAllWaiters(result, null);
-+ }
-+
-+ public void completeWithThrowable(final Throwable throwable) {
-+ if (throwable == null) {
-+ throw new NullPointerException("Throwable cannot be null");
-+ }
-+ this.throwable = throwable;
-+ this.completeAllWaiters(null, throwable);
-+ }
-+
-+ private final class CancellableImpl implements Cancellable {
-+
-+ private final BiConsumer waiter;
-+
-+ private CancellableImpl(final BiConsumer waiter) {
-+ this.waiter = waiter;
-+ }
-+
-+ @Override
-+ public boolean cancel() {
-+ return CallbackCompletable.this.waiters.remove(this.waiter);
-+ }
-+ }
-+}
-\ No newline at end of file
-diff --git a/src/main/java/ca/spottedleaf/concurrentutil/completable/Completable.java b/src/main/java/ca/spottedleaf/concurrentutil/completable/Completable.java
-new file mode 100644
-index 0000000000000000000000000000000000000000..365616439fa079017d648ed7f6ddf6950a691adf
---- /dev/null
-+++ b/src/main/java/ca/spottedleaf/concurrentutil/completable/Completable.java
-@@ -0,0 +1,737 @@
-+package ca.spottedleaf.concurrentutil.completable;
-+
-+import ca.spottedleaf.concurrentutil.util.ConcurrentUtil;
-+import ca.spottedleaf.concurrentutil.util.Validate;
-+import org.slf4j.Logger;
-+import org.slf4j.LoggerFactory;
-+import java.lang.invoke.VarHandle;
-+import java.util.concurrent.CompletableFuture;
-+import java.util.concurrent.CompletionException;
-+import java.util.concurrent.CompletionStage;
-+import java.util.concurrent.Executor;
-+import java.util.concurrent.ForkJoinPool;
-+import java.util.concurrent.locks.LockSupport;
-+import java.util.function.BiConsumer;
-+import java.util.function.BiFunction;
-+import java.util.function.Consumer;
-+import java.util.function.Function;
-+import java.util.function.Supplier;
-+
-+public final class Completable {
-+
-+ private static final Logger LOGGER = LoggerFactory.getLogger(Completable.class);
-+ private static final Function super Throwable, ? extends Throwable> DEFAULT_EXCEPTION_HANDLER = (final Throwable thr) -> {
-+ LOGGER.error("Unhandled exception during Completable operation", thr);
-+ return thr;
-+ };
-+
-+ public static Executor getDefaultExecutor() {
-+ return ForkJoinPool.commonPool();
-+ }
-+
-+ private static final Transform, ?> COMPLETED_STACK = new Transform<>(null, null, null, null) {
-+ @Override
-+ public void run() {}
-+ };
-+ private volatile Transform, T> completeStack;
-+ private static final VarHandle COMPLETE_STACK_HANDLE = ConcurrentUtil.getVarHandle(Completable.class, "completeStack", Transform.class);
-+
-+ private static final Object NULL_MASK = new Object();
-+ private volatile Object result;
-+ private static final VarHandle RESULT_HANDLE = ConcurrentUtil.getVarHandle(Completable.class, "result", Object.class);
-+
-+ private Object getResultPlain() {
-+ return (Object)RESULT_HANDLE.get(this);
-+ }
-+
-+ private Object getResultVolatile() {
-+ return (Object)RESULT_HANDLE.getVolatile(this);
-+ }
-+
-+ private void pushStackOrRun(final Transform, T> push) {
-+ int failures = 0;
-+ for (Transform, T> curr = (Transform, T>)COMPLETE_STACK_HANDLE.getVolatile(this);;) {
-+ if (curr == COMPLETED_STACK) {
-+ push.execute();
-+ return;
-+ }
-+
-+ push.next = curr;
-+
-+ for (int i = 0; i < failures; ++i) {
-+ ConcurrentUtil.backoff();
-+ }
-+
-+ if (curr == (curr = (Transform, T>)COMPLETE_STACK_HANDLE.compareAndExchange(this, curr, push))) {
-+ return;
-+ }
-+ push.next = null;
-+ ++failures;
-+ }
-+ }
-+
-+ private void propagateStack() {
-+ Transform, T> topStack = (Transform, T>)COMPLETE_STACK_HANDLE.getAndSet(this, COMPLETED_STACK);
-+ while (topStack != null) {
-+ topStack.execute();
-+ topStack = topStack.next;
-+ }
-+ }
-+
-+ private static Object maskNull(final Object res) {
-+ return res == null ? NULL_MASK : res;
-+ }
-+
-+ private static Object unmaskNull(final Object res) {
-+ return res == NULL_MASK ? null : res;
-+ }
-+
-+ private static Executor checkExecutor(final Executor executor) {
-+ return Validate.notNull(executor, "Executor may not be null");
-+ }
-+
-+ public Completable() {}
-+
-+ private Completable(final Object complete) {
-+ COMPLETE_STACK_HANDLE.set(this, COMPLETED_STACK);
-+ RESULT_HANDLE.setRelease(this, complete);
-+ }
-+
-+ public static Completable completed(final T value) {
-+ return new Completable<>(maskNull(value));
-+ }
-+
-+ public static Completable failed(final Throwable ex) {
-+ Validate.notNull(ex, "Exception may not be null");
-+
-+ return new Completable<>(new ExceptionResult(ex));
-+ }
-+
-+ public static Completable supplied(final Supplier supplier) {
-+ return supplied(supplier, DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public static Completable supplied(final Supplier supplier, final Function super Throwable, ? extends Throwable> exceptionHandler) {
-+ try {
-+ return completed(supplier.get());
-+ } catch (final Throwable throwable) {
-+ Throwable complete;
-+ try {
-+ complete = exceptionHandler.apply(throwable);
-+ } catch (final Throwable thr2) {
-+ throwable.addSuppressed(thr2);
-+ complete = throwable;
-+ }
-+ return failed(complete);
-+ }
-+ }
-+
-+ public static Completable suppliedAsync(final Supplier supplier, final Executor executor) {
-+ return suppliedAsync(supplier, executor, DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public static Completable suppliedAsync(final Supplier supplier, final Executor executor, final Function super Throwable, ? extends Throwable> exceptionHandler) {
-+ final Completable ret = new Completable<>();
-+
-+ class AsyncSuppliedCompletable implements Runnable, CompletableFuture.AsynchronousCompletionTask {
-+ @Override
-+ public void run() {
-+ try {
-+ ret.complete(supplier.get());
-+ } catch (final Throwable throwable) {
-+ Throwable complete;
-+ try {
-+ complete = exceptionHandler.apply(throwable);
-+ } catch (final Throwable thr2) {
-+ throwable.addSuppressed(thr2);
-+ complete = throwable;
-+ }
-+ ret.completeExceptionally(complete);
-+ }
-+ }
-+ }
-+
-+ try {
-+ executor.execute(new AsyncSuppliedCompletable());
-+ } catch (final Throwable throwable) {
-+ Throwable complete;
-+ try {
-+ complete = exceptionHandler.apply(throwable);
-+ } catch (final Throwable thr2) {
-+ throwable.addSuppressed(thr2);
-+ complete = throwable;
-+ }
-+ ret.completeExceptionally(complete);
-+ }
-+
-+ return ret;
-+ }
-+
-+ private boolean completeRaw(final Object value) {
-+ if ((Object)RESULT_HANDLE.getVolatile(this) != null || !(boolean)RESULT_HANDLE.compareAndSet(this, (Object)null, value)) {
-+ return false;
-+ }
-+
-+ this.propagateStack();
-+ return true;
-+ }
-+
-+ public boolean complete(final T result) {
-+ return this.completeRaw(maskNull(result));
-+ }
-+
-+ public boolean completeExceptionally(final Throwable exception) {
-+ Validate.notNull(exception, "Exception may not be null");
-+
-+ return this.completeRaw(new ExceptionResult(exception));
-+ }
-+
-+ public boolean isDone() {
-+ return this.getResultVolatile() != null;
-+ }
-+
-+ public boolean isNormallyComplete() {
-+ return this.getResultVolatile() != null && !(this.getResultVolatile() instanceof ExceptionResult);
-+ }
-+
-+ public boolean isExceptionallyComplete() {
-+ return this.getResultVolatile() instanceof ExceptionResult;
-+ }
-+
-+ public Throwable getException() {
-+ final Object res = this.getResultVolatile();
-+ if (res == null) {
-+ return null;
-+ }
-+
-+ if (!(res instanceof ExceptionResult exRes)) {
-+ throw new IllegalStateException("Not completed exceptionally");
-+ }
-+
-+ return exRes.ex;
-+ }
-+
-+ public T getNow(final T dfl) throws CompletionException {
-+ final Object res = this.getResultVolatile();
-+ if (res == null) {
-+ return dfl;
-+ }
-+
-+ if (res instanceof ExceptionResult exRes) {
-+ throw new CompletionException(exRes.ex);
-+ }
-+
-+ return (T)unmaskNull(res);
-+ }
-+
-+ public T join() throws CompletionException {
-+ if (this.isDone()) {
-+ return this.getNow(null);
-+ }
-+
-+ final UnparkTransform unparkTransform = new UnparkTransform<>(this, Thread.currentThread());
-+
-+ this.pushStackOrRun(unparkTransform);
-+
-+ boolean interuptted = false;
-+ while (!unparkTransform.isReleasable()) {
-+ try {
-+ ForkJoinPool.managedBlock(unparkTransform);
-+ } catch (final InterruptedException ex) {
-+ interuptted = true;
-+ }
-+ }
-+
-+ if (interuptted) {
-+ Thread.currentThread().interrupt();
-+ }
-+
-+ return this.getNow(null);
-+ }
-+
-+ public CompletableFuture toFuture() {
-+ final Object rawResult = this.getResultVolatile();
-+ if (rawResult != null) {
-+ if (rawResult instanceof ExceptionResult exRes) {
-+ return CompletableFuture.failedFuture(exRes.ex);
-+ } else {
-+ return CompletableFuture.completedFuture((T)unmaskNull(rawResult));
-+ }
-+ }
-+
-+ final CompletableFuture ret = new CompletableFuture<>();
-+
-+ class ToFuture implements BiConsumer {
-+
-+ @Override
-+ public void accept(final T res, final Throwable ex) {
-+ if (ex != null) {
-+ ret.completeExceptionally(ex);
-+ } else {
-+ ret.complete(res);
-+ }
-+ }
-+ }
-+
-+ this.whenComplete(new ToFuture());
-+
-+ return ret;
-+ }
-+
-+ public static Completable fromFuture(final CompletionStage stage) {
-+ final Completable ret = new Completable<>();
-+
-+ class FromFuture implements BiConsumer {
-+ @Override
-+ public void accept(final T res, final Throwable ex) {
-+ if (ex != null) {
-+ ret.completeExceptionally(ex);
-+ } else {
-+ ret.complete(res);
-+ }
-+ }
-+ }
-+
-+ stage.whenComplete(new FromFuture());
-+
-+ return ret;
-+ }
-+
-+
-+ public Completable thenApply(final Function super T, ? extends U> function) {
-+ return this.thenApply(function, DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public Completable thenApply(final Function super T, ? extends U> function, final Function super Throwable, ? extends Throwable> exceptionHandler) {
-+ Validate.notNull(function, "Function may not be null");
-+ Validate.notNull(exceptionHandler, "Exception handler may not be null");
-+
-+ final Completable ret = new Completable<>();
-+ this.pushStackOrRun(new ApplyTransform<>(null, this, ret, exceptionHandler, function));
-+ return ret;
-+ }
-+
-+ public Completable thenApplyAsync(final Function super T, ? extends U> function) {
-+ return this.thenApplyAsync(function, getDefaultExecutor(), DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public Completable thenApplyAsync(final Function super T, ? extends U> function, final Executor executor) {
-+ return this.thenApplyAsync(function, executor, DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public Completable thenApplyAsync(final Function super T, ? extends U> function, final Executor executor, final Function super Throwable, ? extends Throwable> exceptionHandler) {
-+ Validate.notNull(function, "Function may not be null");
-+ Validate.notNull(exceptionHandler, "Exception handler may not be null");
-+
-+ final Completable ret = new Completable<>();
-+ this.pushStackOrRun(new ApplyTransform<>(checkExecutor(executor), this, ret, exceptionHandler, function));
-+ return ret;
-+ }
-+
-+
-+ public Completable thenAccept(final Consumer super T> consumer) {
-+ return this.thenAccept(consumer, DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public Completable thenAccept(final Consumer super T> consumer, final Function super Throwable, ? extends Throwable> exceptionHandler) {
-+ Validate.notNull(consumer, "Consumer may not be null");
-+ Validate.notNull(exceptionHandler, "Exception handler may not be null");
-+
-+ final Completable ret = new Completable<>();
-+ this.pushStackOrRun(new AcceptTransform<>(null, this, ret, exceptionHandler, consumer));
-+ return ret;
-+ }
-+
-+ public Completable thenAcceptAsync(final Consumer super T> consumer) {
-+ return this.thenAcceptAsync(consumer, getDefaultExecutor(), DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public Completable thenAcceptAsync(final Consumer super T> consumer, final Executor executor) {
-+ return this.thenAcceptAsync(consumer, executor, DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public Completable thenAcceptAsync(final Consumer super T> consumer, final Executor executor, final Function super Throwable, ? extends Throwable> exceptionHandler) {
-+ Validate.notNull(consumer, "Consumer may not be null");
-+ Validate.notNull(exceptionHandler, "Exception handler may not be null");
-+
-+ final Completable ret = new Completable<>();
-+ this.pushStackOrRun(new AcceptTransform<>(checkExecutor(executor), this, ret, exceptionHandler, consumer));
-+ return ret;
-+ }
-+
-+
-+ public Completable thenRun(final Runnable run) {
-+ return this.thenRun(run, DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public Completable thenRun(final Runnable run, final Function super Throwable, ? extends Throwable> exceptionHandler) {
-+ Validate.notNull(run, "Run may not be null");
-+ Validate.notNull(exceptionHandler, "Exception handler may not be null");
-+
-+ final Completable ret = new Completable<>();
-+ this.pushStackOrRun(new RunTransform<>(null, this, ret, exceptionHandler, run));
-+ return ret;
-+ }
-+
-+ public Completable thenRunAsync(final Runnable run) {
-+ return this.thenRunAsync(run, getDefaultExecutor(), DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public Completable thenRunAsync(final Runnable run, final Executor executor) {
-+ return this.thenRunAsync(run, executor, DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public Completable thenRunAsync(final Runnable run, final Executor executor, final Function super Throwable, ? extends Throwable> exceptionHandler) {
-+ Validate.notNull(run, "Run may not be null");
-+ Validate.notNull(exceptionHandler, "Exception handler may not be null");
-+
-+ final Completable ret = new Completable<>();
-+ this.pushStackOrRun(new RunTransform<>(checkExecutor(executor), this, ret, exceptionHandler, run));
-+ return ret;
-+ }
-+
-+
-+ public Completable handle(final BiFunction super T, ? super Throwable, ? extends U> function) {
-+ return this.handle(function, DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public Completable handle(final BiFunction super T, ? super Throwable, ? extends U> function,
-+ final Function super Throwable, ? extends Throwable> exceptionHandler) {
-+ Validate.notNull(function, "Function may not be null");
-+ Validate.notNull(exceptionHandler, "Exception handler may not be null");
-+
-+ final Completable ret = new Completable<>();
-+ this.pushStackOrRun(new HandleTransform<>(null, this, ret, exceptionHandler, function));
-+ return ret;
-+ }
-+
-+ public Completable handleAsync(final BiFunction super T, ? super Throwable, ? extends U> function) {
-+ return this.handleAsync(function, getDefaultExecutor(), DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public Completable handleAsync(final BiFunction super T, ? super Throwable, ? extends U> function,
-+ final Executor executor) {
-+ return this.handleAsync(function, executor, DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public Completable handleAsync(final BiFunction super T, ? super Throwable, ? extends U> function,
-+ final Executor executor,
-+ final Function super Throwable, ? extends Throwable> exceptionHandler) {
-+ Validate.notNull(function, "Function may not be null");
-+ Validate.notNull(exceptionHandler, "Exception handler may not be null");
-+
-+ final Completable ret = new Completable<>();
-+ this.pushStackOrRun(new HandleTransform<>(checkExecutor(executor), this, ret, exceptionHandler, function));
-+ return ret;
-+ }
-+
-+
-+ public Completable whenComplete(final BiConsumer super T, ? super Throwable> consumer) {
-+ return this.whenComplete(consumer, DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public Completable whenComplete(final BiConsumer super T, ? super Throwable> consumer, final Function super Throwable, ? extends Throwable> exceptionHandler) {
-+ Validate.notNull(consumer, "Consumer may not be null");
-+ Validate.notNull(exceptionHandler, "Exception handler may not be null");
-+
-+ final Completable ret = new Completable<>();
-+ this.pushStackOrRun(new WhenTransform<>(null, this, ret, exceptionHandler, consumer));
-+ return ret;
-+ }
-+
-+ public Completable whenCompleteAsync(final BiConsumer super T, ? super Throwable> consumer) {
-+ return this.whenCompleteAsync(consumer, getDefaultExecutor(), DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public Completable whenCompleteAsync(final BiConsumer super T, ? super Throwable> consumer, final Executor executor) {
-+ return this.whenCompleteAsync(consumer, executor, DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public Completable whenCompleteAsync(final BiConsumer super T, ? super Throwable> consumer, final Executor executor,
-+ final Function super Throwable, ? extends Throwable> exceptionHandler) {
-+ Validate.notNull(consumer, "Consumer may not be null");
-+ Validate.notNull(exceptionHandler, "Exception handler may not be null");
-+
-+ final Completable ret = new Completable<>();
-+ this.pushStackOrRun(new WhenTransform<>(checkExecutor(executor), this, ret, exceptionHandler, consumer));
-+ return ret;
-+ }
-+
-+
-+ public Completable exceptionally(final Function function) {
-+ return this.exceptionally(function, DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public Completable exceptionally(final Function function, final Function super Throwable, ? extends Throwable> exceptionHandler) {
-+ Validate.notNull(function, "Function may not be null");
-+ Validate.notNull(exceptionHandler, "Exception handler may not be null");
-+
-+ final Completable ret = new Completable<>();
-+ this.pushStackOrRun(new ExceptionallyTransform<>(null, this, ret, exceptionHandler, function));
-+ return ret;
-+ }
-+
-+ public Completable exceptionallyAsync(final Function function) {
-+ return this.exceptionallyAsync(function, getDefaultExecutor(), DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public Completable exceptionallyAsync(final Function function, final Executor executor) {
-+ return this.exceptionallyAsync(function, executor, DEFAULT_EXCEPTION_HANDLER);
-+ }
-+
-+ public Completable exceptionallyAsync(final Function function, final Executor executor,
-+ final Function super Throwable, ? extends Throwable> exceptionHandler) {
-+ Validate.notNull(function, "Function may not be null");
-+ Validate.notNull(exceptionHandler, "Exception handler may not be null");
-+
-+ final Completable ret = new Completable<>();
-+ this.pushStackOrRun(new ExceptionallyTransform<>(checkExecutor(executor), this, ret, exceptionHandler, function));
-+ return ret;
-+ }
-+
-+ private static final class ExceptionResult {
-+ public final Throwable ex;
-+
-+ public ExceptionResult(final Throwable ex) {
-+ this.ex = ex;
-+ }
-+ }
-+
-+ private static abstract class Transform implements Runnable, CompletableFuture.AsynchronousCompletionTask {
-+
-+ private Transform, T> next;
-+
-+ private final Executor executor;
-+ protected final Completable from;
-+ protected final Completable to;
-+ protected final Function super Throwable, ? extends Throwable> exceptionHandler;
-+
-+ protected Transform(final Executor executor, final Completable from, final Completable to,
-+ final Function super Throwable, ? extends Throwable> exceptionHandler) {
-+ this.executor = executor;
-+ this.from = from;
-+ this.to = to;
-+ this.exceptionHandler = exceptionHandler;
-+ }
-+
-+ // force interface call to become virtual call
-+ @Override
-+ public abstract void run();
-+
-+ protected void failed(final Throwable throwable) {
-+ Throwable complete;
-+ try {
-+ complete = this.exceptionHandler.apply(throwable);
-+ } catch (final Throwable thr2) {
-+ throwable.addSuppressed(thr2);
-+ complete = throwable;
-+ }
-+ this.to.completeExceptionally(complete);
-+ }
-+
-+ public void execute() {
-+ if (this.executor == null) {
-+ this.run();
-+ return;
-+ }
-+
-+ try {
-+ this.executor.execute(this);
-+ } catch (final Throwable throwable) {
-+ this.failed(throwable);
-+ }
-+ }
-+ }
-+
-+ private static final class ApplyTransform extends Transform {
-+
-+ private final Function super T, ? extends U> function;
-+
-+ public ApplyTransform(final Executor executor, final Completable from, final Completable to,
-+ final Function super Throwable, ? extends Throwable> exceptionHandler,
-+ final Function super T, ? extends U> function) {
-+ super(executor, from, to, exceptionHandler);
-+ this.function = function;
-+ }
-+
-+ @Override
-+ public void run() {
-+ final Object result = this.from.getResultPlain();
-+ try {
-+ if (result instanceof ExceptionResult exRes) {
-+ this.to.completeExceptionally(exRes.ex);
-+ } else {
-+ this.to.complete(this.function.apply((T)unmaskNull(result)));
-+ }
-+ } catch (final Throwable throwable) {
-+ this.failed(throwable);
-+ }
-+ }
-+ }
-+
-+ private static final class AcceptTransform extends Transform {
-+ private final Consumer super T> consumer;
-+
-+ public AcceptTransform(final Executor executor, final Completable from, final Completable to,
-+ final Function super Throwable, ? extends Throwable> exceptionHandler,
-+ final Consumer super T> consumer) {
-+ super(executor, from, to, exceptionHandler);
-+ this.consumer = consumer;
-+ }
-+
-+ @Override
-+ public void run() {
-+ final Object result = this.from.getResultPlain();
-+ try {
-+ if (result instanceof ExceptionResult exRes) {
-+ this.to.completeExceptionally(exRes.ex);
-+ } else {
-+ this.consumer.accept((T)unmaskNull(result));
-+ this.to.complete(null);
-+ }
-+ } catch (final Throwable throwable) {
-+ this.failed(throwable);
-+ }
-+ }
-+ }
-+
-+ private static final class RunTransform extends Transform {
-+ private final Runnable run;
-+
-+ public RunTransform(final Executor executor, final Completable from, final Completable to,
-+ final Function super Throwable, ? extends Throwable> exceptionHandler,
-+ final Runnable run) {
-+ super(executor, from, to, exceptionHandler);
-+ this.run = run;
-+ }
-+
-+ @Override
-+ public void run() {
-+ final Object result = this.from.getResultPlain();
-+ try {
-+ if (result instanceof ExceptionResult exRes) {
-+ this.to.completeExceptionally(exRes.ex);
-+ } else {
-+ this.run.run();
-+ this.to.complete(null);
-+ }
-+ } catch (final Throwable throwable) {
-+ this.failed(throwable);
-+ }
-+ }
-+ }
-+
-+ private static final class HandleTransform extends Transform {
-+
-+ private final BiFunction super T, ? super Throwable, ? extends U> function;
-+
-+ public HandleTransform(final Executor executor, final Completable from, final Completable to,
-+ final Function super Throwable, ? extends Throwable> exceptionHandler,
-+ final BiFunction super T, ? super Throwable, ? extends U> function) {
-+ super(executor, from, to, exceptionHandler);
-+ this.function = function;
-+ }
-+
-+ @Override
-+ public void run() {
-+ final Object result = this.from.getResultPlain();
-+ try {
-+ if (result instanceof ExceptionResult exRes) {
-+ this.to.complete(this.function.apply(null, exRes.ex));
-+ } else {
-+ this.to.complete(this.function.apply((T)unmaskNull(result), null));
-+ }
-+ } catch (final Throwable throwable) {
-+ this.failed(throwable);
-+ }
-+ }
-+ }
-+
-+ private static final class WhenTransform extends Transform {
-+
-+ private final BiConsumer super T, ? super Throwable> consumer;
-+
-+ public WhenTransform(final Executor executor, final Completable from, final Completable to,
-+ final Function super Throwable, ? extends Throwable> exceptionHandler,
-+ final BiConsumer super T, ? super Throwable> consumer) {
-+ super(executor, from, to, exceptionHandler);
-+ this.consumer = consumer;
-+ }
-+
-+ @Override
-+ public void run() {
-+ final Object result = this.from.getResultPlain();
-+ try {
-+ if (result instanceof ExceptionResult exRes) {
-+ this.consumer.accept(null, exRes.ex);
-+ this.to.completeExceptionally(exRes.ex);
-+ } else {
-+ final T unmasked = (T)unmaskNull(result);
-+ this.consumer.accept(unmasked, null);
-+ this.to.complete(unmasked);
-+ }
-+ } catch (final Throwable throwable) {
-+ this.failed(throwable);
-+ }
-+ }
-+ }
-+
-+ private static final class ExceptionallyTransform extends Transform {
-+ private final Function function;
-+
-+ public ExceptionallyTransform(final Executor executor, final Completable from, final Completable to,
-+ final Function super Throwable, ? extends Throwable> exceptionHandler,
-+ final Function function) {
-+ super(executor, from, to, exceptionHandler);
-+ this.function = function;
-+ }
-+
-+ @Override
-+ public void run() {
-+ final Object result = this.from.getResultPlain();
-+ try {
-+ if (result instanceof ExceptionResult exRes) {
-+ this.to.complete(this.function.apply(exRes.ex));
-+ } else {
-+ this.to.complete((T)unmaskNull(result));
-+ }
-+ } catch (final Throwable throwable) {
-+ this.failed(throwable);
-+ }
-+ }
-+ }
-+
-+ private static final class UnparkTransform extends Transform