
Saturday, September 30, 2017

Java 9 Flow API: taking and skipping

Introduction


Limiting or skipping over parts of a flow is a very common task: either we are only interested in the first N items or we don't care about the first N items. Sometimes, N is unknown but we can decide, based on the current item, when to stop relaying items or, in contrast, when to start relaying items.


Take(N)


In concept, limiting a flow to a certain size should be straightforward: count the number of items received via onNext and when the limit is reached, issue a cancel() towards the upstream and onComplete() towards the downstream.


public static <T> Flow.Publisher<T> take(Flow.Publisher<T> source, long n) {
    return new TakePublisher<>(source, n);
}


The operator's implementation requires little state:

static final class TakeSubscriber<T> implements Flow.Subscriber<T> {

    final Flow.Subscriber<? super T> downstream;

    Flow.Subscription upstream;

    long remaining;

    TakeSubscriber(
            Flow.Subscriber<? super T> downstream,
            long n) {
        this.downstream = downstream;
        this.remaining = n;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }
}


In its simplest form, there is no need to intercept the request() and cancel() calls from the downstream: these can be passed through. However, since the operator has to stop the sequence upon reaching the limit (remaining == 0), the upstream's Flow.Subscription has to be stored.

    @Override
    public void onSubscribe(Flow.Subscription s) {
        this.upstream = s;
        downstream.onSubscribe(s);
    }


In onSubscribe, we only have to store the Flow.Subscription and forward it to the downstream.

    @Override
    public void onNext(T item) {
        long r = remaining;
        if (r > 0L) {
            remaining = --r;
            downstream.onNext(item);
 
            if (r == 0) {
                upstream.cancel();
                downstream.onComplete();
            }
        }
    }


While remaining is positive, we decrement it and save it into its field followed by an emission of the current item. If the remaining count reached zero, the upstream is cancelled and the downstream is completed. Any further items will be ignored (in case cancel() doesn't immediately stop the upstream).

    @Override
    public void onError(Throwable throwable) {
        if (remaining != 0L) {
            downstream.onError(throwable);
        }
    }

    @Override
    public void onComplete() {
        if (remaining != 0L) {
            downstream.onComplete();
        }
    }


The onError and onComplete methods check the remaining count and, if it's non-zero, relay the respective terminal event. The reason for this is that if the upstream runs out of items before the limit is reached, its terminal event has to be relayed as well. However, if the operator happens to run with a limit equal to the actual length of the flow, the last item in onNext will trigger an onComplete which could be followed by a terminal event from the upstream that has to be suppressed. This behavior is allowed by the Reactive-Streams specification (i.e., cancel() may not have immediate effect) and we have dealt with it via a done field in other operators. Here, the fact remaining == 0 is the done indicator.
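To see the pieces work together, here is a minimal end-to-end sketch. The range() source, the lambda-based take() publisher and the collect() helper are assumptions made for this illustration only (the post omits the Flow.Publisher boilerplate); the TakeSubscriber follows the snippets above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class TakeDemo {

    // Hypothetical synchronous source emitting 1..count; honors request() and cancel().
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = 1;
            long requested;
            boolean emitting;
            boolean stopped;

            @Override
            public void request(long n) {
                requested += n;
                if (requested < 0L) {
                    requested = Long.MAX_VALUE; // cap on overflow
                }
                if (emitting) {
                    return; // reentrant call: the loop below picks up the new amount
                }
                emitting = true;
                while (requested > 0L && next <= count && !stopped) {
                    requested--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (next > count && !stopped) {
                    stopped = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                stopped = true;
            }
        });
    }

    static final class TakeSubscriber<T> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super T> downstream;
        Flow.Subscription upstream;
        long remaining;

        TakeSubscriber(Flow.Subscriber<? super T> downstream, long n) {
            this.downstream = downstream;
            this.remaining = n;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(s);
        }

        @Override public void onNext(T item) {
            long r = remaining;
            if (r > 0L) {
                remaining = --r;
                downstream.onNext(item);
                if (r == 0L) {
                    upstream.cancel();
                    downstream.onComplete();
                }
            }
        }

        @Override public void onError(Throwable t) {
            if (remaining != 0L) { downstream.onError(t); }
        }

        @Override public void onComplete() {
            if (remaining != 0L) { downstream.onComplete(); }
        }
    }

    // Assumed publisher boilerplate: subscribe the operator's subscriber to the source.
    static <T> Flow.Publisher<T> take(Flow.Publisher<T> source, long n) {
        return downstream -> source.subscribe(new TakeSubscriber<T>(downstream, n));
    }

    // Hypothetical helper: requests everything and records the received signals.
    static <T> List<Object> collect(Flow.Publisher<T> source) {
        List<Object> events = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { events.add(item); }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(collect(take(range(10), 3)));
    }
}
```

With this well-behaved source, the limit is hit after three items and exactly one completion signal reaches the collector; a source shorter than the limit gets its own onComplete relayed instead.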

Being a pass-through for backpressure (the downstream calls request() directly on the upstream's Flow.Subscription), the upstream is not limited by this operator and may attempt to produce more items than the limit. At other times, knowing the downstream requested more than the limit, the upstream could be consumed in an unbounded fashion, and utilizing its fast-paths may result in improved performance.

Deciding which of the three behaviors should be default is up to the operator writer, but it is interesting to look at the remaining two modes: limiting the request amount and unbounding it.

Limiting the request amount


In this mode, if the downstream requests at least the limit, we can issue a single request with the limit amount. However, if the downstream requests less, we have to perform some intricate request accounting.

We'll need a new field, requestRemaining with a VarHandle REQUEST_REMAINING companion. We also have to intercept request() from downstream and figure out atomically what number to request from the upstream so the total requested amount doesn't go above the limit.


static final class TakeSubscriber<T>
        implements Flow.Subscriber<T>, Flow.Subscription {

    final Flow.Subscriber<? super T> downstream;

    Flow.Subscription upstream;

    long remaining;

    long requestRemaining;
    static final VarHandle REQUEST_REMAINING =
        VH.find(MethodHandles.lookup(), TakeSubscriber.class,
            "requestRemaining", long.class);

    TakeSubscriber(
            Flow.Subscriber<? super T> downstream,
            long n) {
        this.downstream = downstream;
        this.remaining = n;
        this.requestRemaining = n;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(this);
    }

    // onNext, onError and onComplete are the same

    @Override
    public void request(long n) {
        // TODO implement
    }

    @Override
    public void cancel() {
        upstream.cancel();
    }
}


The onSubscribe now forwards this as the Flow.Subscription and cancel() just delegates to the upstream.cancel().

    @Override
    public void request(long n) {
        for (;;) {
            long r = (long)REQUEST_REMAINING.getAcquire(this);
            long newRequest;
            if (r <= n) {
                newRequest = r;
            } else {
                newRequest = n;
            }
            long u = r - newRequest;
            if (REQUEST_REMAINING.compareAndSet(this, r, u)) {
                upstream.request(newRequest);
                break;
            }
        }
    }


First we read the remaining request amount. If it is smaller or equal to the downstream's request, we'll request the remaining amount from the upstream. If the downstream requested less than the remaining amount, we'll request that amount instead. The new remaining amount is then the current minus the new request amount decided. After the successful CAS, we request the new amount from the upstream and quit the loop.
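The arithmetic is easier to follow in isolation. The sketch below swaps the VarHandle for an AtomicLong and records the amounts instead of calling a real upstream; both are assumptions made for the illustration. It also adds a guard, not present in the snippet above, that skips forwarding once the limit is exhausted, since a request(0) would be rejected by a spec-compliant upstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class LimitedRequestDemo {

    // How much may still be requested from the upstream (starts at the limit).
    final AtomicLong requestRemaining;

    // Stand-in for upstream.request(): records each forwarded amount.
    final List<Long> upstreamRequests = new ArrayList<>();

    LimitedRequestDemo(long limit) {
        this.requestRemaining = new AtomicLong(limit);
    }

    void request(long n) {
        for (;;) {
            long r = requestRemaining.get();
            long newRequest = Math.min(r, n);
            if (newRequest == 0L) {
                return; // added guard: limit exhausted, nothing left to forward
            }
            if (requestRemaining.compareAndSet(r, r - newRequest)) {
                upstreamRequests.add(newRequest);
                return;
            }
        }
    }

    public static void main(String[] args) {
        LimitedRequestDemo demo = new LimitedRequestDemo(5);
        demo.request(3);
        demo.request(3);
        demo.request(3);
        System.out.println(demo.upstreamRequests);
    }
}
```

With a limit of 5 and three downstream requests of 3, the upstream sees 3, then the remaining 2, and nothing afterwards.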


Unbounding the request amount


The other direction, namely, unbounding the upstream if the downstream requested at least the limit of the operator, can be achieved through the same requestRemaining field but a different request() logic:


    @Override
    public void request(long n) {
        long r = (long)REQUEST_REMAINING.getAcquire(this);
        if (r != 0L) {
             r = (long)REQUEST_REMAINING.getAndSet(this, 0L);
             if (r != 0L && r <= n) {
                 upstream.request(Long.MAX_VALUE);
                 return;
             }
        }
        upstream.request(n);
    }


If the remaining request amount is non-zero, we atomically replace it with zero. This happens exactly once, the first time request() is invoked by the downstream, and we'll check if the remaining amount (same as the limit) is less than or equal to the downstream's request amount. If so, we request an unbounded amount (Long.MAX_VALUE) from the upstream. If it was not the first request or the amount was less than the limit, we revert back to the pass-through behavior.
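The same logic can be exercised without a real upstream. In this sketch an AtomicLong stands in for the VarHandle and a list records what would be forwarded; both are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class UnboundedRequestDemo {

    // Holds the limit until the first request() arrives, zero afterwards.
    final AtomicLong requestRemaining;

    // Stand-in for upstream.request(): records each forwarded amount.
    final List<Long> upstreamRequests = new ArrayList<>();

    UnboundedRequestDemo(long limit) {
        this.requestRemaining = new AtomicLong(limit);
    }

    void request(long n) {
        if (requestRemaining.get() != 0L) {
            long r = requestRemaining.getAndSet(0L);
            if (r != 0L && r <= n) {
                // First request covers the whole limit: go unbounded.
                upstreamRequests.add(Long.MAX_VALUE);
                return;
            }
        }
        // Not the first request, or it was smaller than the limit: pass through.
        upstreamRequests.add(n);
    }

    public static void main(String[] args) {
        UnboundedRequestDemo big = new UnboundedRequestDemo(5);
        big.request(10);
        System.out.println(big.upstreamRequests);

        UnboundedRequestDemo small = new UnboundedRequestDemo(5);
        small.request(2);
        small.request(10);
        System.out.println(small.upstreamRequests);
    }
}
```

Note that only the very first request can trigger the unbounded mode; a small first request followed by a large one stays in pass-through.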


Take with predicate


Sometimes, we'd like to stop the flow when an item matches a certain condition. This can happen before the item is emitted (i.e., takeWhile) or after (i.e., takeUntil). When working with a predicate, we no longer know how many items we'll let pass thus manipulating the request amount is not really an option here.


public static <T> Flow.Publisher<T> takeWhile(
        Flow.Publisher<T> source, Predicate<? super T> predicate) {

    return new TakeWhilePublisher<>(source, predicate);
}

// Flow.Publisher boilerplate omitted

static final class TakeWhileSubscriber<T> implements Flow.Subscriber<T> {

    final Flow.Subscriber<? super T> downstream;

    final Predicate<? super T> predicate;

    Flow.Subscription upstream;

    boolean done;

    TakeWhileSubscriber(
            Flow.Subscriber<? super T> downstream,
            Predicate<? super T> predicate) {
        this.downstream = downstream;
        this.predicate = predicate;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(s);
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        if (!done) {
            downstream.onError(throwable);
        }
    }

    @Override
    public void onComplete() {
        if (!done) {
            downstream.onComplete();
        }
    }
}


The onSubscribe simply forwards the Flow.Subscription and the terminal onError/onComplete methods forward their respective event if the done flag is false. The flag will be set in onNext if the flow has been stopped due to the condition and is there to prevent the emission of multiple terminal events in case the flow would have ended anyway.

    @Override
    public void onNext(T item) {
        if (done) {
            return;
        }

        boolean b;

        try {
            b = predicate.test(item);
        } catch (Throwable ex) {
            upstream.cancel();
            done = true;
            downstream.onError(ex);
            return;
        }

        if (b) {
            downstream.onNext(item);
        } else {
            upstream.cancel();
            done = true;
            downstream.onComplete();
        }  
    }


The first step is to protect against sources that would produce a few items after a cancel() call. Next, we don't trust the user-provided Predicate: if it crashes, we have to cancel the upstream, lock out further events and signal the Throwable to the downstream. A true result from the predicate will allow us to emit the item to the downstream. A false will stop the source, lock out further upstream events and complete the downstream.

If we'd still want to receive the item before the predicate indicates the flow should stop, aka the takeUntil operator, only the onNext logic should be changed a bit:

public static <T> Flow.Publisher<T> takeUntil(
        Flow.Publisher<T> source, Predicate<? super T> stopPredicate) {

    return new TakeUntilPublisher<>(source, stopPredicate);
}

    @Override
    public void onNext(T item) {
        if (done) {
            return;
        }

        downstream.onNext(item);

        boolean b;

        try {
            b = stopPredicate.test(item);
        } catch (Throwable ex) {
            upstream.cancel();
            done = true;
            downstream.onError(ex);
            return;
        }

        if (b) {
            upstream.cancel();
            done = true;
            downstream.onComplete();
        }  
    }


Here, the stopPredicate of the API entry point should indicate when to stop by returning true, whereas the previous takeWhile operator indicated a stop via false. It is a matter of taste I guess. The RxJava convention is that takeWhile(item -> item < 5) will take items while each of them is less than five, never emitting five itself, whereas takeUntil(item -> item == 5) will stop after emitting five.
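The difference between the two conventions is easiest to see on a plain list. The sketch below deliberately leaves the Flow API out and only mirrors the order of "test" and "emit" in the two onNext implementations; the list-based helpers are assumptions for illustration, not part of the operators.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class TakePredicateDemo {

    // Test first, emit only while the predicate holds.
    static <T> List<T> takeWhile(List<T> items, Predicate<? super T> predicate) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (!predicate.test(item)) {
                break;
            }
            out.add(item);
        }
        return out;
    }

    // Emit first, then stop once the stop predicate returns true.
    static <T> List<T> takeUntil(List<T> items, Predicate<? super T> stopPredicate) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            out.add(item);
            if (stopPredicate.test(item)) {
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7);
        System.out.println(takeWhile(source, item -> item < 5));
        System.out.println(takeUntil(source, item -> item == 5));
    }
}
```

For the source 1..7, the first call yields 1 through 4 and the second yields 1 through 5.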


Skip(N)


The dual of take(N), in some sense, is the operator which skips the first N items then lets the rest through. Again, counting the items is a crucial part of the operator's implementation.


public static <T> Flow.Publisher<T> skip(
        Flow.Publisher<T> source, long n) {

    return new SkipPublisher<>(source, n);
}


The operator's Flow.Subscriber uses a similar counting method to take(): it decrements a remaining field and, once it reaches zero, all subsequent events are forwarded.

static final class SkipSubscriber<T> implements Flow.Subscriber<T> {

    final Flow.Subscriber<? super T> downstream;

    Flow.Subscription upstream;

    long remaining;

    SkipSubscriber(
            Flow.Subscriber<? super T> downstream,
            long n) {
        this.downstream = downstream;
        this.remaining = n;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        downstream.onError(throwable);
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }
}


The onError/onComplete methods can now emit the respective event directly without the need to check the remaining count. Whether the sequence is shorter than, equal to or longer than the provided limit, the terminal events can be emitted because onNext is there to skip items, not to stop the flow.

In general, when an operator drops items, it has the responsibility to ask for more from the upstream as the downstream has no way of knowing it didn't receive an item (other than timing out). Thus, the downstream doesn't know it has to request more, and the operator itself has to request on its behalf. However, requesting one by one is expensive - 1 CAS + 0..2 atomic increments per invocation - which can be avoided by requesting in batches.

The skip() operator is in a particular position where we know the first N items will be dropped, thus we can simply request N on top of what the downstream requested. onNext will drop the first N and even if the downstream hasn't requested, it won't get overflown.

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;

        long n = remaining;
        downstream.onSubscribe(s);
        s.request(n);
    }


The reason remaining is read before sending the Flow.Subscription to the downstream is that this call may result in value emission which decrements remaining and we'd end up with less than the amount to be skipped. This also saves a field storing the logically immutable skip amount.

The role of onNext is to drop items and then let the rest pass through:

    @Override
    public void onNext(T item) {
        long r = remaining;
        if (r == 0L) {
            downstream.onNext(item);
        } else {
            remaining = r - 1;
        }
    }
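
To check the request accounting, the following sketch drives the SkipSubscriber by hand. The recording upstream Flow.Subscription, the logging downstream and the run() helper are assumptions for illustration; the operator itself follows the snippets above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class SkipDemo {

    static final class SkipSubscriber<T> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super T> downstream;
        Flow.Subscription upstream;
        long remaining;

        SkipSubscriber(Flow.Subscriber<? super T> downstream, long n) {
            this.downstream = downstream;
            this.remaining = n;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            long n = remaining;        // read before the downstream can trigger emissions
            downstream.onSubscribe(s);
            s.request(n);              // ask for the items we know will be dropped
        }

        @Override public void onNext(T item) {
            long r = remaining;
            if (r == 0L) {
                downstream.onNext(item);
            } else {
                remaining = r - 1;
            }
        }

        @Override public void onError(Throwable t) { downstream.onError(t); }

        @Override public void onComplete() { downstream.onComplete(); }
    }

    // Hypothetical driver: skips 3 items, the downstream requests 2, the "source" emits 1..5.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flow.Subscription upstream = new Flow.Subscription() {
            @Override public void request(long n) { log.add("request(" + n + ")"); }
            @Override public void cancel() { log.add("cancel"); }
        };
        Flow.Subscriber<Integer> downstream = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(2); }
            @Override public void onNext(Integer item) { log.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        };
        SkipSubscriber<Integer> skip = new SkipSubscriber<Integer>(downstream, 3);
        skip.onSubscribe(upstream);
        for (int i = 1; i <= 5; i++) {
            skip.onNext(i);
        }
        skip.onComplete();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The upstream sees the downstream's request(2) followed by the operator's own request(3), and the downstream receives only items 4 and 5 before completion.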


Skipping while a condition holds


Similar to a conditional take, we can skip unwanted items that match a condition (predicate) and then let the rest through unconditionally. Unfortunately, this case requires per-item replenishment from the upstream while the condition holds, as we can't be sure which upstream item will yield a false value and switch the operator into a pass-through mode.


public static <T> Flow.Publisher<T> skipWhile(
        Flow.Publisher<T> source, Predicate<? super T> predicate) {

    return new SkipWhilePublisher<>(source, predicate);
}

static final class SkipWhileSubscriber<T> implements Flow.Subscriber<T> {

    final Flow.Subscriber<? super T> downstream;

    final Predicate<? super T> predicate;

    Flow.Subscription upstream;

    boolean passthrough;

    boolean done;

    SkipWhileSubscriber(
            Flow.Subscriber<? super T> downstream,
            Predicate<? super T> predicate) {
        this.downstream = downstream;
        this.predicate = predicate;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(s);
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        if (!done) {
            downstream.onError(throwable);
        }
    }

    @Override
    public void onComplete() {
        if (!done) {
            downstream.onComplete();
        }
    }
}


The onSubscribe becomes a direct pass-through for the Flow.Subscription and the onError/onComplete methods get their done checks back. This is required because the predicate may fail for the very last item and the cancellation may not stop the terminal event. There is also the need for a flag that tells the onNext to let all items through and skip the predicate altogether.




    @Override
    public void onNext(T item) {
        if (done) {
            return;
        }

        if (!passthrough) {
            boolean b;
            try {
                b = predicate.test(item);
            } catch (Throwable ex) {
                upstream.cancel();
                done = true;
                downstream.onError(ex);
                return;
            }
            if (b) {
                upstream.request(1);
                return;
            }
            passthrough = true;           
        }

        downstream.onNext(item);
    }


First we prevent an excess onNext if there was a crash in the predicate before. Next, if the operator is not in the pass-through mode, we test with the predicate. If this turns out to be true, that indicates the item has to be dropped and a fresh item has to be requested as replenishment. Otherwise the operator enters its pass-through mode and the current and subsequent items will be emitted directly without invoking the predicate again. Implementing an "until" variant, where the predicate returning true still drops the current item, is left to the reader as an exercise.
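The per-item replenishment becomes visible when the operator is driven by hand. As before, the recording upstream, the logging downstream and the run() helper are assumptions for illustration; the SkipWhileSubscriber follows the snippets above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

final class SkipWhileDemo {

    static final class SkipWhileSubscriber<T> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super T> downstream;
        final Predicate<? super T> predicate;
        Flow.Subscription upstream;
        boolean passthrough;
        boolean done;

        SkipWhileSubscriber(Flow.Subscriber<? super T> downstream,
                Predicate<? super T> predicate) {
            this.downstream = downstream;
            this.predicate = predicate;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(s);
        }

        @Override public void onNext(T item) {
            if (done) {
                return;
            }
            if (!passthrough) {
                boolean b;
                try {
                    b = predicate.test(item);
                } catch (Throwable ex) {
                    upstream.cancel();
                    done = true;
                    downstream.onError(ex);
                    return;
                }
                if (b) {
                    upstream.request(1); // replenish the dropped item
                    return;
                }
                passthrough = true;
            }
            downstream.onNext(item);
        }

        @Override public void onError(Throwable t) {
            if (!done) { downstream.onError(t); }
        }

        @Override public void onComplete() {
            if (!done) { downstream.onComplete(); }
        }
    }

    // Hypothetical driver: skip while item < 3, the downstream requests 3, the "source" emits 1..5.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flow.Subscription upstream = new Flow.Subscription() {
            @Override public void request(long n) { log.add("request(" + n + ")"); }
            @Override public void cancel() { log.add("cancel"); }
        };
        Flow.Subscriber<Integer> downstream = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(3); }
            @Override public void onNext(Integer item) { log.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        };
        SkipWhileSubscriber<Integer> skipWhile =
                new SkipWhileSubscriber<Integer>(downstream, item -> item < 3);
        skipWhile.onSubscribe(upstream);
        for (int i = 1; i <= 5; i++) {
            skipWhile.onNext(i);
        }
        skipWhile.onComplete();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each of the two dropped items costs one request(1) towards the upstream; from item 3 on, the predicate is no longer consulted.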


Conclusion

The post demonstrated a way to implement the take, takeWhile, takeUntil, skip and skipWhile operators. The logic behind them doesn't require a fully fledged queue-drain approach and, being synchronous in-flow operators, they don't really have to bother with concurrency and request management (except the two take alternatives, which do have to manage requests).

The fact that the onXXX methods of a Flow.Subscriber are not allowed to be invoked concurrently greatly helps reduce the complexity and the need for being defensive all the time.

In the next post, we'll see how a multicasting Processor can be implemented and how a fan-out-fan-in operator can be implemented with its help.

Wednesday, September 27, 2017

Java 9 Flow API: arbitration and concatenation

Introduction


A very common task is to combine multiple sources, or more generally, start consuming a source once the previous source has terminated. The naive approach would be to simply call otherSource.subscribe(nextSubscriber) from onError or onComplete. Unfortunately, this doesn't work for two reasons: 1) it may end up with deep stacks due to a "tail" subscription from onError/onComplete and 2) we should request the remaining, unfulfilled amount from the new source that hasn't been provided by the previous source to not overflow the downstream.

The first issue can be solved by applying a heavyweight observeOn in general and implementing a basic trampolining loop only for certain concrete cases such as flow concatenation to be described in this post.

The second issue requires a more involved solution: not only do we have to switch between Flow.Subscriptions from different sources, we have to make sure concurrent request() invocations are not lost and are routed to the proper Flow.Subscription along with any concurrent cancel() calls. Perhaps the difficulty is lessened by the fact that switching sources happens on a terminal event boundary only, thus we don't have to worry about the old source calling onNext while the logic switches to the new source and complicating the accounting of requested/emitted item counts. Enter SubscriptionArbiter.


Subscription arbitration


We have to deal with 4 types of potentially concurrent signals when arbitrating Flow.Subscriptions:


  1. A request(long) call from downstream that has to be routed to the current Flow.Subscription
  2. A cancel() call from downstream that has to be routed to the current Flow.Subscription and cancel any future Flow.Subscription.
  3. A setSubscription(Flow.Subscription) that is called by the current Flow.Subscriber after subscribing to any Flow.Publisher which is not guaranteed to happen on the same thread subscribe() is called (i.e., as with the standard SubmissionPublisher or our range() operator).
  4. A setProduced(long n) that is called when the previous source terminates and we want to make sure the new source will be requested the right amount; i.e., we'll have to deduct this amount from the current requested amount so setSubscription will issue the request for the remainder to the new Flow.Subscription.


Let's start with the skeleton of the SubscriptionArbiter class providing these methods:


public class SubscriptionArbiter implements Flow.Subscription {

    Flow.Subscription current;
    static final VarHandle CURRENT =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "current", Flow.Subscription.class);

    Flow.Subscription next;
    static final VarHandle NEXT =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "next", Flow.Subscription.class);

    long requested;

    long downstreamRequested;
    static final VarHandle DOWNSTREAM_REQUESTED =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "downstreamRequested", long.class);
    
    long produced;
    static final VarHandle PRODUCED =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "produced", long.class);

    int wip;
    static final VarHandle WIP =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "wip", int.class);

    @Override
    public final void request(long n) {
        // TODO implement
    }

    @Override
    public void cancel() {
        // TODO implement
    }

    public final boolean isCancelled() {
        // TODO implement
        return false;
    }

    public final void setSubscription(Flow.Subscription s) {
        // TODO implement
    }

    public final void setProduced(long n) {
        // TODO implement
    }

    final void arbiterDrain() {
        // TODO implement
    }
}

We intend the class to be extended to save on allocation and object headers, however, some methods should not be overridden by any subclass as it would likely break the internal logic. The only relatively safe overridable method is cancel(): the subclass will likely have its own resources that have to be released upon a cancel() call from the downstream which will receive an instance of this class via onSubscribe. The meaning of each field is as follows:


  • current holds the current Flow.Subscription. Its companion CURRENT VarHandle is there to support cancellation.
  • next temporarily holds the next Flow.Subscription to replace the current instance. Direct replacement can't work due to the required request accounting.
  • requested holds the current outstanding request count. It doesn't have any VarHandle because it will be only accessed from within a drain-loop.
  • downstreamRequested accumulates the downstream's requests in case the drain loop is executing.
  • produced holds the number of items produced by the previous source which has to be deducted from requested before switching to the next source happens. It is accompanied by a VarHandle to ensure proper visibility of its value from within the drain loop.
  • wip is our standard work-in-progress counter to support the queue-drain like lock-free serialization we use almost everywhere now.

The first method we implement is request() that will be called by the downstream from an arbitrary thread at any time:


    @Override
    public final void request(long n) {
        for (;;) {
            long r = (long)DOWNSTREAM_REQUESTED.getAcquire(this);
            long u = r + n;
            if (u < 0L) {
                u = Long.MAX_VALUE;
            }
            if (DOWNSTREAM_REQUESTED.compareAndSet(this, r, u)) {
                arbiterDrain();
                break;
            }
        }
    }


We perform the usual atomic addition capped at Long.MAX_VALUE and call arbiterDrain().
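For readers who haven't seen the pattern in earlier posts, here is the capped addition on its own. The sketch uses an AtomicLong in place of the VarHandle and the helper name addCap is an assumption for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

final class CappedAddDemo {

    // Atomically adds n to the counter, saturating at Long.MAX_VALUE on overflow.
    static long addCap(AtomicLong requested, long n) {
        for (;;) {
            long r = requested.get();
            long u = r + n;
            if (u < 0L) {
                u = Long.MAX_VALUE; // overflow wrapped to negative: treat as unbounded
            }
            if (requested.compareAndSet(r, u)) {
                return u;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(addCap(new AtomicLong(5), 3));
        System.out.println(addCap(new AtomicLong(Long.MAX_VALUE - 1), 10) == Long.MAX_VALUE);
    }
}
```

The cap matters because Long.MAX_VALUE doubles as the "unbounded" indicator; letting the sum wrap around would turn an unbounded request into a negative one.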

    @Override
    public void cancel() {
        Flow.Subscription s = (Flow.Subscription)CURRENT.getAndSet(this, this);
        if (s != null && s != this) {
            s.cancel();
        }
        s = (Flow.Subscription)NEXT.getAndSet(this, this);
        if (s != null && s != this) {
            s.cancel();
        }
    }

    public final boolean isCancelled() {
        return CURRENT.getAcquire(this) == this;
    }


We atomically swap in both the current and the next Flow.Subscription instances with the cancelled indicator of this. To support some eagerness in cancellation, the isCancelled can be called by the subclass (i.e., concat an array of Flow.Publishers) to quit its trampolined looping.
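The "this as cancelled indicator" trick can be tried out in a reduced form. The sketch below keeps a single slot in an AtomicReference (standing in for the VarHandle-managed next field) and pairs the swap in cancel() with a CAS from null that cancels late arrivals. The Probe class is a hypothetical helper that only remembers whether it was cancelled.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

final class CancelSentinelDemo implements Flow.Subscription {

    // Holds null (empty), a queued Flow.Subscription, or 'this' (cancelled).
    final AtomicReference<Flow.Subscription> next = new AtomicReference<>();

    // Returns false, and cancels s, if the slot was not empty (e.g. already cancelled).
    boolean setSubscription(Flow.Subscription s) {
        if (next.compareAndSet(null, s)) {
            return true;
        }
        s.cancel();
        return false;
    }

    @Override
    public void request(long n) {
        // request routing is not part of this sketch
    }

    @Override
    public void cancel() {
        Flow.Subscription s = next.getAndSet(this);
        if (s != null && s != this) {
            s.cancel();
        }
    }

    boolean isCancelled() {
        return next.get() == this;
    }

    // Hypothetical helper: remembers whether cancel() was called on it.
    static final class Probe implements Flow.Subscription {
        volatile boolean cancelled;

        @Override public void request(long n) { }

        @Override public void cancel() { cancelled = true; }
    }

    public static void main(String[] args) {
        CancelSentinelDemo arbiter = new CancelSentinelDemo();
        Probe first = new Probe();
        arbiter.setSubscription(first);
        arbiter.cancel();
        Probe late = new Probe();
        arbiter.setSubscription(late);
        System.out.println(first.cancelled + " " + late.cancelled + " " + arbiter.isCancelled());
    }
}
```

Because the sentinel is never null, a Flow.Subscription arriving after cancellation fails the CAS and gets cancelled on the spot instead of being leaked.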

Next, we "queue up" the next Flow.Subscription:


    public final void setSubscription(Flow.Subscription subscription) {
        if (NEXT.compareAndSet(this, null, subscription)) {
            arbiterDrain();
        } else {
            subscription.cancel();
        }
    }


We expect there will be only one thread calling setSubscription and that call happens before the termination of the associated source, thus a simple CAS from null to subscription should be enough. In this scenario, a failed CAS can only mean the arbiter was cancelled in the meantime and we cancel the subscription accordingly. We'll still have to relay the unfulfilled request amount to this new subscription which will be done in arbiterDrain().

The setProduced method will have to "queue up" the fulfilled amount in a similar fashion:


    public final void setProduced(long n) {
        PRODUCED.setRelease(this, n);
        arbiterDrain();
    }


As with the setSubscription, we expect this to happen once per a terminated source before the subscription to the next source happens, thus there is no real need to atomically accumulate the item count.

Finally, let's see the heavy lifting in arbiterDrain() itself now:


    final void arbiterDrain() {
        if ((int)WIP.getAndAdd(this, 1) != 0) {
            return;
        }
        
        Flow.Subscription requestFrom = null;
        long requestAmount = 0L;

        for (;;) {

            // TODO implement

            if ((int)WIP.getAndAdd(this, -1) - 1 == 0) {
                break;
            }
        }

        if (requestFrom != null && requestFrom != this
                && requestAmount != 0L) {
            requestFrom.request(requestAmount);
        }
    }


The arbiterDrain() method, whose name was chosen to avoid clashing with a subclass' drain() method, if any, starts out like most typical trampolined drain loops do: the atomic increment of wip from 0 to 1 enters the loop and the decrement to zero leaves the loop.

One oddity may come from the requestFrom and requestAmount local variables. Unlike a traditional stable-prefetch queue-drain, requesting from within the loop can bring back the reentrancy issue, the tail-subscription problem and may prevent other actions from happening with the arbiter until the request() call returns. Therefore, once the loop decided what the current target Flow.Subscription is, we'll issue a request to it outside the loop. It is possible by the time the drain method reaches the last if statement that the current requestFrom is outdated or the arbiter was cancelled. This is not a problem because request() and cancel() in general are expected to race and an outdated Flow.Subscription means it has already terminated and a request() call is a no-op to it.
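The wip-based trampolining can be observed in a stripped-down form. In this sketch an AtomicInteger replaces the VarHandle, and the depth, maxDepth, passes and reentrantCalls fields are bookkeeping invented for the demonstration: the "work" inside the loop calls drain() again to simulate a reentrant signal.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class TrampolineDemo {

    final AtomicInteger wip = new AtomicInteger();

    int depth;          // current nesting of the loop body
    int maxDepth;       // deepest nesting observed
    int passes;         // how many times the loop body ran
    int reentrantCalls; // how many times the work should call drain() again

    void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // a drain loop is already running; it will take another pass
        }
        for (;;) {
            depth++;
            maxDepth = Math.max(maxDepth, depth);
            passes++;

            if (reentrantCalls > 0) {
                reentrantCalls--;
                drain(); // simulated reentrant signal; returns immediately
            }

            depth--;
            if (wip.decrementAndGet() == 0) {
                break;
            }
        }
    }

    public static void main(String[] args) {
        TrampolineDemo demo = new TrampolineDemo();
        demo.reentrantCalls = 3;
        demo.drain();
        System.out.println("passes=" + demo.passes + ", maxDepth=" + demo.maxDepth);
    }
}
```

Three reentrant calls turn into three extra passes of the same loop while the nesting depth stays at one, which is exactly what keeps the stack shallow.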

The last part inside the loop has to "dequeue" the deferred changes and apply them to the state of the arbiter:


    for (;;) {

        // (1) ----------------------------------------------
        Flow.Subscription currentSub = (Flow.Subscription)CURRENT.getAcquire(this);
        if (currentSub != this) {

            // (2) ------------------------------------------
            long req = requested;

            long downstreamReq = (long)DOWNSTREAM_REQUESTED.getAndSet(this, 0L);

            long prod = (long)PRODUCED.getAndSet(this, 0L);

            Flow.Subscription nextSub = (Flow.Subscription)NEXT.getAcquire(this);
            if (nextSub != null && nextSub != this) {
                NEXT.compareAndSet(this, nextSub, null);
            }

            // (3) ------------------------------------------
            if (downstreamReq != 0L) {
                req += downstreamReq;
                if (req < 0L) {
                    req = Long.MAX_VALUE;
                }
            }

            // (4) ------------------------------------------
            if (prod != 0L && req != Long.MAX_VALUE) {
                req -= prod;
            }

            requested = req;

            // (5) ------------------------------------------
            if (nextSub != null && nextSub != this) {
                requestFrom = nextSub;
                requestAmount = req;
                CURRENT.compareAndSet(this, currentSub, nextSub);
            } else {
                // (6) --------------------------------------
                requestFrom = currentSub;
                requestAmount += downstreamReq;
                if (requestAmount < 0L) {
                    requestAmount = Long.MAX_VALUE;
                }
            }
        }

        if ((int)WIP.getAndAdd(this, -1) - 1 == 0) {
            break;
        }
    }



  1. First we check if the current instance holds the cancelled indicator (this). If so, we don't have to execute any of the logic as the arbiter has been cancelled by the downstream.
  2. We read out the current and queued state: the current outstanding requested amount, the request amount from the downstream if any, the produced item count by the previous source and the potential next Flow.Subscription instance. While it is safe to atomically swap in 0 for both the downstreamRequested and produced values, swapping in null unconditionally may overwrite the cancelled indicator and the setSubscription won't cancel its argument.
  3. If there was an asynchronous request() call, we add the downstreamReq amount to the current requested amount, capped at Long.MAX_VALUE (unbounded indicator).
  4. If there was a non-zero produced amount and the requested amount isn't Long.MAX_VALUE, we subtract the two. The new requested amount is then saved.
  5. If there was a new Flow.Subscription set via setSubscription, we indicate where to request from outside the loop and we indicate the whole current requested amount (now including any async downstream request and upstream produced count) should be used. The CAS will make sure the next Flow.Subscription only becomes the current one if there was no cancellation in the meantime.
  6. Otherwise, we target the current Flow.Subscription, add up the downstream's extra requests capped at Long.MAX_VALUE. The reason for this is that the downstream may issue multiple requests (r1, r2) in a quick succession which makes the logic to loop back again, now having r1 + r2 items outstanding from the downstream's perspective.
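Steps 3 and 4 boil down to a small piece of arithmetic that can be checked separately from the concurrency. The helper below is a sketch extracted for illustration; the name nextRequested does not appear in the arbiter itself.

```java
final class ArbiterAccountingDemo {

    // New outstanding amount after applying queued downstream requests
    // and the item count produced by the previous source.
    static long nextRequested(long requested, long downstreamReq, long produced) {
        long req = requested;
        if (downstreamReq != 0L) {
            req += downstreamReq;
            if (req < 0L) {
                req = Long.MAX_VALUE; // overflow: switch to unbounded
            }
        }
        if (produced != 0L && req != Long.MAX_VALUE) {
            req -= produced; // unbounded mode is never decremented
        }
        return req;
    }

    public static void main(String[] args) {
        System.out.println(nextRequested(10, 5, 7));
        System.out.println(nextRequested(Long.MAX_VALUE, 5, 7) == Long.MAX_VALUE);
    }
}
```

For example, with 10 outstanding, 5 newly requested and 7 produced by the previous source, the next source should be asked for 8 items.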

Now that the infrastructure is ready, let's implement a couple of operators.


Concatenating an array of Flow.Publishers


Perhaps the simplest operator we could write on top of the SubscriptionArbiter is the concat() operator. It consumes one Flow.Publisher after another in a non-overlapping fashion until all of them have completed.


@SafeVarargs
public static <T> Flow.Publisher<T> concat(Flow.Publisher<? extends T>... sources) {
    return new ConcatPublisher<>(sources);
}


The ConcatPublisher itself is straightforward: create a coordinator, send it to the downstream and trigger the consumption of the first source:


     @Override
     public void subscribe(Flow.Subscriber<? super T> s) {
         ConcatCoordinator<T> parent = new ConcatCoordinator<>(s, sources);
         s.onSubscribe(parent);
         parent.drain();
     }

The ConcatCoordinator can be implemented as follows:


static final class ConcatCoordinator<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {
    
    final Flow.Subscriber<? super T> downstream;

    final Flow.Publisher<? extends T>[] sources;

    int index;

    int trampoline;
    static final VarHandle TRAMPOLINE =
        VH.find(MethodHandles.lookup(), ConcatCoordinator.class,
            "trampoline", int.class);

    long consumed;

    ConcatCoordinator(
           Flow.Subscriber<? super T> downstream,
           Flow.Publisher<? extends T>[] sources
    ) {
        this.downstream = downstream;
        this.sources = sources;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }

    void drain() {
        // TODO implement
    }
}


The ConcatCoordinator extends the SubscriptionArbiter, thus it is a Flow.Subscription as well and as such will be used as the connection object towards the downstream. It also implements Flow.Subscriber because we'll use the same instance to subscribe to all of the Flow.Publishers one after the other.

One may come up with the objection that reusing the same Flow.Subscriber instance is not allowed by the Reactive-Streams specification the Flow API inherited. However, the specification actually just discourages the reuse and otherwise mandates external synchronization so that the onXXX methods are invoked in a serialized manner. We'll see that the trampolining in the operator will just ensure that property along with the arbiter itself. Of course, we could just new up a Flow.Subscriber for the next source but that Flow.Subscriber would be itself nothing more than a delegator for the coordinator instance (no need for a per-source state in it); combining the two just saves on allocation and indirection.

The fields are interpreted as follows:


  • downstream is the Flow.Subscriber that receives the signals.
  • sources is the array of Flow.Publishers that will be consumed one after the other.
  • index points at the current Flow.Publisher and gets incremented once one completes.
  • trampoline is the work-in-progress indicator for the drain loop; the name was chosen to avoid clashing with the arbiter's own wip field, for better readability in this blog. In practice, since they are in different classes, one can name them both wip.
  • consumed tracks how many items the current source has produced (and the coordinator has consumed). We'll use this to update the arbiter at the completion of the current source instead of doing it for each item received, because that saves a lot of overhead and we don't really care about each individual item's reception.


The coordinator's onXXX methods are relatively trivial at this point:


    @Override
    public void onSubscribe(Flow.Subscription s) {
        setSubscription(s);
    }

    @Override
    public void onNext(T item) {
        consumed++;
        downstream.onNext(item);
    }

    @Override
    public void onError(Throwable throwable) {
        downstream.onError(throwable);
    }

    @Override
    public void onComplete() {
        drain();
    }
   


We save the Flow.Subscription into the arbiter, write through the item or throwable and call the drain method upon normal completion.

What's left is the drain() method itself:


    void drain() {
        // (1) -------------------------------------------------------
        if ((int)TRAMPOLINE.getAndAdd(this, 1) == 0) {
            do {

                // (2) -----------------------------------------------
                if (isCancelled()) {
                    return;
                }

                // (3) -----------------------------------------------
                if (index == sources.length) {
                    downstream.onComplete();
                    return;
                }

                // (4) -----------------------------------------------
                long c = consumed;
                if (c != 0L) {
                    consumed = 0L;
                    setProduced(c);
                }

                // (5) -----------------------------------------------
                sources[index++].subscribe(this);

            // (6) ---------------------------------------------------
            } while ((int)TRAMPOLINE.getAndAdd(this, -1) - 1 != 0);
        }
    }

Again, not really a complicated method, but as usual, the difficulty may come from understanding why such short code is actually providing the required behavior and safeguards:


  1. We know that drain() is only invoked from the subscribe() or onComplete() methods. This standard lock-free trampolining check ensures only one thread is busy setting up the consumption of the next (or the first) source. In addition, since only a guaranteed one-time per source onComplete() can trigger the consumption of the next, we don't have to worry about racing with onNext in this operator. (However, an in-flow concatMap is a different scenario.) This setup also defends against increasing the stack depth due to tail-subscription: a trampoline > 1 indicates we can immediately subscribe to the next source.
  2. In case the downstream cancelled the operator, we simply quit the loop.
  3. In case the index is equal to the number of sources, it means we reached the end of the concatenation and can complete the downstream via onComplete().
  4. Otherwise, we indicate to the arbiter the number of items consumed from the previous source so it can update its outstanding (current) request amount. Note that consumed is not concurrently updated because onNext and onComplete (and thus drain) on the same source can't be executed concurrently.
  5. We then subscribe to the next source with this and move the index forward by one so that it points to the next-next source.
  6. Finally, if there was no synchronous or racy onComplete, we quit the loop, otherwise we resume with the subsequent sources.
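
The stack-depth protection mentioned in point 1 can be observed in isolation. The sketch below is only an illustration: TrampolineDemo and its fields are hypothetical, an AtomicInteger stands in for the VarHandle-based trampoline field, and each "source" is simulated as an empty one that completes synchronously by calling drain() again:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; AtomicInteger replaces the VarHandle field for brevity.
final class TrampolineDemo {

    final AtomicInteger trampoline = new AtomicInteger();

    int remaining; // number of simulated sources left to start
    int depth;     // current nesting depth of startNext() calls
    int maxDepth;  // deepest nesting observed so far

    TrampolineDemo(int sources) {
        this.remaining = sources;
    }

    void drain() {
        // only the 0 -> 1 transition enters the loop; re-entrant calls just increment
        if (trampoline.getAndIncrement() == 0) {
            do {
                if (remaining == 0) {
                    return; // corresponds to downstream.onComplete()
                }
                remaining--;
                startNext();
            } while (trampoline.decrementAndGet() != 0);
        }
    }

    // Simulates subscribing to an empty source that completes synchronously.
    void startNext() {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        drain(); // the source's onComplete() calls drain() re-entrantly
        depth--;
    }
}
```

Running drain() with a thousand simulated sources never nests startNext() more than one level deep, because the re-entrant drain() call merely bumps the counter and returns to the loop that is already running.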


One can add a few features and safeguards to this coordinator, such as delaying errors till the very end and ensuring the sources entry at index is not null. These are left as an exercise to the reader.


Repeat


How can we turn this into a repeat operator where the source is resubscribed on successful completion? Easy: drop the index and have only a single source Flow.Publisher to be worked on!


public static <T> Flow.Publisher<T> repeat(
        Flow.Publisher<T> source, long max) {
    return new RepeatPublisher<>(source, max);
}

// ... subscribe() has the same pattern.

static final class RepeatCoordinator<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {

    final Flow.Publisher<T> source;

    long max;

    // ... the onXXX methods are the same

    final void drain() {
        if ((int)TRAMPOLINE.getAndAdd(this, 1) == 0) {
            do {

                if (isCancelled()) {
                    return;
                }

                if (--max < 0L) {
                    downstream.onComplete();
                    return;
                }

                long c = consumed;
                if (c != 0L) {
                    consumed = 0L;
                    setProduced(c);
                }

                source.subscribe(this);

            } while ((int)TRAMPOLINE.getAndAdd(this, -1) - 1 != 0);
        }
    }
}


Given that repeating indefinitely is usually not desired, we limit the resubscriptions to a number of times specified by the user. Since there is only one source Flow.Publisher, no indexing into an array is needed and we only have to decrement the counter to detect the condition for completing the downstream.
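
To see what a given max means in practice, the counter check of drain() can be replayed without any reactive machinery. This is a small simulation under the assumption that the source always completes normally; RepeatCountDemo and subscriptions are hypothetical names:

```java
// Hypothetical simulation of the counter logic in RepeatCoordinator.drain().
final class RepeatCountDemo {

    private RepeatCountDemo() { }

    /** Returns how many times source.subscribe(this) would be invoked for the given max. */
    static long subscriptions(long max) {
        long count = 0L;
        for (;;) {
            if (--max < 0L) {
                return count; // this is where downstream.onComplete() would be called
            }
            count++; // this is where source.subscribe(this) would be called
            // assumption: the source completes normally, which calls drain() again
        }
    }
}
```

With this check, repeat(source, 3) subscribes to the source three times in total, and repeat(source, 0) completes the downstream without subscribing at all.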


Retry


How about retrying a Flow.Publisher in case it failed with an onError? Easy: have onError call drain() and onComplete call downstream.onComplete() straight:


public static <T> Flow.Publisher<T> retry(
        Flow.Publisher<T> source, long max) {
    return new RetryPublisher<>(source, max);
}

// ... subscribe() has the same pattern.

static final class RetryCoordinator<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {

    final Flow.Publisher<T> source;

    long max;

    // ... the onSubscribe and onNext methods are the same

    @Override
    public void onError(Throwable throwable) {
        if (--max < 0L) {
            downstream.onError(throwable);
        } else {
            drain();
        }
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }

    final void drain() {
        if ((int)TRAMPOLINE.getAndAdd(this, 1) == 0) {
            do {

                if (isCancelled()) {
                    return;
                }

                long c = consumed;
                if (c != 0L) {
                    consumed = 0L;
                    setProduced(c);
                }

                source.subscribe(this);

            } while ((int)TRAMPOLINE.getAndAdd(this, -1) - 1 != 0);
        }
    }
}


There are two slight changes in retry(). First, in case we run out of the retry count, the latest Flow.Publisher's error is delivered to the downstream from within onError and no further retry can happen. Second, and as a consequence, the drain logic should no longer call onComplete when the number of allowed retries has been used up.
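
Because the counter is now decremented in onError instead of in drain(), max counts retries rather than subscriptions. The following simulation replays that logic for a source that fails a fixed number of times before completing; RetryCountDemo, run and the returned strings are hypothetical and only serve the illustration:

```java
// Hypothetical simulation of the counter logic in RetryCoordinator.
final class RetryCountDemo {

    private RetryCountDemo() { }

    /**
     * Simulates a source that fails on its first `failures` subscriptions
     * and completes normally afterwards.
     */
    static String run(int failures, long max) {
        int attempts = 0;
        for (;;) {
            attempts++; // drain(): source.subscribe(this)
            if (attempts > failures) {
                // onComplete(): passed straight to the downstream
                return "completed after " + attempts + " subscription(s)";
            }
            // onError(): the same check as in the coordinator
            if (--max < 0L) {
                return "failed after " + attempts + " subscription(s)";
            }
            // otherwise drain() is called and the loop subscribes again
        }
    }
}
```

A source that always fails is therefore subscribed to max + 1 times: once for the initial attempt and once for each allowed retry.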


On error, resume with another Flow.Publisher


Now that we've seen multi-source switchover and single-source continuation, switching to an alternative or "fallback" Flow.Publisher should be straightforward to set up: have a 2 element array with the main and fallback Flow.Publishers, then make sure onError triggers the switch.


public static <T> Flow.Publisher<T> onErrorResumeNext(
        Flow.Publisher<T> source, Flow.Publisher<T> fallback) {
    return new OnErrorResumeNextPublisher<>(source, fallback);
}

// ... subscribe() has the same pattern.

static final class OnErrorResumeNextCoordinator<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {

    final Flow.Publisher<T> source;

    final Flow.Publisher<T> fallback;

    boolean switched;

    // ... the onSubscribe and onNext methods are the same

    @Override
    public void onError(Throwable throwable) {
        if (switched) {
            downstream.onError(throwable);
        } else {
            switched = true;
            drain();
        }
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }

    final void drain() {
        if ((int)TRAMPOLINE.getAndAdd(this, 1) == 0) {
            do {

                if (isCancelled()) {
                    return;
                }

                long c = consumed;
                if (c != 0L) {
                    consumed = 0L;
                    setProduced(c);
                }

                if (switched) {
                    fallback.subscribe(this);
                } else {
                    source.subscribe(this);
                }

            } while ((int)TRAMPOLINE.getAndAdd(this, -1) - 1 != 0);
        }
    }
}

Here, we have two states: switched == false indicates we are consuming the main source. If that fails, we set switched = true and the drain loop will subscribe to the fallback Flow.Publisher. However, if the fallback fails too, onError finds switched == true and, instead of draining (and thus retrying) the fallback Flow.Publisher again, it just terminates with the Throwable the fallback emitted.


Conclusion


In this post, the subscription arbitration concept was presented which allows us to switch between non-overlapping Flow.Publisher sources when one terminates (completes or fails with an error) while maintaining the link of cancellation between the individual Flow.Subscriptions as well as making sure backpressure is properly transmitted and preserved between them.

When combined with trampolining logic, such arbitration allowed us to implement a couple of standard ReactiveX operators such as concat, repeat, retry and onErrorResumeNext while only applying small changes to the methods and algorithms in them.

Note however, that even if the arbiter can be reused for in-flow operators such as concatMap (concatenate Flow.Publishers generated from upstream values), repeatWhen (repeat if a companion Flow.Publisher signals an item) and retryWhen, one can no longer use a single Flow.Subscriber to subscribe to both the main flow and the inner/companion flows at the same time. We will explore these types of operators in a later post.

The arbitration has its own limit: it can't support live switching between sources, i.e., when onNext may be in progress while the switch has to happen. If you are familiar with the switchMap operator, this is what can happen during its execution. We'll look into this type of operator in a subsequent post.

But for now, we'll investigate a much lighter set of operators in the next post: limiting the number of items the downstream can receive and skipping a certain number of items from the upstream, based either on counting items or on per-item predicate checks, i.e., the take() and skip() operators.

Thursday, September 21, 2017

Java 9 Flow API: switching threads

Introduction


Ensuring certain computations happen on the right thread, usually off the main thread, is a very common development task when dealing with reactive flows. When building up tools for Java 9's Flow API, one can decide to add this thread-switching support to each operator directly - see the range() operator from the start of the series -, or have a standalone stage for this purpose.

This is a tradeoff. Inlining thread switching avoids bogging down the source thread the way the thread-stealing behavior of most queue-drain approaches presented so far does. A separate operator allows better composition and may even allow working with exotic asynchrony-providing components.


The observeOn operator


In Java, threading support is provided via the Executor, ExecutorService and ScheduledExecutorService-based API. Executor is the most basic of them and only provides a single execute(Runnable) method. This allows creating an Executor from a lambda or method reference:


Executor trampoline = Runnable::run;

Executor swing = SwingUtilities::invokeLater;

Executor pool = ForkJoinPool.commonPool();
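
To make the difference between such Executors tangible, here is a small sketch that records when a task actually runs relative to its caller. ExecutorLambdaDemo and its methods are hypothetical names; the queueing Executor is just a list's add method used as a stand-in for any Executor that runs tasks later:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical demo class showing Executors created from method references.
final class ExecutorLambdaDemo {

    private ExecutorLambdaDemo() { }

    /** Executes one task and logs the order of events as seen by the caller. */
    static List<String> trace(Executor exec) {
        List<String> log = new ArrayList<>();
        log.add("before");
        exec.execute(() -> log.add("task"));
        log.add("after");
        return log;
    }

    /** The trampoline Executor runs the task immediately on the caller's thread. */
    static List<String> direct() {
        return trace(Runnable::run);
    }

    /** An Executor that only stores the task; it is run once the caller has finished. */
    static List<String> deferred() {
        List<Runnable> pending = new ArrayList<>();
        Executor queueing = pending::add;
        List<String> log = trace(queueing);
        pending.forEach(Runnable::run);
        return log;
    }
}
```

With Runnable::run the task is logged between "before" and "after", while with the queueing variant it is logged last. The operator below has to behave correctly in both cases.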


As the least common denominator, we'll use Executor in defining our observeOn operator:


public static <T> Flow.Publisher<T> observeOn(
        Flow.Publisher<T> source, 
        Executor exec,
        int prefetch) {
    return new ObserveOnPublisher<>(source, exec, prefetch);
}


Crossing an asynchronous boundary requires the temporary storage of an event until the other side can pick it up. The queue-drain approach can provide a nice bounded queue we can size with prefetch. In addition, the so-called stable-prefetch request management (shown in the mapFilter operator before) allows minimizing the overhead of requesting more items.

First, let's see the skeleton of the operator's main Flow.Subscriber implementation:

static final class ObserveOnSubscriber<T> implements
Flow.Subscriber<T>, Flow.Subscription, Runnable {

    final Flow.Subscriber<? super T> downstream;

    final Executor exec;

    final int prefetch;

    final Queue<T> queue;

    Flow.Subscription upstream;

    int wip;
    static final VarHandle WIP = 
        VH.find(MethodHandles.lookup(), ObserveOnSubscriber.class, 
                "wip", int.class);

    long requested;
    static final VarHandle REQUESTED =
        VH.find(MethodHandles.lookup(), ObserveOnSubscriber.class, 
                "requested", long.class);

    boolean done;
    static final VarHandle DONE =
        VH.find(MethodHandles.lookup(), ObserveOnSubscriber.class, 
                "done", boolean.class);

    boolean cancelled;
    static final VarHandle CANCELLED =
        VH.find(MethodHandles.lookup(), ObserveOnSubscriber.class, 
                "cancelled", boolean.class);
    
    Throwable error;

    long emitted;

    int consumed;

    ObserveOnSubscriber(
            Flow.Subscriber<? super T> downstream,
            Executor exec,
            int prefetch
    ) {
        this.downstream = downstream;
        this.exec = exec;
        this.prefetch = prefetch;
        this.queue = new SpscArrayQueue<>(prefetch);
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }

    @Override
    public void request(long n) {
        // TODO implement
    }

    @Override
    public void cancel() {
        // TODO implement
    }

    void schedule() {
        // TODO implement
    }

    @Override
    public void run() {
        // TODO implement
    }
}


The SpscArrayQueue is available from the JCTools library. VH is a shorthand utility class for getting a VarHandle for a particular field:

public final class VH {
    public static VarHandle find(
            MethodHandles.Lookup lookup, 
            Class<?> clazz,
            String field,
            Class<?> type) {
        try {
            return lookup.findVarHandle(clazz, field, type);
        } catch (Throwable ex) {
            throw new InternalError(ex);
        }
    }
}


If you are using IntelliJ IDEA, the latest version has nice parameter matching support for findVarHandle and highlights it if the field or type doesn't match the target class' member definition. Unfortunately, the method throws checked exceptions, which requires wrapping it in a try-catch that is unavailable in a field initializer expression. By factoring the logic out, we lose that support but gain some other convenience. Needless to say, if the field names or types are off, we'll get a nice InternalError during unit tests anyway. Note that the MethodHandles.Lookup instance has to be provided by the caller because otherwise the find method could not access the fields of the target class due to visibility restrictions enforced by the JVM.
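
For comparison, the same lookup can be done without a helper by hosting the try-catch in a static initializer block. The sketch below is an alternative arrangement, not the approach used in the rest of this post; WipCounter, enter and leave are hypothetical names that mimic how the wip field is used by the operators:

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical class showing a VarHandle set up in a static initializer block.
final class WipCounter {

    int wip;
    static final VarHandle WIP;

    static {
        try {
            // the lookup is created inside the class, so it may access the field
            WIP = MethodHandles.lookup()
                    .findVarHandle(WipCounter.class, "wip", int.class);
        } catch (ReflectiveOperationException ex) {
            throw new InternalError(ex);
        }
    }

    /** Returns true if the caller made the 0 -> 1 transition and should start draining. */
    boolean enter() {
        return (int) WIP.getAndAdd(this, 1) == 0;
    }

    /** Subtracts the handled amount and returns how much work is still pending. */
    int leave(int missed) {
        return (int) WIP.getAndAdd(this, -missed) - missed;
    }
}
```

The casts are needed because VarHandle access methods are signature-polymorphic and the compiler takes the return type from the cast.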

Now let's describe the fields quickly:


  • downstream comes from the parent Flow.Publisher's subscribe method as usual.
  • exec is the Executor the operator will use.
  • prefetch defines the number of items to request from the upstream when the connection is established. A specific proportion (75%) will be used for replenishing items.
  • queue will hold up to the given prefetch amount of items. Since there is only one thread calling onNext and one thread draining the queue, we'll use a bounded single-producer single-consumer queue.
  • upstream is received through onSubscribe from the upstream source that allows requesting and cancelling the flow.
  • wip will ensure there is only one drain running at a time, executing on the given Executor. This is the same trampolining logic as with other concurrent operators: a transition from 0 to 1 will start the drain process and any further increments to wip will indicate additional work has to be done by the drain process.
  • requested will track the items requested by the downstream as the requesting is decoupled by the operator. The reason for this is that the operator uses a bounded buffer and there is no way to expect the downstream to request only as much as the buffer can hold at a given moment.
  • done indicates the upstream has finished emitting items. Calling onComplete on the downstream immediately is not an option as there could be items still queued up.
  • cancelled indicates the downstream issued a cancel() call and both the drain loop and the upstream have to stop producing events.
  • error holds the Throwable from the upstream. We'll only read it if done == true ensuring proper cross-thread visibility.
  • emitted counts how many items have been emitted towards the downstream. Decrementing the requested field is possible but generally expensive due to unavoidable atomics (and may not auto-batch decrements at all, thus each item incurs the cost).
  • consumed counts how many items have been taken from the queue. When a certain limit is reached (75% of prefetch, for example), it is reset to zero and a request() is issued towards the upstream, thus asking for a fixed number of items periodically.


Implementing the Flow.Subscriber methods is straightforward as they have little to do; they store values and try to start the drain process:


    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(this);
        s.request(prefetch);
    }

    @Override
    public void onNext(T item) {
        queue.offer(item);
        schedule();
    }

    @Override
    public void onError(Throwable throwable) {
        error = throwable;
        DONE.setRelease(this, true);
        schedule();
    }

    @Override
    public void onComplete() {
        DONE.setRelease(this, true);
        schedule();
    }


The implementation of the Flow.Subscription part is similarly not that complicated. The handling of non-positive request amounts is left out for brevity. We have to trigger the draining logic when the downstream requests, in case the upstream has items queued up already due to a possible speed difference.

    @Override
    public void request(long n) {
        for (;;) {
            long a = (long)REQUESTED.getAcquire(this);
            // add the new amount, capping at Long.MAX_VALUE on overflow
            long b = a + n;
            if (b < 0L) {
                b = Long.MAX_VALUE;
            }
            if (REQUESTED.compareAndSet(this, a, b)) {
                schedule();
                break;
            }
        }
    }

    @Override
    public void cancel() {
        if (CANCELLED.compareAndSet(this, false, true)) {
            upstream.cancel();
            if ((int)WIP.getAndAdd(this, 1) == 0) {
                queue.clear();
            }
        }
    }


The request() uses the typical, capped atomic addition of the requested amount. The cancel() method sets the cancelled flag once, cancels the upstream and if there is no draining happening at the moment, clears the queue to help the GC.
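
The capped atomic addition can be tried out on its own. The following sketch isolates it from the operator; RequestCounter, add and current are hypothetical names, and the VarHandle is obtained in a static initializer instead of through the VH helper so that the example is self-contained:

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical class isolating the capped atomic addition used by request().
final class RequestCounter {

    long requested;
    static final VarHandle REQUESTED;

    static {
        try {
            REQUESTED = MethodHandles.lookup()
                    .findVarHandle(RequestCounter.class, "requested", long.class);
        } catch (ReflectiveOperationException ex) {
            throw new InternalError(ex);
        }
    }

    /** Atomically adds n (assumed positive), capping the total at Long.MAX_VALUE. */
    void add(long n) {
        for (;;) {
            long a = (long) REQUESTED.getAcquire(this);
            long b = a + n;
            if (b < 0L) {
                b = Long.MAX_VALUE; // overflow means "unbounded"
            }
            if (REQUESTED.compareAndSet(this, a, b)) {
                return;
            }
            // another thread changed the value in the meantime: retry
        }
    }

    long current() {
        return (long) REQUESTED.getAcquire(this);
    }
}
```

Once the total reaches Long.MAX_VALUE it stays there, no matter how many further requests arrive.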

The schedule() method conceptually works like any drain() method we've implemented so far. The difference is that the drain loop inside it has to be run by the Executor (potentially) on another thread:


    void schedule() {
        if ((int)WIP.getAndAdd(this, 1) == 0) {
            exec.execute(this);
        }
    }



By implementing Runnable directly, we avoid creating a new drain task every time; all the state that needs to be communicated between the caller of schedule() and the run() method goes through fields. Since the transition from 0 to 1 and, later on, from N back to 0 happens on a single thread, the underlying Executor can be a pool of any threads; this type of trampolining makes the Executor pick one thread from this pool and, while the drain lasts, prevents any other thread from the same pool from starting another drain run (which would violate the Flow protocol and/or reorder events).
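
The effect of this trampolining can be checked with a stripped-down sketch. ScheduledDrainDemo is a hypothetical class that keeps only the queue, the wip counter and the schedule()/run() pair; an AtomicInteger and a ConcurrentLinkedQueue stand in for the VarHandle field and the SpscArrayQueue:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the schedule()/run() interplay.
final class ScheduledDrainDemo implements Runnable {

    final Executor exec;
    final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger();
    final List<Integer> received = new ArrayList<>(); // touched by the drain loop only

    ScheduledDrainDemo(Executor exec) {
        this.exec = exec;
    }

    void offer(int item) {
        queue.offer(item);
        schedule();
    }

    void schedule() {
        // only the 0 -> 1 transition hands the task to the Executor
        if (wip.getAndIncrement() == 0) {
            exec.execute(this);
        }
    }

    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            Integer v;
            while ((v = queue.poll()) != null) {
                received.add(v);
            }
            // subtract what we have seen; non-zero means more schedule() calls arrived
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}
```

If three items are offered before the Executor gets around to running anything, only one task is handed to the Executor, and that single run() delivers all three items in order.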

Finally, let's see the implementation of run() that holds the rest of a typical drain-loop:

    @Override
    public void run() {
        int missed = 1;
        
        Flow.Subscriber<? super T> a = downstream;
        Queue<T> q = queue;
        long e = emitted;
        int c = consumed;
        int limit = prefetch - (prefetch >> 2);

        for (;;) {

             long r = (long)REQUESTED.getAcquire(this);

             while (e != r) {
                 // TODO implement
             }

             if (e == r) {
                 // TODO implement
             }

             emitted = e;
             consumed = c;
             missed = (int)WIP.getAndAdd(this, -missed) - missed;
             if (missed == 0) {
                 break;
             }
        }
    }


The pattern for the drain loop is pretty standard: the missed counter will detect if there is more work to be done, and we load fields into local variables to avoid fetching them from the last-level cache due to all the atomics around. Lastly, we have the usual while loop that keeps running until we run out of requests or upstream events, followed by a check whether, having reached a terminal state, the operator can emit the terminal event without any outstanding request.

There is nothing unusual at this point in the inner while loop and the if statements:

    while (e != r) {
        if ((boolean)CANCELLED.getAcquire(this)) {
            q.clear();
            return;
        }
       
        boolean d = (boolean)DONE.getAcquire(this);
        T v = q.poll();
        boolean empty = v == null;

        if (d && empty) {
            Throwable ex = error;
            if (ex == null) {
                a.onComplete();
            } else {
                a.onError(ex);
            }
            return;
        }    

        if (empty) {
            break;
        }

        a.onNext(v);

        e++;
        if (++c == limit) {
            c = 0;
            upstream.request(limit);
        }
    }

    if (e == r) {
        if ((boolean)CANCELLED.getAcquire(this)) {
            q.clear();
            return;
        }
       
        boolean d = (boolean)DONE.getAcquire(this);
        boolean empty = q.isEmpty();

        if (d && empty) {
            Throwable ex = error;
            if (ex == null) {
                a.onComplete();
            } else {
                a.onError(ex);
            }
            return;
        } 
    }
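
The replenishment at the end of the while loop is easy to reason about with concrete numbers. The sketch below replays only the counting part; StablePrefetchDemo and its methods are hypothetical names, and the formula for limit is the one used in run() above:

```java
// Hypothetical simulation of the stable-prefetch replenishment counting.
final class StablePrefetchDemo {

    private StablePrefetchDemo() { }

    /** Same formula as in run(): 75% of prefetch. */
    static int limit(int prefetch) {
        return prefetch - (prefetch >> 2);
    }

    /** Counts the upstream.request(limit) calls made while `items` items are emitted. */
    static int replenishCalls(int prefetch, int items) {
        int limit = limit(prefetch);
        int c = 0;
        int calls = 0;
        for (int i = 0; i < items; i++) {
            if (++c == limit) {
                c = 0;
                calls++; // this is where upstream.request(limit) would be called
            }
        }
        return calls;
    }
}
```

With prefetch = 128, the limit is 96, so emitting 200 items triggers only two replenishing requests (after the 96th and the 192nd item) in addition to the initial request(128) from onSubscribe.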


Refitting operators with async drain


As hinted during the conclusion of the orderedMerge() operator, sometimes we'd want to drain the events of a multi-source operator on another thread. If one thinks about the structure shown in the previous section, the solution to the problem should be clear: propagate an Executor instance into the operator, implement Runnable, keep the WIP increment in drain() and move the rest of the method into the run() method:


    final Executor exec;

    // ...

    void drain() {
        if (getAndIncrement() != 0) {
            return;
        }

        exec.execute(this);
    }

    @Override
    public void run() {
 
        int missed = 1;
        Flow.Subscriber downstream = this.downstream;
 
        Comparator comparator = this.comparator;
 
        OrderedMergeInnerSubscriber[] subscribers = this.subscribers;
        int n = subscribers.length;
 
        Object[] values = this.values;
 
        long e = emitted;
 
        for (;;) {
             long r = (long)REQUESTED.getAcquire(this);
 
             for (;;) {
                 /* unchanged, left out for brevity */
             }
 
             emitted = e;
             missed = addAndGet(-missed);
             if (missed == 0) {
                 break;
             }
        }
    }


Cancelling the drain task


So far, we used the void execute(Runnable) method and relied upon cancelled to stop the drain process itself. However, there are a couple of problems:


  • What if the onNext call on downstream blocks or takes a long time?
  • What if the Executor is so busy the drain doesn't execute within a timeout?

One way of dealing with this situation is to use the more advanced ExecutorService API and work with instances provided by the java.util.concurrent.Executors utility class (or the ForkJoinPool.commonPool()). Their submit() method returns a Future instance we can call cancel() on. The benefit is that all the Thread interruption infrastructure is provided and we don't have to write that ourselves:


    final ExecutorService exec;

    // ...

    void schedule() {
        if ((int)WIP.getAndAdd(this, 1) == 0) {
            Future<?> future = exec.submit(this);
            // do something with "future"
        }
    }


At this point we have to make future available to the cancellation routine in some thread-safe fashion because cancel() can be called at any time from any thread, even while the schedule() method is executing. The solution is to use the deferred cancellation pattern. First we have to define a field to store the current Future of the running drain task and an associated VarHandle to perform atomic operations with the reference:

    Future<?> future;
    static final VarHandle FUTURE =
        VH.find(MethodHandles.lookup(), ObserveOnSubscriber.class,
            "future", Future.class);
    static final Future<?> CANCELLED_TASK = new FutureTask<>(() -> { }, null);


Next, we have to update the schedule() method to store the Future obtained from the ExecutorService.submit call not just atomically, but also avoiding a peculiar race.

    void schedule() {
        if ((int)WIP.getAndAdd(this, 1) == 0) {

            Future<?> old = (Future<?>)FUTURE.getAcquire(this);
            if (old != CANCELLED_TASK) {
                Future<?> future = exec.submit(this);
                if (!FUTURE.compareAndSet(this, old, future)) {
                    old = (Future<?>)FUTURE.getAcquire(this);
                    if (old == CANCELLED_TASK) {
                        future.cancel(true);
                    }
                }
            }
        }
    }



Once submit() returns, we might no longer be under the protection of wip != 0 and thus a concurrent call to schedule() (via request() for example) may have triggered another drain run with another Future instance. If we updated the future reference unconditionally, that newer Future instance would be knocked out and the task-cancellation would no longer work properly.

In addition, a concurrent cancellation may have swapped in the CANCELLED_TASK indicator in which case we should cancel the Future right away. There is no need to loop this construct because if the CAS fails, it is either due to a cancellation or a newer Future task. The former requires cancelling the returned Future and the latter can be ignored because the returned Future is practically finished anyway.

    @Override
    public void cancel() {
        if (CANCELLED.compareAndSet(this, false, true)) {
            upstream.cancel();

            Future<?> future = (Future<?>)FUTURE.getAndSet(this, CANCELLED_TASK);
            if (future != null && future != CANCELLED_TASK) {
                future.cancel(true);
            }

            if ((int)WIP.getAndAdd(this, 1) == 0) {
                queue.clear();
            }
        }
    }

The cancel() method requires fewer changes: provided the operator has not been cancelled already, we atomically replace the current future with the CANCELLED_TASK indicator, cancel the previous Future instance if it was non-null and carry on.

One can, of course, go to extra lengths and make sure the future field is nulled out in between drain runs to avoid leaking references with certain executor service-alikes.
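
The two orderings the deferred cancellation pattern has to cope with (cancel after the Future was stored, and cancel before it) can be exercised with a small sketch. DeferredFutureCancel and offerFuture are hypothetical names, an AtomicReference replaces the VarHandle-based field, and the Future is passed in instead of being obtained from submit():

```java
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the deferred cancellation pattern.
final class DeferredFutureCancel {

    static final Future<?> CANCELLED_TASK = new FutureTask<Void>(() -> { }, null);

    final AtomicReference<Future<?>> future = new AtomicReference<>();

    /** Stores the Future of a freshly submitted task or cancels it if cancel() won the race. */
    void offerFuture(Future<?> next) {
        Future<?> old = future.get();
        if (old == CANCELLED_TASK || !future.compareAndSet(old, next)) {
            // the CAS failed or was skipped: either cancelled or a newer task took over
            if (future.get() == CANCELLED_TASK) {
                next.cancel(true);
            }
        }
    }

    /** Swaps in the terminal indicator and cancels whatever task was stored before. */
    void cancel() {
        Future<?> f = future.getAndSet(CANCELLED_TASK);
        if (f != null && f != CANCELLED_TASK) {
            f.cancel(true);
        }
    }
}
```

A Future stored before cancel() gets cancelled by cancel() itself, whereas a Future offered afterwards is cancelled right away by offerFuture(). In both cases no task is left running unattended.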


Conclusion


In this post I've shown how one can implement a thread-switching operator, observeOn in RxJava terms, with the Java 9 Flow API and the standard Java asynchrony-providing options such as Executors. The presented algorithm, which automatically trampolines the draining of the queued events, allows using all sorts of Executors, including a completely synchronous Runnable::run Executor that executes the task on the caller thread. Since the operator's logic is largely independent of how the asynchrony is established, it can be tested quite easily in a synchronous unit test by itself or as part of a complicated chain of potentially asynchronous flows.

In practice, one may have to consider the handling of a RejectedExecutionException thrown by the execute/submit methods of certain ExecutorServices. The trouble is that even though downstream.onError can be called (as no drain can be running at that time), the downstream may really expect errors to be delivered on the same background thread as the onNext items. In addition, one has to suppress other onXXX signals from the upstream, similar to how a failed user-provided function in an onNext has to suppress any subsequent onError or onComplete call by the upstream, because a cancelled upstream is not required to stop emitting events immediately. This part is left to the library developer to decide.

In the next post, we'll see how one can stop a flow if items didn't arrive in a timely manner.