Saturday, September 30, 2017

Java 9 Flow API: taking and skipping

Introduction


Limiting or skipping over parts of a flow is a very common task: either we are only interested in the first N items or we don't care about the first N items. Sometimes, N is unknown but we can decide, based on the current item, when to stop relaying items or, in contrast, when to start relaying items.


Take(N)


Conceptually, limiting a flow to a certain size should be straightforward: count the number of items received via onNext and, when the limit is reached, issue a cancel() towards the upstream and an onComplete() towards the downstream.


public static <T> Flow.Publisher<T> take(Flow.Publisher<T> source, long n) {
    return new TakePublisher<>(source, n);
}


The operator's implementation requires little state:

static final class TakeSubscriber<T> implements Flow.Subscriber<T> {

    final Flow.Subscriber<? super T> downstream;

    Flow.Subscription upstream;

    long remaining;

    TakeSubscriber(
            Flow.Subscriber<? super T> downstream,
            long n) {
        this.downstream = downstream;
        this.remaining = n;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }
}


In its simplest form, the operator doesn't need to intercept the request() and cancel() calls from the downstream: these can be passed through. However, since the operator has to stop the sequence upon reaching the limit (remaining == 0), the upstream's Flow.Subscription has to be stored.

    @Override
    public void onSubscribe(Flow.Subscription s) {
        this.upstream = s;
        downstream.onSubscribe(s);
    }


In onSubscribe, we only have to store the Flow.Subscription and forward it to the downstream.

    @Override
    public void onNext(T item) {
        long r = remaining;
        if (r > 0L) {
            remaining = --r;
            downstream.onNext(item);
 
            if (r == 0) {
                upstream.cancel();
                downstream.onComplete();
            }
        }
    }


While remaining is positive, we decrement it, save it back into its field and emit the current item. If the remaining count reaches zero, the upstream is cancelled and the downstream is completed. Any further items are ignored (in case cancel() doesn't stop the upstream immediately).

    @Override
    public void onError(Throwable throwable) {
        if (remaining != 0L) {
            downstream.onError(throwable);
        }
    }

    @Override
    public void onComplete() {
        if (remaining != 0L) {
            downstream.onComplete();
        }
    }


The onError and onComplete methods check the remaining count and, if it's positive, relay the respective terminal event. The reason for this is that if the upstream runs out of items before the limit is reached, its terminal event has to be relayed as well. However, if the limit happens to equal the actual length of the flow, the last item in onNext triggers an onComplete, which could be followed by a terminal event from the upstream that has to be suppressed. This behavior is allowed by the Reactive-Streams specification (i.e., cancel() may not have an immediate effect) and we have dealt with it via a done field in other operators. Here, remaining == 0 itself serves as the done indicator.
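The post doesn't show TakePublisher itself. Assembling the pieces above into something runnable, a minimal sketch could look like the following. The lambda returned by take() stands in for TakePublisher, and range() and Collector are hypothetical single-threaded helpers written only for this illustration; they are not part of the post or the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class TakeDemo {

    // Stand-in for the post's TakePublisher: each subscriber gets its own TakeSubscriber.
    static <T> Flow.Publisher<T> take(Flow.Publisher<T> source, long n) {
        return downstream -> source.subscribe(new TakeSubscriber<T>(downstream, n));
    }

    static final class TakeSubscriber<T> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super T> downstream;
        Flow.Subscription upstream;
        long remaining;

        TakeSubscriber(Flow.Subscriber<? super T> downstream, long n) {
            this.downstream = downstream;
            this.remaining = n;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(s); // request() and cancel() pass straight through
        }

        @Override public void onNext(T item) {
            long r = remaining;
            if (r > 0L) {
                remaining = --r;
                downstream.onNext(item);
                if (r == 0L) { // limit reached: stop the upstream, complete the downstream
                    upstream.cancel();
                    downstream.onComplete();
                }
            }
        }

        // remaining == 0 doubles as the "done" flag
        @Override public void onError(Throwable t) { if (remaining != 0L) downstream.onError(t); }
        @Override public void onComplete() { if (remaining != 0L) downstream.onComplete(); }
    }

    // Hypothetical synchronous source: emits start..start+count-1 on the caller's thread, logs requests.
    static Flow.Publisher<Integer> range(int start, int count, List<Long> log) {
        return s -> s.onSubscribe(new Flow.Subscription() {
            int index = start;
            long requested;
            boolean emitting, stopped; // stopped: cancelled or completed

            @Override public void request(long n) {
                log.add(n);
                requested = requested + n < 0L ? Long.MAX_VALUE : requested + n;
                if (emitting) return; // reentrant call: the running loop picks it up
                emitting = true;
                while (requested != 0L && index != start + count && !stopped) {
                    requested--;
                    s.onNext(index++);
                }
                if (index == start + count && !stopped) { stopped = true; s.onComplete(); }
                emitting = false;
            }

            @Override public void cancel() { stopped = true; }
        });
    }

    // Hypothetical test subscriber: requests `initial` up front and records all signals.
    static final class Collector<T> implements Flow.Subscriber<T> {
        final List<T> items = new ArrayList<>();
        final long initial;
        Flow.Subscription subscription;
        int completions;
        Throwable error;
        Collector(long initial) { this.initial = initial; }
        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(initial); }
        @Override public void onNext(T item) { items.add(item); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onComplete() { completions++; }
    }

    public static void main(String[] args) {
        Collector<Integer> c = new Collector<>(Long.MAX_VALUE);
        take(range(1, 10, new ArrayList<>()), 3).subscribe(c);
        System.out.println(c.items + " completions=" + c.completions); // [1, 2, 3] completions=1
    }
}
```

A source shorter than the limit (for example, take(range(1, 2, ...), 5)) completes normally with [1, 2], because remaining is still positive when the upstream's onComplete arrives.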

Because the operator is a pass-through for backpressure (the downstream calls request() directly on the upstream's Flow.Subscription), the upstream is not limited by this operator and may attempt to produce more items than the limit. Conversely, when we know the downstream requested at least the limit, the upstream could be consumed in an unbounded fashion, which may let it use fast-paths and improve performance.

Deciding which of the three behaviors should be the default is up to the operator writer, but it is interesting to look at the remaining two modes: limiting the request amount and unbounding it.

Limiting the request amount


In this mode, if the downstream requests at least the limit, we can issue a single request for the limit amount. However, if the downstream requests less, we have to perform some intricate request accounting.

We'll need a new field, requestRemaining with a VarHandle REQUEST_REMAINING companion. We also have to intercept request() from downstream and figure out atomically what number to request from the upstream so the total requested amount doesn't go above the limit.


static final class TakeSubscriber<T> implements Flow.Subscriber<T>, Flow.Subscription {

    final Flow.Subscriber<? super T> downstream;

    Flow.Subscription upstream;

    long remaining;

    long requestRemaining;
    static final VarHandle REQUEST_REMAINING =
        VH.find(MethodHandles.lookup(), TakeSubscriber.class,
            "requestRemaining", long.class);

    TakeSubscriber(
            Flow.Subscriber<? super T> downstream,
            long n) {
        this.downstream = downstream;
        this.remaining = n;
        this.requestRemaining = n;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(this);
    }

    // onNext, onError and onComplete are the same

    @Override
    public void request(long n) {
        // TODO implement
    }

    @Override
    public void cancel() {
        upstream.cancel();
    }
}


The onSubscribe now forwards this as the Flow.Subscription and cancel() just delegates to the upstream.cancel().

    @Override
    public void request(long n) {
        for (;;) {
            long r = (long)REQUEST_REMAINING.getAcquire(this);
            if (r == 0L) {
                // the whole limit has been requested already; forwarding
                // request(0) would violate Reactive-Streams rule §3.9
                return;
            }
            long newRequest;
            if (r <= n) {
                newRequest = r;
            } else {
                newRequest = n;
            }
            long u = r - newRequest;
            if (REQUEST_REMAINING.compareAndSet(this, r, u)) {
                upstream.request(newRequest);
                break;
            }
        }
    }


First we read the remaining request amount. If it is less than or equal to the downstream's request, we request the remaining amount from the upstream. If the downstream requested less than the remaining amount, we request that amount instead. The new remaining amount is the current one minus the newly decided request amount. After a successful CAS, we request the new amount from the upstream and quit the loop.
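To see the accounting at work, here is a runnable sketch of the request-limiting mode. It assumes the hypothetical range() and Collector helpers at the bottom (range() logs every amount it is asked for), obtains the VarHandle through the standard MethodHandles.lookup().findVarHandle() instead of the post's VH.find helper, and returns early once the whole limit has been requested, since request(0) would violate Reactive-Streams rule §3.9.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class TakeLimitDemo {

    static <T> Flow.Publisher<T> take(Flow.Publisher<T> source, long n) {
        return downstream -> source.subscribe(new TakeSubscriber<T>(downstream, n));
    }

    static final class TakeSubscriber<T> implements Flow.Subscriber<T>, Flow.Subscription {
        final Flow.Subscriber<? super T> downstream;
        Flow.Subscription upstream;
        long remaining;
        long requestRemaining;
        // Standard java.lang.invoke lookup in place of the post's VH.find helper.
        static final VarHandle REQUEST_REMAINING;
        static {
            try {
                REQUEST_REMAINING = MethodHandles.lookup()
                        .findVarHandle(TakeSubscriber.class, "requestRemaining", long.class);
            } catch (ReflectiveOperationException ex) {
                throw new ExceptionInInitializerError(ex);
            }
        }

        TakeSubscriber(Flow.Subscriber<? super T> downstream, long n) {
            this.downstream = downstream;
            this.remaining = n;
            this.requestRemaining = n;
        }

        // request() and cancel() are now intercepted by this instance
        @Override public void onSubscribe(Flow.Subscription s) { upstream = s; downstream.onSubscribe(this); }

        @Override public void onNext(T item) { // same as in the pass-through version
            long r = remaining;
            if (r > 0L) {
                remaining = --r;
                downstream.onNext(item);
                if (r == 0L) { upstream.cancel(); downstream.onComplete(); }
            }
        }
        @Override public void onError(Throwable t) { if (remaining != 0L) downstream.onError(t); }
        @Override public void onComplete() { if (remaining != 0L) downstream.onComplete(); }

        @Override public void request(long n) { // assumes a valid n > 0
            for (;;) {
                long r = (long) REQUEST_REMAINING.getAcquire(this);
                if (r == 0L) return; // the whole limit has been requested already
                long newRequest = Math.min(r, n); // never exceed what is left of the limit
                if (REQUEST_REMAINING.compareAndSet(this, r, r - newRequest)) {
                    upstream.request(newRequest);
                    return;
                }
            }
        }

        @Override public void cancel() { upstream.cancel(); }
    }

    // Hypothetical synchronous source: emits start..start+count-1 on the caller's thread, logs requests.
    static Flow.Publisher<Integer> range(int start, int count, List<Long> log) {
        return s -> s.onSubscribe(new Flow.Subscription() {
            int index = start;
            long requested;
            boolean emitting, stopped; // stopped: cancelled or completed

            @Override public void request(long n) {
                log.add(n);
                requested = requested + n < 0L ? Long.MAX_VALUE : requested + n;
                if (emitting) return; // reentrant call: the running loop picks it up
                emitting = true;
                while (requested != 0L && index != start + count && !stopped) {
                    requested--;
                    s.onNext(index++);
                }
                if (index == start + count && !stopped) { stopped = true; s.onComplete(); }
                emitting = false;
            }

            @Override public void cancel() { stopped = true; }
        });
    }

    // Hypothetical test subscriber: requests `initial` up front and records all signals.
    static final class Collector<T> implements Flow.Subscriber<T> {
        final List<T> items = new ArrayList<>();
        final long initial;
        Flow.Subscription subscription;
        int completions;
        Throwable error;
        Collector(long initial) { this.initial = initial; }
        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(initial); }
        @Override public void onNext(T item) { items.add(item); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onComplete() { completions++; }
    }
}
```

With a limit of 3, a downstream that requests 2 and then 10 makes the upstream see only request(2) followed by request(1); any further downstream request is absorbed without reaching the upstream.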


Unbounding the request amount


The other direction, namely, unbounding the upstream if the downstream requested at least the limit of the operator, can be achieved through the same requestRemaining field but a different request() logic:


    @Override
    public void request(long n) {
        long r = (long)REQUEST_REMAINING.getAcquire(this);
        if (r != 0L) {
            r = (long)REQUEST_REMAINING.getAndSet(this, 0L);
            if (r != 0L && r <= n) {
                upstream.request(Long.MAX_VALUE);
                return;
            }
        }
        upstream.request(n);
    }


If the remaining request amount is non-zero, we atomically replace it with zero. This happens only once, the first time request() is invoked by the downstream, and we check whether the remaining amount (the same as the limit) is less than or equal to the downstream's request amount. If so, we request an unbounded amount (Long.MAX_VALUE) from the upstream. If it was not the first request or the amount was less than the limit, we revert to the pass-through behavior.
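A comparable sketch of the unbounding mode follows. For brevity it uses an AtomicLong in place of the VarHandle-backed field; the range() and Collector helpers are again hypothetical. With a limit of 3, a first request of 5 reaches the upstream as Long.MAX_VALUE, whereas a first request of 2 passes through unchanged, as do all later requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

final class TakeUnboundDemo {

    static <T> Flow.Publisher<T> take(Flow.Publisher<T> source, long n) {
        return downstream -> source.subscribe(new TakeSubscriber<T>(downstream, n));
    }

    static final class TakeSubscriber<T> implements Flow.Subscriber<T>, Flow.Subscription {
        final Flow.Subscriber<? super T> downstream;
        final AtomicLong requestRemaining; // stands in for the post's VarHandle-backed field
        Flow.Subscription upstream;
        long remaining;

        TakeSubscriber(Flow.Subscriber<? super T> downstream, long n) {
            this.downstream = downstream;
            this.remaining = n;
            this.requestRemaining = new AtomicLong(n);
        }

        @Override public void onSubscribe(Flow.Subscription s) { upstream = s; downstream.onSubscribe(this); }

        @Override public void onNext(T item) { // same as in the pass-through version
            long r = remaining;
            if (r > 0L) {
                remaining = --r;
                downstream.onNext(item);
                if (r == 0L) { upstream.cancel(); downstream.onComplete(); }
            }
        }

        @Override public void onError(Throwable t) { if (remaining != 0L) downstream.onError(t); }
        @Override public void onComplete() { if (remaining != 0L) downstream.onComplete(); }

        @Override public void request(long n) {
            if (requestRemaining.get() != 0L) {
                long r = requestRemaining.getAndSet(0L); // only the first request sees the limit
                if (r != 0L && r <= n) {
                    upstream.request(Long.MAX_VALUE); // at least the limit was requested: unbound
                    return;
                }
            }
            upstream.request(n); // otherwise plain pass-through
        }

        @Override public void cancel() { upstream.cancel(); }
    }

    // Hypothetical synchronous source: emits start..start+count-1 on the caller's thread, logs requests.
    static Flow.Publisher<Integer> range(int start, int count, List<Long> log) {
        return s -> s.onSubscribe(new Flow.Subscription() {
            int index = start;
            long requested;
            boolean emitting, stopped; // stopped: cancelled or completed

            @Override public void request(long n) {
                log.add(n);
                requested = requested + n < 0L ? Long.MAX_VALUE : requested + n;
                if (emitting) return; // reentrant call: the running loop picks it up
                emitting = true;
                while (requested != 0L && index != start + count && !stopped) {
                    requested--;
                    s.onNext(index++);
                }
                if (index == start + count && !stopped) { stopped = true; s.onComplete(); }
                emitting = false;
            }

            @Override public void cancel() { stopped = true; }
        });
    }

    // Hypothetical test subscriber: requests `initial` up front and records all signals.
    static final class Collector<T> implements Flow.Subscriber<T> {
        final List<T> items = new ArrayList<>();
        final long initial;
        Flow.Subscription subscription;
        int completions;
        Throwable error;
        Collector(long initial) { this.initial = initial; }
        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(initial); }
        @Override public void onNext(T item) { items.add(item); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onComplete() { completions++; }
    }
}
```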


Take with predicate


Sometimes, we'd like to stop the flow when an item matches a certain condition. This can happen before the item is emitted (i.e., takeWhile) or after (i.e., takeUntil). When working with a predicate, we no longer know how many items we'll let pass, so manipulating the request amount is not really an option here.


public static <T> Flow.Publisher<T> takeWhile(
        Flow.Publisher<T> source, Predicate<? super T> predicate) {

    return new TakeWhilePublisher<>(source, predicate);
}

// Flow.Publisher boilerplate omitted

static final class TakeWhileSubscriber<T> implements Flow.Subscriber<T> {

    final Flow.Subscriber<? super T> downstream;

    final Predicate<? super T> predicate;

    Flow.Subscription upstream;

    boolean done;

    TakeWhileSubscriber(
            Flow.Subscriber<? super T> downstream,
            Predicate<? super T> predicate) {
        this.downstream = downstream;
        this.predicate = predicate;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(s);
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        if (!done) {
            downstream.onError(throwable);
        }
    }

    @Override
    public void onComplete() {
        if (!done) {
            downstream.onComplete();
        }
    }
}


The onSubscribe simply forwards the Flow.Subscription and the terminal onError/onComplete methods forward their respective event if the done flag is false. The flag will be set in onNext if the flow has been stopped due to the condition and is there to prevent the emission of multiple terminal events in case the flow would have ended anyway.

    @Override
    public void onNext(T item) {
        if (done) {
            return;
        }

        boolean b;

        try {
            b = predicate.test(item);
        } catch (Throwable ex) {
            upstream.cancel();
            done = true;
            downstream.onError(ex);
            return;
        }

        if (b) {
            downstream.onNext(item);
        } else {
            upstream.cancel();
            done = true;
            downstream.onComplete();
        }  
    }


The first step protects against sources that may produce a few items after a cancel() call. Next, we don't trust the user-provided Predicate: if it crashes, we have to cancel the upstream, lock out further events and signal the Throwable to the downstream. A true result from the predicate allows us to emit the item to the downstream. A false result stops the source, locks out further upstream events and completes the downstream.

If we still want to receive the item for which the predicate indicates that the flow should stop (a.k.a. the takeUntil operator), only the onNext logic needs to change slightly:

public static <T> Flow.Publisher<T> takeUntil(
        Flow.Publisher<T> source, Predicate<? super T> stopPredicate) {

    return new TakeUntilPublisher<>(source, stopPredicate);
}

    @Override
    public void onNext(T item) {
        if (done) {
            return;
        }

        downstream.onNext(item);

        boolean b;

        try {
            b = stopPredicate.test(item);
        } catch (Throwable ex) {
            upstream.cancel();
            done = true;
            downstream.onError(ex);
            return;
        }

        if (b) {
            upstream.cancel();
            done = true;
            downstream.onComplete();
        }  
    }


Here, the stopPredicate of the API entry point indicates when to stop by returning true, whereas the previous takeWhile operator indicated a stop via false. It's a matter of taste, I guess. The RxJava convention is that takeWhile(item -> item < 5) takes items while each of them is less than five, never emitting five itself, whereas takeUntil(item -> item == 5) stops after emitting five.
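The difference between the two conventions is easiest to see side by side. The sketch below condenses things under an assumption of its own: it folds takeWhile and takeUntil into one hypothetical TakePredicateSubscriber with an `until` flag instead of two classes, and it relies on hypothetical range() and Collector helpers. On 1..10, takeWhile(x -> x < 5) yields [1, 2, 3, 4], while takeUntil(x -> x == 5) yields [1, 2, 3, 4, 5].

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

final class TakePredicateDemo {

    static <T> Flow.Publisher<T> takeWhile(Flow.Publisher<T> source, Predicate<? super T> predicate) {
        return d -> source.subscribe(new TakePredicateSubscriber<T>(d, predicate, false));
    }

    static <T> Flow.Publisher<T> takeUntil(Flow.Publisher<T> source, Predicate<? super T> stopPredicate) {
        return d -> source.subscribe(new TakePredicateSubscriber<T>(d, stopPredicate, true));
    }

    // Hypothetical merged subscriber: until == false acts as takeWhile, true as takeUntil.
    static final class TakePredicateSubscriber<T> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super T> downstream;
        final Predicate<? super T> predicate;
        final boolean until;
        Flow.Subscription upstream;
        boolean done;

        TakePredicateSubscriber(Flow.Subscriber<? super T> downstream,
                Predicate<? super T> predicate, boolean until) {
            this.downstream = downstream;
            this.predicate = predicate;
            this.until = until;
        }

        @Override public void onSubscribe(Flow.Subscription s) { upstream = s; downstream.onSubscribe(s); }

        @Override public void onNext(T item) {
            if (done) return;                   // late items after cancel() are ignored
            if (until) downstream.onNext(item); // takeUntil emits before testing
            boolean b;
            try {
                b = predicate.test(item);
            } catch (Throwable ex) {            // crashing predicate: cancel, lock out, report
                upstream.cancel();
                done = true;
                downstream.onError(ex);
                return;
            }
            if (until == b) {                   // takeUntil stops on true, takeWhile on false
                upstream.cancel();
                done = true;
                downstream.onComplete();
            } else if (!until) {
                downstream.onNext(item);        // takeWhile emits only items that passed
            }
        }

        @Override public void onError(Throwable t) { if (!done) downstream.onError(t); }
        @Override public void onComplete() { if (!done) downstream.onComplete(); }
    }

    // Hypothetical synchronous source: emits start..start+count-1 on the caller's thread, logs requests.
    static Flow.Publisher<Integer> range(int start, int count, List<Long> log) {
        return s -> s.onSubscribe(new Flow.Subscription() {
            int index = start;
            long requested;
            boolean emitting, stopped; // stopped: cancelled or completed

            @Override public void request(long n) {
                log.add(n);
                requested = requested + n < 0L ? Long.MAX_VALUE : requested + n;
                if (emitting) return; // reentrant call: the running loop picks it up
                emitting = true;
                while (requested != 0L && index != start + count && !stopped) {
                    requested--;
                    s.onNext(index++);
                }
                if (index == start + count && !stopped) { stopped = true; s.onComplete(); }
                emitting = false;
            }

            @Override public void cancel() { stopped = true; }
        });
    }

    // Hypothetical test subscriber: requests `initial` up front and records all signals.
    static final class Collector<T> implements Flow.Subscriber<T> {
        final List<T> items = new ArrayList<>();
        final long initial;
        Flow.Subscription subscription;
        int completions;
        Throwable error;
        Collector(long initial) { this.initial = initial; }
        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(initial); }
        @Override public void onNext(T item) { items.add(item); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onComplete() { completions++; }
    }
}
```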


Skip(N)


The dual of take(N), in some sense, is the operator which skips the first N items then lets the rest through. Again, counting the items is a crucial part of the operator's implementation.


public static <T> Flow.Publisher<T> skip(
        Flow.Publisher<T> source, long n) {

    return new SkipPublisher<>(source, n);
}


The operator's Flow.Subscriber uses a counting method similar to take()'s: it decrements a remaining field and, once that reaches zero, forwards all subsequent events.

static final class SkipSubscriber<T> implements Flow.Subscriber<T> {

    final Flow.Subscriber<? super T> downstream;

    Flow.Subscription upstream;

    long remaining;

    SkipSubscriber(
            Flow.Subscriber<? super T> downstream,
            long n) {
        this.downstream = downstream;
        this.remaining = n;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        downstream.onError(throwable);
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }
}


The onError/onComplete methods can now relay the respective event directly, without checking the remaining count. Whether the sequence is shorter than, equal to or longer than the skip count, the terminal events can be emitted, as onNext is there to skip items, not to stop the flow.

In general, when an operator drops items, it is responsible for asking the upstream for more, because the downstream has no way of knowing that it didn't receive an item (other than timing out). Since the downstream doesn't know it has to request more, the operator has to request on its behalf. However, requesting one by one is expensive (1 CAS + 0..2 atomic increments per invocation), which can be avoided by requesting in batches.

The skip() operator is in a particular position: we know the first N items will be dropped, thus we can simply request N on top of what the downstream requested. onNext drops the first N, so even if the downstream hasn't requested anything, it won't be overflowed.

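    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;

        long n = remaining;
        downstream.onSubscribe(s);
        if (n != 0L) {
            // skip(0) has nothing to pre-request; request(0) would
            // violate Reactive-Streams rule §3.9
            s.request(n);
        }
    }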


The reason remaining is read before sending the Flow.Subscription to the downstream is that this call may result in value emission which decrements remaining and we'd end up with less than the amount to be skipped. This also saves a field storing the logically immutable skip amount.

The role of onNext is to drop items and then let the rest pass through:

    @Override
    public void onNext(T item) {
        long r = remaining;
        if (r == 0L) {
            downstream.onNext(item);
        } else {
            remaining = r - 1;
        }
    }
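The following runnable sketch, built on hypothetical range() and Collector helpers, shows why the skip amount is read before calling downstream.onSubscribe. With skip(3) and a downstream that requests 2 from inside onSubscribe, the upstream first emits 1 and 2 (both dropped) and then receives the extra request(3), which covers the third skipped item plus two more, so the downstream gets [4, 5]. Had remaining been read afterwards, the operator would have requested only 1 extra item and the downstream would have received nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class SkipDemo {

    static <T> Flow.Publisher<T> skip(Flow.Publisher<T> source, long n) {
        return d -> source.subscribe(new SkipSubscriber<T>(d, n));
    }

    static final class SkipSubscriber<T> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super T> downstream;
        long remaining;

        SkipSubscriber(Flow.Subscriber<? super T> downstream, long n) {
            this.downstream = downstream;
            this.remaining = n;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            long n = remaining;        // read first: the call below may already trigger onNext
            downstream.onSubscribe(s);
            if (n != 0L) s.request(n); // pre-request the items that will be dropped
        }

        @Override public void onNext(T item) {
            long r = remaining;
            if (r == 0L) downstream.onNext(item);
            else remaining = r - 1;    // drop the item and count down
        }

        // skipping never stops the flow, so terminal events go straight through
        @Override public void onError(Throwable t) { downstream.onError(t); }
        @Override public void onComplete() { downstream.onComplete(); }
    }

    // Hypothetical synchronous source: emits start..start+count-1 on the caller's thread, logs requests.
    static Flow.Publisher<Integer> range(int start, int count, List<Long> log) {
        return s -> s.onSubscribe(new Flow.Subscription() {
            int index = start;
            long requested;
            boolean emitting, stopped; // stopped: cancelled or completed

            @Override public void request(long n) {
                log.add(n);
                requested = requested + n < 0L ? Long.MAX_VALUE : requested + n;
                if (emitting) return; // reentrant call: the running loop picks it up
                emitting = true;
                while (requested != 0L && index != start + count && !stopped) {
                    requested--;
                    s.onNext(index++);
                }
                if (index == start + count && !stopped) { stopped = true; s.onComplete(); }
                emitting = false;
            }

            @Override public void cancel() { stopped = true; }
        });
    }

    // Hypothetical test subscriber: requests `initial` up front and records all signals.
    static final class Collector<T> implements Flow.Subscriber<T> {
        final List<T> items = new ArrayList<>();
        final long initial;
        Flow.Subscription subscription;
        int completions;
        Throwable error;
        Collector(long initial) { this.initial = initial; }
        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(initial); }
        @Override public void onNext(T item) { items.add(item); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onComplete() { completions++; }
    }
}
```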


Skipping while a condition holds


Similar to a conditional take, we can skip unwanted items that match a condition (predicate) and then let the rest through unconditionally. Unfortunately, this case requires per-item replenishment from the upstream while the condition holds, as we can't know in advance which upstream item will yield a false value and switch the operator into pass-through mode.


public static <T> Flow.Publisher<T> skipWhile(
        Flow.Publisher<T> source, Predicate<? super T> predicate) {

    return new SkipWhilePublisher<>(source, predicate);
}

static final class SkipWhileSubscriber<T> implements Flow.Subscriber<T> {

    final Flow.Subscriber<? super T> downstream;

    final Predicate<? super T> predicate;

    Flow.Subscription upstream;

    boolean passthrough;

    boolean done;

    SkipWhileSubscriber(
            Flow.Subscriber<? super T> downstream,
            Predicate<? super T> predicate) {
        this.downstream = downstream;
        this.predicate = predicate;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(s);
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        if (!done) {
            downstream.onError(throwable);
        }
    }

    @Override
    public void onComplete() {
        if (!done) {
            downstream.onComplete();
        }
    }
}


The onSubscribe becomes a direct pass-through for the Flow.Subscription and the onError/onComplete methods get their done checks back. This is required because the predicate may fail for the very last item and the cancellation may not stop the terminal event. There is also the need for a flag that tells the onNext to let all items through and skip the predicate altogether.




    @Override
    public void onNext(T item) {
        if (done) {
            return;
        }

        if (!passthrough) {
            boolean b;
            try {
                b = predicate.test(item);
            } catch (Throwable ex) {
                upstream.cancel();
                done = true;
                downstream.onError(ex);
                return;
            }
            if (b) {
                upstream.request(1);
                return;
            }
            passthrough = true;           
        }

        downstream.onNext(item);
    }


First we prevent an excess onNext if there was a crash in the predicate before. Next, if the operator is not in the pass-through mode, we test with the predicate. If this turns out to be true, that indicates the item has to be dropped and a fresh item has to be requested as replenishment. Otherwise the operator enters its pass-through mode and the current and subsequent items will be emitted directly without invoking the predicate again. Implementing an "until" variant, where the predicate returning true still drops the current item, is left to the reader as an exercise.
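A runnable sketch of skipWhile, again using hypothetical range() and Collector helpers, makes the per-item replenishment visible. With skipWhile(x -> x < 4) and a downstream request of 2, the logged upstream requests are 2 followed by three request(1) calls, one per dropped item, and the downstream receives [4, 5].

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

final class SkipWhileDemo {

    static <T> Flow.Publisher<T> skipWhile(Flow.Publisher<T> source, Predicate<? super T> predicate) {
        return d -> source.subscribe(new SkipWhileSubscriber<T>(d, predicate));
    }

    static final class SkipWhileSubscriber<T> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super T> downstream;
        final Predicate<? super T> predicate;
        Flow.Subscription upstream;
        boolean passthrough;
        boolean done;

        SkipWhileSubscriber(Flow.Subscriber<? super T> downstream, Predicate<? super T> predicate) {
            this.downstream = downstream;
            this.predicate = predicate;
        }

        @Override public void onSubscribe(Flow.Subscription s) { upstream = s; downstream.onSubscribe(s); }

        @Override public void onNext(T item) {
            if (done) return;
            if (!passthrough) {
                boolean b;
                try {
                    b = predicate.test(item);
                } catch (Throwable ex) {
                    upstream.cancel();
                    done = true;
                    downstream.onError(ex);
                    return;
                }
                if (b) {
                    upstream.request(1); // item dropped: replenish on the downstream's behalf
                    return;
                }
                passthrough = true;      // first non-matching item: never test again
            }
            downstream.onNext(item);
        }

        @Override public void onError(Throwable t) { if (!done) downstream.onError(t); }
        @Override public void onComplete() { if (!done) downstream.onComplete(); }
    }

    // Hypothetical synchronous source: emits start..start+count-1 on the caller's thread, logs requests.
    static Flow.Publisher<Integer> range(int start, int count, List<Long> log) {
        return s -> s.onSubscribe(new Flow.Subscription() {
            int index = start;
            long requested;
            boolean emitting, stopped; // stopped: cancelled or completed

            @Override public void request(long n) {
                log.add(n);
                requested = requested + n < 0L ? Long.MAX_VALUE : requested + n;
                if (emitting) return; // reentrant call: the running loop picks it up
                emitting = true;
                while (requested != 0L && index != start + count && !stopped) {
                    requested--;
                    s.onNext(index++);
                }
                if (index == start + count && !stopped) { stopped = true; s.onComplete(); }
                emitting = false;
            }

            @Override public void cancel() { stopped = true; }
        });
    }

    // Hypothetical test subscriber: requests `initial` up front and records all signals.
    static final class Collector<T> implements Flow.Subscriber<T> {
        final List<T> items = new ArrayList<>();
        final long initial;
        Flow.Subscription subscription;
        int completions;
        Throwable error;
        Collector(long initial) { this.initial = initial; }
        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(initial); }
        @Override public void onNext(T item) { items.add(item); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onComplete() { completions++; }
    }
}
```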


Conclusion

The post demonstrated a way to implement the take, takeWhile, takeUntil, skip and skipWhile operators. The logic behind them doesn't require a full-fledged queue-drain approach and, being synchronous in-flow operators, they don't really have to bother with concurrency and request management (except the two take alternatives, which do have to manage requests).

The fact that the onXXX methods of a Flow.Subscriber are not allowed to be invoked concurrently greatly helps reduce the complexity and the need for being defensive all the time.

In the next post, we'll see how a multicasting Processor can be implemented and how a fan-out-fan-in operator can be implemented with its help.

Wednesday, September 27, 2017

Java 9 Flow API: arbitration and concatenation

Introduction


A very common task is to combine multiple sources or, more generally, to start consuming a source once the previous source has terminated. The naive approach would be to simply call otherSource.subscribe(nextSubscriber) from onError or onComplete. Unfortunately, this doesn't work for two reasons: 1) it may end up with deep stacks due to a "tail" subscription from onError/onComplete and 2) we have to request from the new source the remaining amount that the previous source hasn't fulfilled, so that the downstream isn't overflowed.

In general, the first issue can be solved by applying a heavyweight observeOn; for certain concrete cases, such as the flow concatenation described in this post, a basic trampolining loop suffices.

The second issue requires a more involved solution: not only do we have to switch between Flow.Subscriptions from different sources, we have to make sure concurrent request() invocations are not lost and are routed to the proper Flow.Subscription, along with any concurrent cancel() calls. The difficulty is perhaps lessened by the fact that switching sources happens only on a terminal event boundary, thus we don't have to worry about the old source calling onNext while the logic switches to the new source, which would complicate the accounting of requested/emitted item counts. Enter SubscriptionArbiter.


Subscription arbitration


We have to deal with 4 types of potentially concurrent signals when arbitrating Flow.Subscriptions:


  1. A request(long) call from downstream that has to be routed to the current Flow.Subscription
  2. A cancel() call from downstream that has to be routed to the current Flow.Subscription and cancel any future Flow.Subscription.
  3. A setSubscription(Flow.Subscription) call made by the current Flow.Subscriber after subscribing to a Flow.Publisher; it is not guaranteed to happen on the same thread subscribe() was called on (e.g., with the standard SubmissionPublisher or our range() operator).
  4. A setProduced(long n) call made when the previous source terminates, to make sure the new source is requested the right amount; i.e., we have to deduct this amount from the current requested amount so that setSubscription can issue a request for the remainder to the new Flow.Subscription.


Let's start with the skeleton of the SubscriptionArbiter class providing these methods:


public class SubscriptionArbiter implements Flow.Subscription {

    Flow.Subscription current;
    static final VarHandle CURRENT =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "current", Flow.Subscription.class);

    Flow.Subscription next;
    static final VarHandle NEXT =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "next", Flow.Subscription.class);

    long requested;

    long downstreamRequested;
    static final VarHandle DOWNSTREAM_REQUESTED =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "downstreamRequested", long.class);

    long produced;
    static final VarHandle PRODUCED =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "produced", long.class);

    int wip;
    static final VarHandle WIP =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "wip", int.class);

    @Override
    public final void request(long n) {
        // TODO implement
    }

    @Override
    public void cancel() {
        // TODO implement
    }

    public final boolean isCancelled() {
        // TODO implement
        return false;
    }

    public final void setSubscription(Flow.Subscription s) {
        // TODO implement
    }

    public final void setProduced(long n) {
        // TODO implement
    }

    final void arbiterDrain() {
        // TODO implement
    }
}

We intend the class to be extended to save on allocation and object headers. However, some methods should not be overridden by any subclass, as that would likely break the internal logic. The only relatively safe overridable method is cancel(): the subclass will likely have its own resources that have to be released upon a cancel() call from the downstream, which receives an instance of this class via onSubscribe. The meaning of each field is as follows:


  • current holds the current Flow.Subscription. Its companion CURRENT VarHandle is there to support cancellation.
  • next temporarily holds the next Flow.Subscription to replace current instance. Direct replacement can't work due to the required request accounting.
  • requested holds the current outstanding request count. It doesn't have a VarHandle because it is only accessed from within the drain loop.
  • downstreamRequested accumulates the downstream's requests in case the drain loop is currently executing.
  • produced holds the number of items produced by the previous source, which has to be deducted from requested before switching to the next source. It is accompanied by a VarHandle to ensure proper visibility of its value from within the drain loop.
  • wip is our standard work-in-progress counter to support the queue-drain like lock-free serialization we use almost everywhere now.

The first method we implement is request() that will be called by the downstream from an arbitrary thread at any time:


    @Override
    public final void request(long n) {
        for (;;) {
            long r = (long)DOWNSTREAM_REQUESTED.getAcquire(this);
            long u = r + n;
            if (u < 0L) {
                u = Long.MAX_VALUE;
            }
            if (DOWNSTREAM_REQUESTED.compareAndSet(this, r, u)) {
                arbiterDrain();
                break;
            }
        }
    }


We perform the usual atomic addition capped at Long.MAX_VALUE and call arbiterDrain().
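The capped addition is a small, reusable building block. The sketch below expresses it with an AtomicLong instead of the arbiter's VarHandle; RequestMath, addCap and addCapAtomic are hypothetical names chosen for illustration. Because both operands are non-negative, an overflow always wraps to a negative value, which is what the `u < 0L` check detects.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helpers mirroring the capped addition in the arbiter's request().
final class RequestMath {

    // Adds two non-negative request amounts; an overflow means "unbounded".
    static long addCap(long a, long b) {
        long u = a + b;
        return u < 0L ? Long.MAX_VALUE : u;
    }

    // The same CAS loop shape as request(), on an AtomicLong instead of a VarHandle.
    // Returns the amount before the addition.
    static long addCapAtomic(AtomicLong requested, long n) {
        for (;;) {
            long r = requested.get();
            if (requested.compareAndSet(r, addCap(r, n))) {
                return r;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong requested = new AtomicLong();
        // two threads racing to add 1 a thousand times each: no update is lost
        Runnable adder = () -> { for (int i = 0; i < 1000; i++) addCapAtomic(requested, 1); };
        Thread t1 = new Thread(adder);
        Thread t2 = new Thread(adder);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(requested.get()); // 2000
    }
}
```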

    @Override
    public void cancel() {
        Flow.Subscription s = (Flow.Subscription)CURRENT.getAndSet(this, this);
        if (s != null && s != this) {
            s.cancel();
        }
        s = (Flow.Subscription)NEXT.getAndSet(this, this);
        if (s != null && s != this) {
            s.cancel();
        }
    }

    public final boolean isCancelled() {
        return CURRENT.getAcquire(this) == this;
    }


We atomically swap the cancelled indicator, this, into both the current and the next Flow.Subscription fields and cancel whatever was there before. To support some eagerness in cancellation, isCancelled() can be called by the subclass (e.g., when concatenating an array of Flow.Publishers) to quit its trampolined looping.

Next, we "queue up" the next Flow.Subscription:


    public final void setSubscription(Flow.Subscription subscription) {
        if (NEXT.compareAndSet(this, null, subscription)) {
            arbiterDrain();
        } else {
            subscription.cancel();
        }
    }


We expect there will be only one thread calling setSubscription and that call happens before the termination of the associated source, thus a simple CAS from null to subscription should be enough. In this scenario, a failed CAS can only mean the arbiter was cancelled in the meantime and we cancel the subscription accordingly. We'll still have to relay the unfulfilled request amount to this new subscription which will be done in arbiterDrain().
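The "this as the cancelled indicator" trick used by cancel() and setSubscription() can be tried in isolation. The following is a reduced, hypothetical model (an AtomicReference in place of the NEXT VarHandle, and no request accounting at all), not the arbiter itself; it assumes, like the post, a single setSubscription call per vacant slot.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the arbiter's NEXT slot: the holder itself doubles as
// the "cancelled" marker, like `this` in the post.
final class SubscriptionSlot implements Flow.Subscription {
    final AtomicReference<Flow.Subscription> next = new AtomicReference<>();

    void setSubscription(Flow.Subscription s) {
        if (!next.compareAndSet(null, s)) {
            s.cancel(); // with a single setter, a failed CAS means cancel() got there first
        }
    }

    @Override public void request(long n) {
        // Simplified: unlike the real arbiter, requests made before setSubscription are lost.
        Flow.Subscription s = next.get();
        if (s != null && s != this) s.request(n);
    }

    @Override public void cancel() {
        Flow.Subscription s = next.getAndSet(this); // swap in the marker
        if (s != null && s != this) s.cancel();
    }

    boolean isCancelled() { return next.get() == this; }

    // Hypothetical helper subscription that records whether it was cancelled.
    static final class Flag implements Flow.Subscription {
        boolean cancelled;
        @Override public void request(long n) { }
        @Override public void cancel() { cancelled = true; }
    }

    public static void main(String[] args) {
        SubscriptionSlot slot = new SubscriptionSlot();
        slot.cancel();              // the downstream cancels first...
        Flag late = new Flag();
        slot.setSubscription(late); // ...so the late subscription is cancelled on arrival
        System.out.println(late.cancelled + " " + slot.isCancelled()); // true true
    }
}
```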

setProduced will have to "queue up" the fulfilled amount in a similar fashion:


    public final void setProduced(long n) {
        PRODUCED.setRelease(this, n);
        arbiterDrain();
    }


As with setSubscription, we expect this to happen once per terminated source, before the subscription to the next source happens, thus there is no real need to atomically accumulate the item count.

Finally, let's see the heavy lifting in arbiterDrain() itself now:


    final void arbiterDrain() {
        if ((int)WIP.getAndAdd(this, 1) != 0) {
            return;
        }
        
        Flow.Subscription requestFrom = null;
        long requestAmount = 0L;

        for (;;) {

            // TODO implement

            if ((int)WIP.getAndAdd(this, -1) - 1 == 0) {
                break;
            }
        }

        if (requestFrom != null && requestFrom != this
                && requestAmount != 0L) {
            requestFrom.request(requestAmount);
        }
    }


The arbiterDrain() method (named so to avoid clashing with a subclass' drain() method, if any) starts out like most typical trampolined drain loops: the atomic increment of wip from 0 to 1 enters the loop and the decrement to zero leaves the loop.
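The wip trampoline can be observed in a single-threaded, hypothetical demo: a drain() call made from inside the loop body only increments wip and returns, and the running loop performs one extra iteration instead of recursing. The counters are plain fields because this demo is not meant to be driven from multiple threads.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of the wip-based trampoline used by arbiterDrain().
final class TrampolineDemo {
    final AtomicInteger wip = new AtomicInteger();
    int depth;     // current nesting of the loop body
    int maxDepth;  // deepest nesting observed
    int runs;      // number of loop-body executions
    int reentries; // how many reentrant drain() calls the body should still make

    void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // a drain is already running: it will loop once more for us
        }
        do {
            depth++;
            maxDepth = Math.max(maxDepth, depth);
            runs++;
            if (reentries > 0) {
                reentries--;
                drain(); // returns immediately, but forces another iteration
            }
            depth--;
        } while (wip.decrementAndGet() != 0);
    }

    public static void main(String[] args) {
        TrampolineDemo t = new TrampolineDemo();
        t.reentries = 3;
        t.drain();
        System.out.println(t.runs + " " + t.maxDepth); // 4 1
    }
}
```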

One oddity may come from the requestFrom and requestAmount local variables. Unlike a traditional stable-prefetch queue-drain, requesting from within the loop can bring back the reentrancy issue, the tail-subscription problem and may prevent other actions from happening with the arbiter until the request() call returns. Therefore, once the loop decided what the current target Flow.Subscription is, we'll issue a request to it outside the loop. It is possible by the time the drain method reaches the last if statement that the current requestFrom is outdated or the arbiter was cancelled. This is not a problem because request() and cancel() in general are expected to race and an outdated Flow.Subscription means it has already terminated and a request() call is a no-op to it.

The remaining part inside the loop has to "dequeue" the deferred changes and apply them to the state of the arbiter:


    for (;;) {

        // (1) ----------------------------------------------
        Flow.Subscription currentSub = (Flow.Subscription)CURRENT.getAcquire(this);
        if (currentSub != this) {

            // (2) ------------------------------------------
            long req = requested;

            long downstreamReq = (long)DOWNSTREAM_REQUESTED.getAndSet(this, 0L);

            long prod = (long)PRODUCED.getAndSet(this, 0L);

            Flow.Subscription nextSub = (Flow.Subscription)NEXT.getAcquire(this);
            if (nextSub != null && nextSub != this) {
                NEXT.compareAndSet(this, nextSub, null);
            }

            // (3) ------------------------------------------
            if (downstreamReq != 0L) {
                req += downstreamReq;
                if (req < 0L) {
                    req = Long.MAX_VALUE;
                }
            }

            // (4) ------------------------------------------
            if (prod != 0L && req != Long.MAX_VALUE) {
                req -= prod;
            }

            requested = req;

            // (5) ------------------------------------------
            if (nextSub != null && nextSub != this) {
                requestFrom = nextSub;
                requestAmount = req;
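                if (!CURRENT.compareAndSet(this, currentSub, nextSub)) {
                    // cancel() won the race: nextSub was already taken out of NEXT,
                    // so cancel() couldn't see it; cancel it here, request nothing
                    nextSub.cancel();
                    requestFrom = null;
                }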
            } else {
                // (6) --------------------------------------
                requestFrom = currentSub;
                requestAmount += downstreamReq;
                if (requestAmount < 0L) {
                    requestAmount = Long.MAX_VALUE;
                }
            }
        }

        if ((int)WIP.getAndAdd(this, -1) - 1 == 0) {
            break;
        }
    }



  1. First we check if the current instance holds the cancelled indicator (this). If so, we don't have to execute any of the logic as the arbiter has been cancelled by the downstream.
  2. We read out the current and queued state: the current outstanding requested amount, the request amount from the downstream if any, the produced item count by the previous source and the potential next Flow.Subscription instance. While it is safe to atomically swap in 0 for both the downstreamRequested and produced values, swapping in null unconditionally may overwrite the cancelled indicator and the setSubscription won't cancel its argument.
  3. If there was an asynchronous request() call, we add the downstreamReq amount to the current requested amount, capped at Long.MAX_VALUE (unbounded indicator).
  4. If there was a non-zero produced amount and the requested amount isn't Long.MAX_VALUE, we subtract the two. The new requested amount is then saved.
  5. If there was a new Flow.Subscription set via setSubscription, we record it as the target to request from outside the loop, with the whole current requested amount (now including any async downstream request and the previous source's produced count) as the request amount. The CAS makes sure the next Flow.Subscription only becomes the current one if there was no cancellation in the meantime.
  6. Otherwise, we target the current Flow.Subscription and add up the downstream's extra requests, capped at Long.MAX_VALUE. The reason for this is that the downstream may issue multiple requests (r1, r2) in quick succession, which makes the logic loop back again, now having r1 + r2 items outstanding from the downstream's perspective.
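The arithmetic of steps (3) to (6) can be isolated in a single-threaded model, shown below as a hypothetical ArbiterAccounting class; the real arbiter performs these steps inside the wip-guarded loop with atomic swaps, and the `switching` flag here simply stands for the case in step (5) where a new Flow.Subscription was found.

```java
// Hypothetical, single-threaded model of the bookkeeping in steps (3) to (6).
final class ArbiterAccounting {
    long requested; // outstanding demand; Long.MAX_VALUE means unbounded

    // Applies one round of deferred changes and returns the amount to request:
    // the full outstanding amount for a newly switched-in source (5), or only
    // the downstream's new demand for the current source (6).
    long apply(long downstreamReq, long produced, boolean switching) {
        long req = requested;
        if (downstreamReq != 0L) {         // (3) capped addition
            req += downstreamReq;
            if (req < 0L) req = Long.MAX_VALUE;
        }
        if (produced != 0L && req != Long.MAX_VALUE) {
            req -= produced;               // (4) deduct what the previous source delivered
        }
        requested = req;
        return switching ? req : downstreamReq;
    }

    public static void main(String[] args) {
        ArbiterAccounting a = new ArbiterAccounting();
        a.apply(5, 0, false);                     // downstream requests 5 before any source exists
        System.out.println(a.apply(0, 0, true));  // 5: the first source gets everything
        System.out.println(a.apply(0, 3, true));  // 2: it delivered 3, the next source gets the rest
        System.out.println(a.apply(4, 0, false)); // 4: a later request goes to the current source
    }
}
```

Once requested reaches Long.MAX_VALUE it stays there, since step (4) skips the subtraction for unbounded demand.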

Now that the infrastructure is ready, let's implement a couple of operators.


Concatenating an array of Flow.Publishers


Perhaps the simplest operator we could write on top of the SubscriptionArbiter is the concat() operator. It consumes one Flow.Publisher after another in a non-overlapping fashion until all of them have completed.


@SafeVarargs
public static <T> Flow.Publisher<T> concat(Flow.Publisher<? extends T>... sources) {
    return new ConcatPublisher<>(sources);
}


The ConcatPublisher itself is straightforward: create a coordinator, send it to the downstream and trigger the consumption of the first source:


    @Override
    public void subscribe(Flow.Subscriber<? super T> s) {
        ConcatCoordinator<T> parent = new ConcatCoordinator<>(s, sources);
        s.onSubscribe(parent);
        parent.drain();
    }

The ConcatCoordinator can be implemented as follows:


static final class ConcatCoordinator<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {
    
    final Flow.Subscriber<? super T> downstream;

    final Flow.Publisher<? extends T>[] sources;

    int index;

    int trampoline;
    static final VarHandle TRAMPOLINE =
        VH.find(MethodHandles.lookup(), ConcatCoordinator.class,
            "trampoline", int.class);

    long consumed;

    ConcatCoordinator(
            Flow.Subscriber<? super T> downstream,
            Flow.Publisher<? extends T>[] sources
    ) {
        this.downstream = downstream;
        this.sources = sources;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }

    void drain() {
        // TODO implement
    }
}


The ConcatCoordinator extends the SubscriptionArbiter, thus it is a Flow.Subscription as well and will be used as the connection object towards the downstream. It also implements Flow.Subscriber because we'll use the same instance to subscribe to all of the Flow.Publishers, one after the other.

One may object that reusing the same Flow.Subscriber instance is not allowed by the Reactive-Streams specification the Flow API inherited. However, the specification merely discourages such reuse and otherwise mandates external synchronization so that the onXXX methods are invoked in a serialized manner. As we'll see, the trampolining in the operator, together with the arbiter itself, ensures exactly that property. Of course, we could just new up a Flow.Subscriber for the next source, but that Flow.Subscriber would be nothing more than a delegator to the coordinator instance (no per-source state is needed in it); combining the two just saves on allocation and indirection.

The fields are interpreted as follows:


  • downstream is the Flow.Subscriber that receives the signals.
  • sources is the array of Flow.Publishers that will be consumed one after the other
  • index points at the current Flow.Publisher and gets incremented once one completes.
  • trampoline is the work-in-progress indicator for the drain loop; it is named differently from the arbiter's own wip field only for readability in this blog. In practice, since they are in different classes, one can name both of them wip.
  • consumed tracks how many items the current source has produced (and thus the coordinator has consumed). We use it to update the arbiter when the current source completes instead of doing so for each item received, because that saves a lot of overhead and we don't really care about each individual item's reception.


The coordinator's onXXX methods are relatively trivial at this point:


    @Override
    public void onSubscribe(Flow.Subscription s) {
        setSubscription(s);
    }

    @Override
    public void onNext(T item) {
        consumed++;
        downstream.onNext(item);
    }

    @Override
    public void onError(Throwable throwable) {
        downstream.onError(throwable);
    }

    @Override
    public void onComplete() {
        drain();
    }
   


We save the Flow.Subscription into the arbiter, write through the item or throwable and call the drain method upon normal completion.

What's left is the drain() method itself:


    void drain() {
        // (1) -------------------------------------------------------
        if ((int)TRAMPOLINE.getAndAdd(this, 1) == 0) {
            do {

                // (2) -----------------------------------------------
                if (isCancelled()) {
                    return;
                }

                // (3) -----------------------------------------------
                if (index == sources.length) {
                    downstream.onComplete();
                    return;
                }

                // (4) -----------------------------------------------
                long c = consumed;
                if (c != 0L) {
                    consumed = 0L;
                    setProduced(c);
                }

                // (5) -----------------------------------------------
                sources[index++].subscribe(this);

            // (6) ---------------------------------------------------
            } while ((int)TRAMPOLINE.getAndAdd(this, -1) - 1 != 0);
        }
    }

Again, not really a complicated method, but as usual, the difficulty may come from understanding why such short code actually provides the required behavior and safeguards:


  1. We know that drain() is only invoked from the subscribe() or onComplete() methods. This standard lock-free trampolining check ensures only one thread is busy setting up the consumption of the next (or the first) source. In addition, since only onComplete(), which is called at most once per source, can trigger the consumption of the next source, we don't have to worry about racing with onNext in this operator. (However, an in-flow concatMap is a different scenario.) This setup also defends against increasing the stack depth due to tail-subscription: a trampoline > 1 indicates we can immediately subscribe to the next source.
  2. In case the downstream cancelled the operator, we simply quit the loop.
  3. In case the index is equal to the number of sources, it means we reached the end of the concatenation and can complete the downstream via onComplete().
  4. Otherwise, we indicate to the arbiter the number of items consumed from the previous source so it can update its outstanding (current) request amount. Note that consumed is not concurrently updated because onNext and onComplete (and thus drain) on the same source can't be executed concurrently.
  5. We then take the current source, move the index forward by one so it points to the next source, and subscribe to the taken source with this.
  6. Finally, if there was no synchronous or racy onComplete, we quit the loop; otherwise, we resume with the subsequent sources.
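To see why the counter keeps the stack flat, here is a self-contained sketch that mirrors steps (1), (3), (5) and (6) without the arbiter, the cancellation check or the consumed bookkeeping. It uses an AtomicInteger instead of a VarHandle, and SyncSource, TrampolineDemo and runWith are hypothetical names. Each source "completes" synchronously from inside subscribe(), the worst case for stack depth.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a Flow.Publisher that completes synchronously. */
interface SyncSource {
    void subscribe(TrampolineDemo parent);
}

/** Mirrors the shape of ConcatCoordinator.drain() using an AtomicInteger. */
final class TrampolineDemo {
    final SyncSource[] sources;
    final AtomicInteger trampoline = new AtomicInteger();
    final List<Integer> subscribed = new ArrayList<>();
    int index;
    boolean completed;

    TrampolineDemo(SyncSource[] sources) {
        this.sources = sources;
    }

    // A source calls this from within subscribe(), i.e., re-entrantly.
    void onComplete() {
        drain();
    }

    void drain() {
        // (1) only the caller moving the counter from 0 to 1 runs the loop
        if (trampoline.getAndIncrement() == 0) {
            do {
                // (3) all sources consumed: complete
                if (index == sources.length) {
                    completed = true;
                    return;
                }
                // (5) advance the index, then subscribe to the taken source
                int i = index++;
                subscribed.add(i);
                sources[i].subscribe(this);
                // (6) loop again if onComplete() re-entered drain() meanwhile
            } while (trampoline.decrementAndGet() != 0);
        }
    }

    static TrampolineDemo runWith(int count) {
        SyncSource[] sources = new SyncSource[count];
        Arrays.fill(sources, (SyncSource) parent -> parent.onComplete());
        TrampolineDemo demo = new TrampolineDemo(sources);
        demo.drain();
        return demo;
    }
}
```

With the counter, the re-entrant drain() call from onComplete() merely increments it and returns, and the outer loop picks up the next source, so TrampolineDemo.runWith(100_000) completes at a constant stack depth. Calling the next subscribe() directly from onComplete() instead would nest one set of frames per source.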


One can add a few features and safeguards to this coordinator, such as delaying errors till the very end and ensuring the index-th entry of sources is not null. These are left as an exercise to the reader.


Repeat


How can we turn this into a repeat operator where the source is resubscribed on successful completion? Easy: drop the index and have only a single source Flow.Publisher to be worked on!


public static <T> Flow.Publisher<T> repeat(
        Flow.Publisher<T> source, long max) {
    return new RepeatPublisher<>(source, max);
}

// ... subscribe() has the same pattern.

static final class RepeatCoordinator<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {

    final Flow.Publisher<T> source;

    long max;

    // ... the onXXX methods are the same

    final void drain() {
        if ((int)TRAMPOLINE.getAndAdd(this, 1) == 0) {
            do {

                if (isCancelled()) {
                    return;
                }

                if (--max < 0L) {
                    downstream.onComplete();
                    return;
                }

                long c = consumed;
                if (c != 0L) {
                    consumed = 0L;
                    setProduced(c);
                }

                source.subscribe(this);

            } while ((int)TRAMPOLINE.getAndAdd(this, -1) - 1 != 0);
        }
    }
}


Given that repeating indefinitely is usually not desired, we limit the number of times the source gets subscribed to a count specified by the user. Since there is only one source Flow.Publisher, no indexing into an array is needed and we only have to decrement the counter to detect the condition for completing the downstream.
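Because the `--max < 0L` check runs before every subscription, including the very first one triggered from subscribe(), max counts the total number of subscriptions rather than the number of resubscriptions. The loop below simulates only that counter logic (RepeatCountDemo is a hypothetical name), assuming a source that always completes and a downstream that never cancels:

```java
/** Hypothetical simulation of the max bookkeeping in RepeatCoordinator.drain(). */
final class RepeatCountDemo {

    private RepeatCountDemo() { }

    /** Returns how many times the source gets subscribed to. */
    static long subscriptions(long max) {
        long count = 0L;
        for (;;) {
            if (--max < 0L) {
                return count;   // downstream.onComplete()
            }
            count++;            // source.subscribe(this), which completes
        }
    }
}
```

So repeat(source, 3) runs the source three times in total, and repeat(source, 0) completes without subscribing at all.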


Retry


How about retrying a Flow.Publisher in case it failed with an onError? Easy: have onError call drain() and have onComplete call downstream.onComplete() directly:


public static <T> Flow.Publisher<T> retry(
        Flow.Publisher<T> source, long max) {
    return new RetryPublisher<>(source, max);
}

// ... subscribe() has the same pattern.

static final class RetryCoordinator<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {

    final Flow.Publisher<T> source;

    long max;

    // ... the onSubscribe and onNext methods are the same

    @Override
    public void onError(Throwable throwable) {
        if (--max < 0L) {
            downstream.onError(throwable);
        } else {
            drain();
        }
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }

    final void drain() {
        if ((int)TRAMPOLINE.getAndAdd(this, 1) == 0) {
            do {

                if (isCancelled()) {
                    return;
                }

                long c = consumed;
                if (c != 0L) {
                    consumed = 0L;
                    setProduced(c);
                }

                source.subscribe(this);

            } while ((int)TRAMPOLINE.getAndAdd(this, -1) - 1 != 0);
        }
    }
}


There are two slight changes in retry(). First, once we run out of the retry count, the latest Flow.Publisher's error is delivered to the downstream from within onError and no further retry can happen. Second, a normal completion is forwarded to the downstream directly from onComplete; consequently, the drain logic no longer calls onComplete when the allowed retries have been used up.
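Unlike in repeat(), the counter here is only decremented in onError, after the first subscription has already happened, so max is the number of retries and an always-failing source is subscribed to max + 1 times. A counter-only simulation (RetryCountDemo is a hypothetical name), assuming the downstream never cancels:

```java
/** Hypothetical simulation of the max bookkeeping in RetryCoordinator. */
final class RetryCountDemo {

    private RetryCountDemo() { }

    /** Returns how many times an always-failing source gets subscribed to. */
    static long subscriptions(long max) {
        long count = 1L;        // the initial subscription made via subscribe()
        while (--max >= 0L) {   // onError with retries left calls drain()
            count++;            // source.subscribe(this), which fails again
        }
        return count;           // retries exhausted: downstream.onError(...)
    }
}
```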


On error, resume with another Flow.Publisher


Now that we've seen multi-source switchover and single-source continuation, switching to an alternative or "fallback" Flow.Publisher should be straightforward to set up: keep both the main and the fallback Flow.Publishers around (two fields suffice instead of a 2-element array), then make sure onError triggers the switch.


public static <T> Flow.Publisher<T> onErrorResumeNext(
        Flow.Publisher<T> source, Flow.Publisher<T> fallback) {
    return new OnErrorResumeNextPublisher<>(source, fallback);
}

// ... subscribe() has the same pattern.

static final class OnErrorResumeNextCoordinator<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {

    final Flow.Publisher<T> source;

    final Flow.Publisher<T> fallback;

    boolean switched;

    // ... the onSubscribe and onNext methods are the same

    @Override
    public void onError(Throwable throwable) {
        if (switched) {
            downstream.onError(throwable);
        } else {
            switched = true;
            drain();
        }
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }

    final void drain() {
        if ((int)TRAMPOLINE.getAndAdd(this, 1) == 0) {
            do {

                if (isCancelled()) {
                    return;
                }

                long c = consumed;
                if (c != 0L) {
                    consumed = 0L;
                    setProduced(c);
                }

                if (switched) {
                    fallback.subscribe(this);
                } else {
                    source.subscribe(this);
                }

            } while ((int)TRAMPOLINE.getAndAdd(this, -1) - 1 != 0);
        }
    }
}

Here, we have two states: switched == false indicates we are consuming the main source. If that fails, we set switched = true and the drain loop will subscribe to the fallback Flow.Publisher. If the fallback fails as well, onError finds switched == true and, instead of draining (and thus retrying) the fallback Flow.Publisher again, it just terminates with the Throwable the fallback emitted.


Conclusion


In this post, the subscription arbitration concept was presented which allows us to switch between non-overlapping Flow.Publisher sources when one terminates (completes or fails with an error) while maintaining the link of cancellation between the individual Flow.Subscriptions as well as making sure backpressure is properly transmitted and preserved between them.

When combined with trampolining logic, such arbitration allowed us to implement a couple of standard ReactiveX operators, such as concat, repeat, retry and onErrorResumeNext, with only small changes to their methods and algorithms.

Note, however, that even though the arbiter can be reused for in-flow operators such as concatMap (concatenate Flow.Publishers generated from upstream values), repeatWhen (repeat if a companion Flow.Publisher signals an item) and retryWhen, one can no longer use a single Flow.Subscriber to subscribe to both the main flow and the inner/companion flows at the same time. We will explore these types of operators in a later post.

Arbitration has its own limit: it can't support live switching between sources, i.e., when onNext may be in progress while the switch has to happen. If you are familiar with the switchMap operator, this is what can happen during its execution. We'll look into this type of operator in a subsequent post.

But for now, we'll investigate a much lighter set of operators in the next post: limiting the number of items the downstream can receive and skipping a certain number of items from the upstream, both based on counting items and based on per-item predicate checks, i.e., the take() and skip() operators.

Monday, September 25, 2017

Java 9 Flow API: timing out events

Introduction


One of the main properties of reactive programming is that events may arrive over time instead of being immediately available to a consumer. In traditional Future-based programming, one could wait for the result in a blocking manner via Future.get(long, TimeUnit). Other data sources, such as a network InputStream, either have their own built-in timeout facility or require external means to close the stream after a certain period of time to unblock its reader. Java 8 Streams also have no direct timeout support.

In the reactive mindset, one can consider timing out events (items) as requesting an element and racing its arrival against the clock. If the item arrives in time, we should ignore the clock. If the clock fires first, we should stop the sender of the items and somehow notify the consumer of the situation. Perhaps the simplest way is to signal onError with a TimeoutException. Since a flow can have multiple items, we have to do this racing for each potential item over and over until the flow terminates.


The timeout operator


Since there is "time" in timeout, we'll need a source of time that can be started and stopped at will. The first tool that comes to mind is the java.util.Timer class; however, even its Javadoc suggests using a ScheduledExecutorService instead. If one has to deal with a lot of timed operations besides timing out flows, having control over such signals via a (set of) ScheduledExecutorService(s) is desirable. Therefore, let's define our timeout API with it:


public static <T> Flow.Publisher<T> timeout(
        Flow.Publisher<T> source,
        long timeout, TimeUnit unit,
        ScheduledExecutorService timer) {

    return new TimeoutPublisher<>(source, timeout, unit, timer);
}


(Note that if one uses Executors.newScheduledThreadPool() or Executors.newSingleThreadScheduledExecutor(), the executor has to be shut down at some point; otherwise, its threads, which are non-daemon by default, would prevent the JVM from quitting.)
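Both options can be sketched as follows, assuming the caller owns the timer: either shut it down explicitly when done, or create it with a ThreadFactory that marks its thread as daemon. Executors.newScheduledThreadPool(int) and Executors.newSingleThreadScheduledExecutor(ThreadFactory) are standard JDK factory methods; TimerLifecycle, daemonTimer and scheduleOnce are hypothetical names used only for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper showing the timer's lifecycle around a timed task. */
final class TimerLifecycle {

    private TimerLifecycle() { }

    /** A single-threaded timer whose thread won't keep the JVM alive. */
    static ScheduledExecutorService daemonTimer() {
        return Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "timeout-timer");
            t.setDaemon(true);
            return t;
        });
    }

    /** Runs one delayed task, then shuts the timer down in any case. */
    static String scheduleOnce(ScheduledExecutorService timer) {
        try {
            ScheduledFuture<String> f =
                timer.schedule(() -> "fired", 10, TimeUnit.MILLISECONDS);
            return f.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException | TimeoutException ex) {
            throw new IllegalStateException(ex);
        } finally {
            // without this, a default (non-daemon) pool thread keeps the JVM running
            timer.shutdown();
        }
    }
}
```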

One primary responsibility of this type of operator is to make sure the downstream's onXXX methods are called in sequence and never overlap. However, a timeout may happen at the same time the main source signals an event, which could lead to onNext and onError being called concurrently, violating the Flow protocol.

The natural non-blocking solution could be the serialization primitive described some time ago in this blog, but thanks to the special nature of this operator, we can go leaner.

Thinking about the possible states of such an operator, there are two state transitions we should consider: receiving and emitting the next upstream signal, and receiving the timeout signal and emitting an error for it. In concurrency land, this means using a state variable and compare-and-set to transition from state to state atomically.

There is a small problem, though: how does a timeout task know that the item it was waiting for didn't arrive in time? Relying on the accuracy of Future.cancel() for this might not be the best option. Luckily, there is a way to have both proper mutual exclusion and serialization: using a long field, index, to track the index of the upstream item as well as to indicate a terminal state via Long.MAX_VALUE, an index unlikely to be reached.

The idea is as follows: a regular onNext signal from the upstream increments this long field unless it is Long.MAX_VALUE, in which case the event is ignored. An upstream onError or onComplete atomically swaps in Long.MAX_VALUE and, if the field wasn't already Long.MAX_VALUE, emits the respective signal to the downstream. At the same time, a thread executing the timeout side's Runnable.run() tries to replace the last event's index with Long.MAX_VALUE too. If the current index hasn't changed during the wait (no onXXX signal from the main source), the atomic swap ensures that any subsequent onXXX call will ignore its event; thus only one thread at a time will emit a signal to the downstream. This may sound complicated when written in plain text, but the code should look much clearer in a couple of paragraphs.
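Stripped of the downstream calls, the index protocol fits in three methods. The sketch below uses an AtomicLong instead of a VarHandle, and the IndexGate class and its method names are hypothetical:

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical distillation of the index protocol, using AtomicLong. */
final class IndexGate {
    static final long TERMINATED = Long.MAX_VALUE;

    final AtomicLong index = new AtomicLong();

    /** onNext path: returns the new index, or -1 if the item must be dropped. */
    long tryNext() {
        long idx = index.get();
        if (idx == TERMINATED || !index.compareAndSet(idx, idx + 1)) {
            return -1L;
        }
        return idx + 1;
    }

    /** onError/onComplete/cancel path: true if this call won the terminal state. */
    boolean tryTerminate() {
        return index.getAndSet(TERMINATED) != TERMINATED;
    }

    /** Timeout path: wins only if no signal arrived since the timer for idx started. */
    boolean tryTimeout(long idx) {
        return index.compareAndSet(idx, TERMINATED);
    }
}
```

A timer started for index 0 loses once an item has moved the index to 1, while a timer started for index 1 wins and locks out any later onNext or onComplete.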

When doing the parent Flow.Publisher implementation of TimeoutPublisher, one has to consider a peculiar case: when should the timeout for the very first item start? Remember, we have to call onSubscribe on the downstream to provide it with the ability to request and cancel the flow. If the timeout started after downstream.onSubscribe(), a synchronous and blocking source responding to the downstream's request at that point may not give back control at all, rendering the timeout operator nonoperational. If the timeout started before the call to downstream.onSubscribe(), we risk emitting the TimeoutException before or while downstream.onSubscribe() is executing. Since we also have to intercept a cancel() call from the downstream to stop an ongoing timeout, we have to call downstream.onSubscribe() from within the Flow.Publisher.subscribe() method before we subscribe to the upstream or even start the timeout for the very first item:


    @Override
    public void subscribe(Flow.Subscriber<? super T> s) {
        TimeoutSubscriber<T> parent = new TimeoutSubscriber<>(
            s, timeout, unit, timer);

        s.onSubscribe(parent);

        parent.startTimeout(0L);

        source.subscribe(parent);
    }


We create the in-sequence Flow.Subscriber first (which implements Flow.Subscription), send it to the downstream, start the timeout for the first item (index is zero at this point) and we subscribe to the upstream source.

The next step is to write up the skeleton of the TimeoutSubscriber:


static final class TimeoutSubscriber<T> implements
Flow.Subscriber<T>, Flow.Subscription {
   
    final Flow.Subscriber<? super T> downstream;

    final long timeout;

    final TimeUnit unit;

    final ScheduledExecutorService timer;

    Flow.Subscription upstream;
    static final VarHandle UPSTREAM =
        VH.find(MethodHandles.lookup(), TimeoutSubscriber.class,
            "upstream", Flow.Subscription.class);

    long requested;
    static final VarHandle REQUESTED =
        VH.find(MethodHandles.lookup(), TimeoutSubscriber.class,
            "requested", long.class);

    long index;
    static final VarHandle INDEX =
        VH.find(MethodHandles.lookup(), TimeoutSubscriber.class,
            "index", long.class);

    Future<?> task;
    static final VarHandle TASK =
        VH.find(MethodHandles.lookup(), TimeoutSubscriber.class,
            "task", Future.class);

    static final Future<Object> CANCELLED_TASK = 
        new FutureTask<>(() -> { }, null);
    
    TimeoutSubscriber(
            Flow.Subscriber<? super T> downstream,
            long timeout,
            TimeUnit unit,
            ScheduledExecutorService timer) {
        this.downstream = downstream;
        this.timeout = timeout;
        this.unit = unit;
        this.timer = timer;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }
   
    @Override
    public void request(long n) {
        // TODO implement
    }
    
    @Override
    public void cancel() {
        // TODO implement
    }

    void run(long idx) {
        // TODO implement
    }

    void startTimeout(long idx) {
        // TODO implement
    }
}

The fields can be described as follows:


  • downstream is the Flow.Subscriber we'd like to signal events to.
  • timeout and unit have to be stored as a new timeout task will have to be started after each in-time event.
  • timer is the ScheduledExecutorService we will submit tasks delayed by the timeout and unit.
  • upstream will hold the source's Flow.Subscription. Its VarHandle UPSTREAM will allow us to defer any cancellation coming from the downstream until there is an instance of it from the upstream. In addition, we have to defer requesting from the downstream for the same reason: since we call downstream.onSubscribe before subscribing to the upstream, the downstream may issue cancel and/or request to a not-yet-available Flow.Subscription.
  • requested will hold onto any downstream requests until there is an upstream Flow.Subscription available to the operator. This is part of the so-called deferred requesting pattern.
  • index holds the state that indicates if the operator has reached its terminal state naturally or due to timeout.
  • task holds the current Future for the timeout task submitted to the timer ScheduledExecutorService or the CANCELLED_TASK indicator if the operator has been cancelled concurrently to its operation and no further scheduling of any tasks should happen. Since only one such task has to run at a time, the actual parallelism level of the ScheduledExecutorService doesn't matter. The value has to be changed atomically, hence the TASK VarHandle after it.

The implementation of each method, although relatively short, is a bit more involved, as each performs direct, inlined atomic state changes. The onSubscribe method has to provide the deferred cancellation and deferred requesting behavior:


    @Override
    public void onSubscribe(Flow.Subscription s) {
        if (UPSTREAM.compareAndSet(this, null, s)) {
            long r = (long)REQUESTED.getAndSet(this, 0L);
            if (r != 0L) {
                s.request(r);
            }
        } else {
            s.cancel();
        }
    }


We atomically try to set the upstream's Flow.Subscription and, if successful, we atomically take the accumulated request amount from the downstream. If there was any, we request it from the upstream. Otherwise, a non-null upstream means the operator has been cancelled, and we cancel the incoming Flow.Subscription as well.

Before we look into the other onXXX methods, let's see the partners of onSubscribe to show the other sides of the deferred requesting and cancellation logic:

    @Override
    public void request(long n) {
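        Flow.Subscription current = (Flow.Subscription)UPSTREAM.getAcquire(this);
        if (current == this) {
            // cancelled: forwarding to "this" would call request() recursively
            return;
        }
        if (current != null) {
            current.request(n);
        } else {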
            for (;;) {
                long r = (long)REQUESTED.getAcquire(this);
                long u = r + n;
                if (u < 0L) {
                    u = Long.MAX_VALUE;
                }
                if (REQUESTED.compareAndSet(this, r, u)) {
                    break;
                }
            }

            current = (Flow.Subscription)UPSTREAM.getAcquire(this);
            if (current != null && current != this) {
                long r = (long)REQUESTED.getAndSet(this, 0L);
                if (r != 0L) {
                    current.request(r);
                }
            }
        }
    }

When the downstream calls request(), we could be in two states: either the upstream Flow.Subscription is available, in which case we simply forward the request amount to it, or the upstream Flow.Subscription is not available and we have to temporarily accumulate downstream requests (which could happen on any thread at any time). This happens via our standard bounded atomic add operation. Once this succeeds, the upstream may have just called onSubscribe with a valid Flow.Subscription (the value this indicates that cancel() has been invoked, see below). In that case, we atomically take all the accumulated requests, swapping in a zero in exchange, and if that value was non-zero, we issue the request to this fresh upstream. Concurrent interleavings will find requested zero or non-zero and issue any excess request amount accordingly.
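The deferred requesting and deferred cancellation pair can be exercised in isolation. The sketch below replaces the VarHandles with AtomicReference and AtomicLong and uses the hypothetical names DeferredRequester, setUpstream and RecordingSubscription. It also returns early when the upstream slot holds the cancelled indicator (this), so a request() after cancel() is dropped instead of being forwarded to itself.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of deferred requesting and cancellation. */
final class DeferredRequester implements Flow.Subscription {
    final AtomicReference<Flow.Subscription> upstream = new AtomicReference<>();
    final AtomicLong requested = new AtomicLong();

    /** The onSubscribe side: install s once, then replay accumulated requests. */
    void setUpstream(Flow.Subscription s) {
        if (upstream.compareAndSet(null, s)) {
            long r = requested.getAndSet(0L);
            if (r != 0L) {
                s.request(r);
            }
        } else {
            s.cancel(); // already cancelled (slot holds this)
        }
    }

    @Override
    public void request(long n) {
        Flow.Subscription current = upstream.get();
        if (current == this) {
            return; // cancelled
        }
        if (current != null) {
            current.request(n);
            return;
        }
        // bounded atomic add, saturating at Long.MAX_VALUE
        requested.getAndUpdate(prev -> {
            long u = prev + n;
            return u < 0L ? Long.MAX_VALUE : u;
        });
        // the upstream may have arrived meanwhile: hand over what's accumulated
        current = upstream.get();
        if (current != null && current != this) {
            long r = requested.getAndSet(0L);
            if (r != 0L) {
                current.request(r);
            }
        }
    }

    @Override
    public void cancel() {
        Flow.Subscription current = upstream.getAndSet(this);
        if (current != null && current != this) {
            current.cancel();
        }
    }
}

/** Test double that records what reaches the upstream. */
final class RecordingSubscription implements Flow.Subscription {
    long total;
    boolean cancelled;

    @Override public void request(long n) { total += n; }
    @Override public void cancel() { cancelled = true; }
}
```

For example, request(5) and request(7) issued before the upstream arrives reach it as a single request(12) inside setUpstream().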


    @Override
    public void cancel() {
        if ((long)INDEX.getAndSet(this, Long.MAX_VALUE) != Long.MAX_VALUE) {

            Flow.Subscription current = (Flow.Subscription)UPSTREAM.getAndSet(this, this);
            if (current != null && current != this) {
                current.cancel();
            }

            Future<?> future = (Future<?>)TASK.getAndSet(this, CANCELLED_TASK);
            if (future != null) {
                future.cancel(false);
            }
        }
    }


First, we atomically swap in the terminal index Long.MAX_VALUE, locking out both the onXXX methods and the timeout task. Then, we atomically swap in the cancelled Flow.Subscription indicator (this) and cancel any available upstream. The same thing has to happen to the timeout task.

Now let's see the onNext implementation:

    @Override
    public void onNext(T item) {

        long idx = (long)INDEX.getAcquire(this);
        if (idx == Long.MAX_VALUE) {
            return;
        }
        if (!INDEX.compareAndSet(this, idx, idx + 1)) {
            return;
        }

        Future<?> future = (Future<?>)TASK.getAcquire(this);
        if (future == CANCELLED_TASK) {
            return;
        }
        if (future != null) {
            future.cancel(false);
        }

        downstream.onNext(item);

        startTimeout(idx + 1);
    }


First we get the current index and if already at Long.MAX_VALUE, we quit. Next, we try to atomically update the index to the next value and if that fails (due to cancellation or a racy timeout), we quit as well. Once the index has been updated successfully, we cancel the ongoing timeout task, emit the current item and start a new task with the subsequent index value.

The onError and onComplete methods are relatively similar by cancelling the timeout task and signalling the appropriate terminal event:


    @Override
    public void onError(Throwable throwable) {
        if ((long)INDEX.getAndSet(this, Long.MAX_VALUE) != Long.MAX_VALUE) {

            Future<?> future = (Future<?>)TASK.getAndSet(this, CANCELLED_TASK);
            if (future != null) {
                future.cancel(false);
            }

            downstream.onError(throwable);
        }
    }

    @Override
    public void onComplete() {
        if ((long)INDEX.getAndSet(this, Long.MAX_VALUE) != Long.MAX_VALUE) {

            Future<?> future = (Future<?>)TASK.getAndSet(this, CANCELLED_TASK);
            if (future != null) {
                future.cancel(false);
            }

            downstream.onComplete();
        }
    }


These are pretty similar to how cancel() is implemented: we atomically swap in the terminal index Long.MAX_VALUE and cancel the timeout task. Note that cancelling the upstream is not necessary at this point, as it is considered cancelled in accordance with the Flow (Reactive-Streams) specification.

    void run(long idx) {
        if (INDEX.compareAndSet(this, idx, Long.MAX_VALUE)) {

            Flow.Subscription current = (Flow.Subscription)UPSTREAM.getAndSet(this, this);
            if (current != null && current != this) {
                current.cancel();
            }

            downstream.onError(new TimeoutException());
        }
    }


Given the last known item index, we atomically try to swap in the terminal index indicator and, if successful, we cancel the upstream (directly or in a deferred manner) and then signal the TimeoutException. Since the executing task ends either way, there is no need to cancel the Future tracking the timeout task itself. If the index has been changed (either due to the arrival of an onNext item or due to cancellation), the timeout task does nothing.

Finally, let's see how the timeout tasks are scheduled:


    void startTimeout(long idx) {
        Future<?> old = (Future<?>)TASK.getAcquire(this);
        if (old != CANCELLED_TASK) { 
            Future<?> future = timer.schedule(() -> run(idx), timeout, unit);
            if (!TASK.compareAndSet(this, old, future)) {
                future.cancel(false);
            }
        }
    }


First, we get the previous, old task instance so we can conditionally swap in the new task if there was no cancellation in between the two. Then, we schedule the execution of the run() method with the current item index (provided by the subscribe() or the onNext() method). If the actual index doesn't change until the run() executes, it will trigger the timeout logic. Then, we atomically try to replace the old Future with the new one and if that fails (due to cancellation), we cancel the new Future task too.
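The task-slot handling of startTimeout() and cancel() can be isolated the same way. The sketch below uses an AtomicReference instead of the TASK VarHandle; TimeoutSlot, start and the CANCELLED constant (playing the role of CANCELLED_TASK) are hypothetical names:

```java
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical isolation of the Future slot used by startTimeout() and cancel(). */
final class TimeoutSlot {
    // Never executed: only its identity matters, like CANCELLED_TASK.
    static final Future<Object> CANCELLED = new FutureTask<>(() -> { }, null);

    final AtomicReference<Future<?>> task = new AtomicReference<>();
    final ScheduledExecutorService timer;

    TimeoutSlot(ScheduledExecutorService timer) {
        this.timer = timer;
    }

    /** Schedules onTimeout unless cancelled; returns the new Future or null. */
    Future<?> start(Runnable onTimeout, long delay, TimeUnit unit) {
        Future<?> old = task.get();
        if (old == CANCELLED) {
            return null;
        }
        Future<?> future = timer.schedule(onTimeout, delay, unit);
        if (!task.compareAndSet(old, future)) {
            // lost against a concurrent cancel(): don't leave the task behind
            future.cancel(false);
        }
        return future;
    }

    void cancel() {
        Future<?> old = task.getAndSet(CANCELLED);
        if (old != null) {
            old.cancel(false);
        }
    }
}
```

Once cancel() has installed the sentinel, every later start() becomes a no-op, which is what keeps a racing onNext from rescheduling a timeout after cancellation.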


Conclusion


As demonstrated in this post, writing a basic timeout operator (one which just signals a TimeoutException) can be done in relatively few lines. The complication comes from understanding why the atomic index changes are actually correct in various race scenarios and how they also ensure proper event serialization towards the downstream.

The same pattern can be used for writing a set of other operators which use a one-shot signal to "interrupt" a main sequence, for example, takeUntil(Flow.Publisher), where the Future tracking is replaced by tracking the Flow.Subscription of the other Flow.Publisher.

There is, however, a natural need for doing something other than signalling a TimeoutException: switching to another Flow.Publisher, for example. One might think: let's subscribe the TimeoutSubscriber to that fallback Flow.Publisher! Unfortunately, this doesn't work, partly because we probably don't want to time out the fallback Flow.Publisher's consumption anyway, and, even if we did, because we have to make sure the switchover preserves the number of outstanding downstream requests.

The deferred requesting logic in the operator shown in this post is not suitable for such transitions; they require a peculiar Flow.Subscription management logic I call Subscription Arbitration. It enables a whole suite of other operators to work across sources, such as the typical RxJava operators concat(Map), repeat(When), retry(When), timeout with fallback and onErrorResumeNext. In the next blog post, we'll see how the arbitration logic and most of these operators can be implemented.