From 01f0f5371b7c915a9a32d72eda788661b579f13a Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 2 Mar 2021 19:20:29 +0200 Subject: [PATCH] improves UnboundedProcessor Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka --- .../rsocket/internal/UnboundedProcessor.java | 264 +++++++++++------- 1 file changed, 156 insertions(+), 108 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index 0df8f1d34..d84546944 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -23,11 +23,12 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import org.reactivestreams.Subscriber; +import java.util.stream.Stream; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Exceptions; import reactor.core.Fuseable; +import reactor.core.Scannable; import reactor.core.publisher.FluxProcessor; import reactor.core.publisher.Operators; import reactor.util.annotation.Nullable; @@ -45,20 +46,23 @@ public final class UnboundedProcessor extends FluxProcessor final Queue queue; final Queue priorityQueue; + boolean cancelled; boolean done; Throwable error; CoreSubscriber actual; - static final long STATE_TERMINATED = + static final long FLAG_TERMINATED = 0b1000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; static final long FLAG_DISPOSED = 0b0100_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; static final long FLAG_CANCELLED = 0b0010_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; - static final long FLAG_SUBSCRIBED_ONCE = + static final long FLAG_SUBSCRIBER_READY = 0b0001_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + static final long FLAG_SUBSCRIBED_ONCE = + 0b0000_1000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; static final long MAX_WIP_VALUE = - 0b0000_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111L; + 0b0000_0111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111L; volatile long state; @@ -87,10 +91,21 @@ public int getBufferSize() { return Integer.MAX_VALUE; } + @Override + public Stream inners() { + return hasDownstreams() ? Stream.of(Scannable.from(this.actual)) : Stream.empty(); + } + @Override public Object scanUnsafe(Attr key) { + if (Attr.ACTUAL == key) return isSubscriberReady(this.state) ? this.actual : null; if (Attr.BUFFERED == key) return this.queue.size() + this.priorityQueue.size(); if (Attr.PREFETCH == key) return Integer.MAX_VALUE; + if (Attr.CANCELLED == key) { + final long state = this.state; + return isCancelled(state) || isDisposed(state); + } + return super.scanUnsafe(key); } @@ -99,6 +114,10 @@ public void onNextPrioritized(ByteBuf t) { release(t); return; } + if (this.cancelled) { + release(t); + return; + } if (!this.priorityQueue.offer(t)) { Throwable ex = @@ -117,6 +136,10 @@ public void onNext(ByteBuf t) { release(t); return; } + if (this.cancelled) { + release(t); + return; + } if (!this.queue.offer(t)) { Throwable ex = @@ -135,6 +158,9 @@ public void onError(Throwable t) { Operators.onErrorDropped(t, currentContext()); return; } + if (this.cancelled) { + return; + } this.error = t; this.done = true; @@ -153,19 +179,6 @@ public void onComplete() { drain(); } - @Override - public void subscribe(CoreSubscriber actual) { - Objects.requireNonNull(actual, "subscribe"); - if (markSubscribedOnce(this)) { - this.actual = actual; - actual.onSubscribe(this); - drain(); - } else { - Operators.error( - actual, new IllegalStateException("UnboundedProcessor allows only a single Subscriber")); - } - } - void drain() { long previousState = wipIncrement(this); if (isTerminated(previousState)) { @@ -179,20 +192,11 @@ void drain() { long expectedState = previousState + 1; for (; ; ) { - if (isSubscribedOnce(expectedState)) { + if (isSubscriberReady(expectedState)) { final boolean outputFused = this.outputFused; - final Subscriber a = this.actual; + final CoreSubscriber a = this.actual; if (outputFused) { - if (isCancelled(expectedState)) { - return; - } - - if (isDisposed(expectedState)) { - a.onError(new CancellationException("Disposed")); - return; - } - drainFused(expectedState, a); } else { if (isCancelled(expectedState)) { @@ -223,7 +227,7 @@ void drain() { } } - void drainRegular(long expectedState, Subscriber a) { + void drainRegular(long expectedState, CoreSubscriber a) { final Queue q = this.queue; final Queue pq = this.priorityQueue; @@ -238,13 +242,10 @@ void drainRegular(long expectedState, Subscriber a) { // Thread2: ------------------> <#onNext(V)> --> <#onComplete()> boolean done = this.done; - ByteBuf t; - boolean empty; + ByteBuf t = pq.poll(); + boolean empty = t == null; - if (!pq.isEmpty()) { - t = pq.poll(); - empty = false; - } else { + if (empty) { t = q.poll(); empty = t == null; } @@ -296,7 +297,7 @@ void drainRegular(long expectedState, Subscriber a) { } } - void drainFused(long expectedState, Subscriber a) { + void drainFused(long expectedState, CoreSubscriber a) { for (; ; ) { // done has to be read before queue.poll to ensure there was no racing: // Thread1: <#drain>: queue.poll(null) --------------------> this.done(true) @@ -330,7 +331,7 @@ void drainFused(long expectedState, Subscriber a) { } } - boolean checkTerminated(boolean done, boolean empty, Subscriber a) { + boolean checkTerminated(boolean done, boolean empty, CoreSubscriber a) { final long state = this.state; if (isCancelled(state)) { clearAndTerminate(this); @@ -344,13 +345,13 @@ boolean checkTerminated(boolean done, boolean empty, Subscriber } if (done && empty) { + clearAndTerminate(this); Throwable e = this.error; if (e != null) { a.onError(e); } else { a.onComplete(); } - clearAndTerminate(this); return true; } @@ -374,13 +375,31 @@ public int getPrefetch() { @Override public Context currentContext() { - final long state = this.state; - if (isSubscribedOnce(state) || isTerminated(state)) { - CoreSubscriber actual = this.actual; - return actual != null ? actual.currentContext() : Context.empty(); - } + return isSubscriberReady(this.state) ? this.actual.currentContext() : Context.empty(); + } - return Context.empty(); + @Override + public void subscribe(CoreSubscriber actual) { + Objects.requireNonNull(actual, "subscribe"); + if (markSubscribedOnce(this)) { + actual.onSubscribe(this); + this.actual = actual; + long previousState = markSubscriberReady(this); + if (isCancelled(previousState)) { + return; + } + if (isDisposed(previousState)) { + actual.onError(new CancellationException("Disposed")); + return; + } + if (isWorkInProgress(previousState)) { + return; + } + drain(); + } else { + Operators.error( + actual, new IllegalStateException("UnboundedProcessor allows only a single Subscriber")); + } } @Override @@ -393,17 +412,25 @@ public void request(long n) { @Override public void cancel() { - if (!markCancelled(this)) { + this.cancelled = true; + + final long previousState = markCancelled(this); + if (isTerminated(previousState) + || isCancelled(previousState) + || isDisposed(previousState) + || isWorkInProgress(previousState)) { return; } - if (!this.outputFused) { + if (!isSubscriberReady(previousState) || !this.outputFused) { clearAndTerminate(this); } } @Override public void dispose() { + this.cancelled = true; + final long previousState = markDisposed(this); if (isTerminated(previousState) || isCancelled(previousState) @@ -412,25 +439,37 @@ public void dispose() { return; } - if (!this.outputFused) { + if (!isSubscriberReady(previousState)) { clearAndTerminate(this); + return; } - if (isSubscribedOnce(previousState)) { - this.actual.onError(new CancellationException("Disposed")); + if (!this.outputFused) { + clearAndTerminate(this); } + this.actual.onError(new CancellationException("Disposed")); } @Override @Nullable public ByteBuf poll() { - Queue pq = this.priorityQueue; - if (!pq.isEmpty()) { - return pq.poll(); + ByteBuf t = this.priorityQueue.poll(); + if (t != null) { + return t; } return this.queue.poll(); } + @Override + public int size() { + return this.priorityQueue.size() + this.queue.size(); + } + + @Override + public boolean isEmpty() { + return this.priorityQueue.isEmpty() && this.queue.isEmpty(); + } + /** * Clears all elements from queues and set state to terminate. This method MUST be called only by * the downstream subscriber which has enabled {@link Fuseable#ASYNC} fusion with the given {@link @@ -473,16 +512,6 @@ void clearUnsafely() { } } - @Override - public int size() { - return this.priorityQueue.size() + this.queue.size(); - } - - @Override - public boolean isEmpty() { - return this.priorityQueue.isEmpty() && this.queue.isEmpty(); - } - @Override public int requestFusion(int requestedMode) { if ((requestedMode & Fuseable.ASYNC) != 0) { @@ -500,15 +529,17 @@ public boolean isDisposed() { @Override public boolean isTerminated() { + //noinspection unused final long state = this.state; - return isTerminated(state) || this.done; + return this.done; } @Override @Nullable public Throwable getError() { + //noinspection unused final long state = this.state; - if (isTerminated(state) || this.done) { + if (this.done) { return this.error; } else { return null; @@ -522,7 +553,8 @@ public long downstreamCount() { @Override public boolean hasDownstreams() { - return (this.state & FLAG_SUBSCRIBED_ONCE) == FLAG_SUBSCRIBED_ONCE && this.actual != null; + final long state = this.state; + return !isTerminated(state) && isSubscriberReady(state); } static void release(ByteBuf byteBuf) { @@ -536,8 +568,8 @@ static void release(ByteBuf byteBuf) { } /** - * Tries to set {@link #FLAG_SUBSCRIBED_ONCE} flag if it was not set before and if state is not - * {@link #STATE_TERMINATED} and flags {@link #FLAG_CANCELLED} or {@link #FLAG_DISPOSED} are unset + * Sets {@link #FLAG_SUBSCRIBED_ONCE} flag if it was not set before and if flags {@link + * #FLAG_TERMINATED}, {@link #FLAG_CANCELLED} or {@link #FLAG_DISPOSED} are unset * * @return {@code true} if {@link #FLAG_SUBSCRIBED_ONCE} was successfully set */ @@ -545,11 +577,8 @@ static boolean markSubscribedOnce(UnboundedProcessor instance) { for (; ; ) { long state = instance.state; - if (state == STATE_TERMINATED) { - return false; - } - - if ((state & FLAG_SUBSCRIBED_ONCE) == FLAG_SUBSCRIBED_ONCE + if ((state & FLAG_TERMINATED) == FLAG_TERMINATED + || (state & FLAG_SUBSCRIBED_ONCE) == FLAG_SUBSCRIBED_ONCE || (state & FLAG_CANCELLED) == FLAG_CANCELLED || (state & FLAG_DISPOSED) == FLAG_DISPOSED) { return false; @@ -562,21 +591,40 @@ static boolean markSubscribedOnce(UnboundedProcessor instance) { } /** - * Tries to set {@link #FLAG_CANCELLED} flag if it was not set before and if state is not {@link - * #STATE_TERMINATED}. Also, this method increments number of work in progress (WIP) + * Sets {@link #FLAG_SUBSCRIBER_READY} flag if flags {@link #FLAG_TERMINATED}, {@link + * #FLAG_CANCELLED} or {@link #FLAG_DISPOSED} are unset * - * @return {@code true} if {@link #FLAG_CANCELLED} was successfully set + * @return previous state */ - static boolean markCancelled(UnboundedProcessor instance) { + static long markSubscriberReady(UnboundedProcessor instance) { for (; ; ) { long state = instance.state; - if (state == STATE_TERMINATED) { - return false; + if ((state & FLAG_TERMINATED) == FLAG_TERMINATED + || (state & FLAG_CANCELLED) == FLAG_CANCELLED + || (state & FLAG_DISPOSED) == FLAG_DISPOSED) { + return state; } - if ((state & FLAG_CANCELLED) == FLAG_CANCELLED) { - return false; + if (STATE.compareAndSet(instance, state, state | FLAG_SUBSCRIBER_READY)) { + return state; + } + } + } + + /** + * Sets {@link #FLAG_CANCELLED} flag if it was not set before and if flag {@link #FLAG_TERMINATED} + * is unset. Also, this method increments number of work in progress (WIP) + * + * @return previous state + */ + static long markCancelled(UnboundedProcessor instance) { + for (; ; ) { + long state = instance.state; + + if ((state & FLAG_TERMINATED) == FLAG_TERMINATED + || (state & FLAG_CANCELLED) == FLAG_CANCELLED) { + return state; } long nextState = state + 1; @@ -585,15 +633,15 @@ static boolean markCancelled(UnboundedProcessor instance) { } if (STATE.compareAndSet(instance, state, nextState | FLAG_CANCELLED)) { - return !isWorkInProgress(state); + return state; } } } /** - * Tries to set {@link #FLAG_DISPOSED} flag if it was not set before and if state is not {@link - * #STATE_TERMINATED} and flags {@link #FLAG_CANCELLED} are unset. Also, this method increments - * number of work in progress (WIP) + * Sets {@link #FLAG_DISPOSED} flag if it was not set before and if flags {@link + * #FLAG_TERMINATED}, {@link #FLAG_CANCELLED} are unset. Also, this method increments number of + * work in progress (WIP) * * @return previous state */ @@ -601,11 +649,9 @@ static long markDisposed(UnboundedProcessor instance) { for (; ; ) { long state = instance.state; - if (state == STATE_TERMINATED) { - return STATE_TERMINATED; - } - - if ((state & FLAG_CANCELLED) == FLAG_CANCELLED || (state & FLAG_DISPOSED) == FLAG_DISPOSED) { + if ((state & FLAG_TERMINATED) == FLAG_TERMINATED + || (state & FLAG_CANCELLED) == FLAG_CANCELLED + || (state & FLAG_DISPOSED) == FLAG_DISPOSED) { return state; } @@ -621,8 +667,8 @@ static long markDisposed(UnboundedProcessor instance) { } /** - * Tries to increment the amount of work in progress (max value is {@link #MAX_WIP_VALUE} on the - * given state. Fails if state is {@link #STATE_TERMINATED}. + * Increments the amount of work in progress (max value is {@link #MAX_WIP_VALUE} on the given + * state. Fails if flag {@link #FLAG_TERMINATED} is set. * * @return previous state */ @@ -630,8 +676,8 @@ static long wipIncrement(UnboundedProcessor instance) { for (; ; ) { long state = instance.state; - if (state == STATE_TERMINATED) { - return STATE_TERMINATED; + if ((state & FLAG_TERMINATED) == FLAG_TERMINATED) { + return state; } final long nextState = state + 1; @@ -646,23 +692,26 @@ static long wipIncrement(UnboundedProcessor instance) { } /** - * Tries to decrement the amount of work in progress by the given amount on the given state. Fails - * if state is {@link #STATE_TERMINATED} or it has flags {@link #FLAG_CANCELLED} or {@link - * #FLAG_DISPOSED} set. + * Decrements the amount of work in progress by the given amount on the given state. Fails if flag + * is {@link #FLAG_TERMINATED} is set or if fusion disabled and flags {@link #FLAG_CANCELLED} or + * {@link #FLAG_DISPOSED} are set. + * + *

Note, if fusion is enabled, the decrement should work if flags {@link #FLAG_CANCELLED} or + * {@link #FLAG_DISPOSED} are set, since, while the operator was not terminate by the downstream, + * we still have to propagate notifications that new elements are enqueued * * @return state after changing WIP or current state if update failed */ static long wipRemoveMissing(UnboundedProcessor instance, long previousState) { long missed = previousState & MAX_WIP_VALUE; - boolean outputFused = instance.outputFused; for (; ; ) { long state = instance.state; - if (state == STATE_TERMINATED) { - return STATE_TERMINATED; + if ((state & FLAG_TERMINATED) == FLAG_TERMINATED) { + return state; } - if (!outputFused + if (((state & FLAG_SUBSCRIBER_READY) != FLAG_SUBSCRIBER_READY || !instance.outputFused) && ((state & FLAG_CANCELLED) == FLAG_CANCELLED || (state & FLAG_DISPOSED) == FLAG_DISPOSED)) { return state; @@ -676,7 +725,7 @@ static long wipRemoveMissing(UnboundedProcessor instance, long previousState) { } /** - * Set state {@link #STATE_TERMINATED} and {@link #release(ByteBuf)} all the elements from {@link + * Set flag {@link #FLAG_TERMINATED} and {@link #release(ByteBuf)} all the elements from {@link * #queue} and {@link #priorityQueue}. * *

This method may be called concurrently only if the given {@link UnboundedProcessor} has no @@ -684,21 +733,20 @@ static long wipRemoveMissing(UnboundedProcessor instance, long previousState) { * and only by the downstream calling method {@link #clear()} */ static void clearAndTerminate(UnboundedProcessor instance) { - final boolean outputFused = instance.outputFused; for (; ; ) { long state = instance.state; - if (outputFused) { + if (!isSubscriberReady(state) || !instance.outputFused) { instance.clearSafely(); } else { instance.clearUnsafely(); } - if (state == STATE_TERMINATED) { + if ((state & FLAG_TERMINATED) == FLAG_TERMINATED) { return; } - if (STATE.compareAndSet(instance, state, STATE_TERMINATED)) { + if (STATE.compareAndSet(instance, state, (state & ~MAX_WIP_VALUE) | FLAG_TERMINATED)) { break; } } @@ -717,10 +765,10 @@ static boolean isWorkInProgress(long state) { } static boolean isTerminated(long state) { - return state == STATE_TERMINATED; + return (state & FLAG_TERMINATED) == FLAG_TERMINATED; } - static boolean isSubscribedOnce(long state) { - return (state & FLAG_SUBSCRIBED_ONCE) == FLAG_SUBSCRIBED_ONCE; + static boolean isSubscriberReady(long state) { + return (state & FLAG_SUBSCRIBER_READY) == FLAG_SUBSCRIBER_READY; } }