Wednesday, September 27, 2017

Java 9 Flow API: arbitration and concatenation

Introduction


A very common task is to combine multiple sources, or more generally, start consuming a source once the previous source has terminated. The naive approach would be to simply call otherSource.subscribe(nextSubscriber) from onError or onComplete. Unfortunately, this doesn't work for two reasons: 1) it may end up with deep stacks due to a "tail" subscription from onError/onComplete, and 2) we should request from the new source only the remaining, unfulfilled amount that hasn't been provided by the previous source, so as not to overflow the downstream.
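
To make the first problem more tangible, here is a hedged sketch of such a naive continuation (the class name and structure are made up for this illustration and are not part of the operators built later). With synchronous sources, each onComplete() subscribes to the next source before returning, so the stack frames keep piling up, and the full demand is re-requested from every source regardless of what was already delivered:

// Hypothetical illustration of the naive approach; not used by the operators below.
import java.util.concurrent.Flow;

final class NaiveConcatSubscriber<T> implements Flow.Subscriber<T> {

    final Flow.Subscriber<? super T> downstream;
    final Flow.Publisher<? extends T>[] sources;
    int index;

    NaiveConcatSubscriber(Flow.Subscriber<? super T> downstream,
            Flow.Publisher<? extends T>[] sources) {
        this.downstream = downstream;
        this.sources = sources;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // ignores how much the downstream has actually requested so far
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
        downstream.onNext(item);
    }

    @Override
    public void onError(Throwable throwable) {
        downstream.onError(throwable);
    }

    @Override
    public void onComplete() {
        if (index == sources.length) {
            downstream.onComplete();
        } else {
            // "tail" subscription: with synchronous sources this call only returns
            // after all remaining sources have completed, one stack frame per source
            sources[index++].subscribe(this);
        }
    }
}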

The first issue can be solved by applying a heavyweight observeOn in the general case, or by implementing a basic trampolining loop for certain concrete cases such as the flow concatenation described in this post.

The second issue requires a more involved solution: not only do we have to switch between Flow.Subscriptions from different sources, we also have to make sure concurrent request() invocations are not lost and are routed to the proper Flow.Subscription, along with any concurrent cancel() calls. The difficulty is lessened somewhat by the fact that switching sources happens on a terminal event boundary only, thus we don't have to worry about the old source calling onNext while the logic switches to the new source, which would complicate the accounting of requested/emitted item counts. Enter SubscriptionArbiter.


Subscription arbitration


We have to deal with 4 types of potentially concurrent signals when arbitrating Flow.Subscriptions:


  1. A request(long) call from downstream that has to be routed to the current Flow.Subscription
  2. A cancel() call from downstream that has to be routed to the current Flow.Subscription and cancel any future Flow.Subscription.
  3. A setSubscription(Flow.Subscription) that is called by the current Flow.Subscriber after subscribing to any Flow.Publisher, which is not guaranteed to happen on the same thread subscribe() is called on (e.g., with the standard SubmissionPublisher or our range() operator).
  4. A setProduced(long n) that is called when the previous source terminates and we want to make sure the new source will be requested the right amount; i.e., we'll have to deduct this amount from the current requested amount so setSubscription will issue the request for the remainder to the new Flow.Subscription.


Let's start with the skeleton of the SubscriptionArbiter class providing these methods:


import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.Flow;

public class SubscriptionArbiter implements Flow.Subscription {

    Flow.Subscription current;
    static final VarHandle CURRENT =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "current", Flow.Subscription.class);

    Flow.Subscription next;
    static final VarHandle NEXT =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "next", Flow.Subscription.class);

    long requested;

    long downstreamRequested;
    static final VarHandle DOWNSTREAM_REQUESTED =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "downstreamRequested", long.class);

    long produced;
    static final VarHandle PRODUCED =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "produced", long.class);

    int wip;
    static final VarHandle WIP =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "wip", int.class);

    @Override
    public final void request(long n) {
        // TODO implement
    }

    @Override
    public void cancel() {
        // TODO implement
    }

    public final boolean isCancelled() {
        // TODO implement
        return false;
    }

    public final void setSubscription(Flow.Subscription s) {
        // TODO implement
    }

    public final void setProduced(long n) {
        // TODO implement
    }

    final void arbiterDrain() {
        // TODO implement
    }
}
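
The VH.find helper above is the small VarHandle-lookup utility used throughout this series; if you don't have it at hand, a minimal sketch, assuming it simply delegates to MethodHandles.Lookup.findVarHandle and rethrows the checked reflection exceptions as an unchecked error, could look like this:

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Minimal sketch of a VH.find-style helper: look up a VarHandle for the given
// field and turn the checked exceptions into an unchecked error.
final class VH {

    static VarHandle find(MethodHandles.Lookup lookup,
            Class<?> declaringClass, String fieldName, Class<?> fieldType) {
        try {
            return lookup.findVarHandle(declaringClass, fieldName, fieldType);
        } catch (NoSuchFieldException | IllegalAccessException ex) {
            throw new InternalError(ex);
        }
    }
}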

We intend the class to be extended to save on allocation and object headers; however, most methods should not be overridden by any subclass as that would likely break the internal logic. The only relatively safe overridable method is cancel(): the subclass will likely have its own resources that have to be released upon a cancel() call from the downstream, which will receive an instance of this class via onSubscribe (a minimal sketch of such a subclass follows the field list below). The meaning of each field is as follows:


  • current holds the current Flow.Subscription. Its companion CURRENT VarHandle is there to support cancellation.
  • next temporarily holds the next Flow.Subscription that will replace the current instance. Direct replacement can't work due to the required request accounting.
  • requested holds the current outstanding request count. It doesn't have any VarHandle because it will only be accessed from within the drain loop.
  • downstreamRequested accumulates the downstream's requests in case the drain loop is already executing.
  • produced holds the number of items produced by the previous source, which has to be deducted from requested before the switch to the next source happens. It is accompanied by a VarHandle to ensure proper visibility of its value from within the drain loop.
  • wip is our standard work-in-progress counter to support the queue-drain-like lock-free serialization we use almost everywhere now.
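
As promised above, here is a minimal, hypothetical sketch of a subclass that overrides cancel(): the resource field is made up for this example, and the important part is that super.cancel() is still invoked so the arbiter can cancel the current and any future Flow.Subscription.

// Hypothetical subclass: only cancel() is overridden, and super.cancel() is kept.
final class ResourceHoldingArbiter extends SubscriptionArbiter {

    final AutoCloseable resource;   // made-up per-operator resource

    ResourceHoldingArbiter(AutoCloseable resource) {
        this.resource = resource;
    }

    @Override
    public void cancel() {
        super.cancel();
        try {
            resource.close();
        } catch (Exception ex) {
            // nothing reasonable to do with a close failure here
        }
    }
}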

The first method we implement is request() that will be called by the downstream from an arbitrary thread at any time:


    @Override
    public final void request(long n) {
        for (;;) {
            long r = (long)DOWNSTREAM_REQUESTED.getAcquire(this);
            long u = r + n;
            if (u < 0L) {
                u = Long.MAX_VALUE;
            }
            if (DOWNSTREAM_REQUESTED.compareAndSet(this, r, u)) {
                arbiterDrain();
                break;
            }
        }
    }


We perform the usual atomic addition capped at Long.MAX_VALUE and call arbiterDrain().

    @Override
    public void cancel() {
        Flow.Subscription s = (Flow.Subscription)CURRENT.getAndSet(this, this);
        if (s != null && s != this) {
            s.cancel();
        }
        s = (Flow.Subscription)NEXT.getAndSet(this, this);
        if (s != null && s != this) {
            s.cancel();
        }
    }

    public final boolean isCancelled() {
        return CURRENT.getAcquire(this) == this;
    }


We atomically swap in the cancelled indicator (this) for both the current and the next Flow.Subscription instances. To support some eagerness in cancellation, isCancelled() can be called by a subclass (e.g., one concatenating an array of Flow.Publishers) to quit its trampolined looping.

Next, we "queue up" the next Flow.Subscription:


    public final void setSubscription(Flow.Subscription subscription) {
        if (NEXT.compareAndSet(this, null, subscription)) {
            arbiterDrain();
        } else {
            subscription.cancel();
        }
    }


We expect there will be only one thread calling setSubscription and that call happens before the termination of the associated source, thus a simple CAS from null to subscription should be enough. In this scenario, a failed CAS can only mean the arbiter was cancelled in the meantime and we cancel the subscription accordingly. We'll still have to relay the unfulfilled request amount to this new subscription which will be done in arbiterDrain().

The setProduced method will have to "queue up" the fulfilled amount in a similar fashion:


    public final void setProduced(long n) {
        PRODUCED.setRelease(this, n);
        arbiterDrain();
    }


As with setSubscription, we expect this to happen once per terminated source, before the subscription to the next source happens, thus there is no real need to atomically accumulate the item count. For example, if the downstream requested 10 items and the previous source delivered 3 before completing, setProduced(3) ensures the next source will only be asked for the remaining 7.

Finally, let's see the heavy lifting in arbiterDrain() itself now:


    final void arbiterDrain() {
        if ((int)WIP.getAndAdd(this, 1) != 0) {
            return;
        }
        
        Flow.Subscription requestFrom = null;
        long requestAmount = 0L;

        for (;;) {

            // TODO implement

            if ((int)WIP.getAndAdd(this, -1) - 1 == 0) {
                break;
            }
        }

        if (requestFrom != null && requestFrom != this
                && requestAmount != 0L) {
            requestFrom.request(requestAmount);
        }
    }


The arbiterDrain() method, whose name was chosen to avoid clashing with a subclass' own drain() method (if any), starts out as most typical trampolined drain loops do: the atomic increment of wip from 0 to 1 enters the loop and the decrement back to zero leaves it.

One oddity may come from the requestFrom and requestAmount local variables. Unlike in a traditional stable-prefetch queue-drain, requesting from within the loop can bring back the reentrancy issue and the tail-subscription problem, and may prevent other actions from happening with the arbiter until the request() call returns. Therefore, once the loop has decided what the current target Flow.Subscription is, we'll issue the request to it outside the loop. It is possible that by the time the drain method reaches the last if statement the current requestFrom is outdated or the arbiter has been cancelled. This is not a problem because request() and cancel() are in general expected to race, and an outdated Flow.Subscription means its source has already terminated, so a request() call is a no-op to it.

The last part inside the loop has to "dequeue" the deferred changes and apply them to the state of the arbiter:


    for (;;) {

        // (1) ----------------------------------------------
        Flow.Subscription currentSub = (Flow.Subscription)CURRENT.getAcquire(this);
        if (currentSub != this) {

            // (2) ------------------------------------------
            long req = requested;

            long downstreamReq = (long)DOWNSTREAM_REQUESTED.getAndSet(this, 0L);

            long prod = (long)PRODUCED.getAndSet(this, 0L);

            Flow.Subscription nextSub = (Flow.Subscription)NEXT.getAcquire(this);
            if (nextSub != null && nextSub != this) {
                NEXT.compareAndSet(this, nextSub, null);
            }

            // (3) ------------------------------------------
            if (downstreamReq != 0L) {
                req += downstreamReq;
                if (req < 0L) {
                    req = Long.MAX_VALUE;
                }
            }

            // (4) ------------------------------------------
            if (prod != 0L && req != Long.MAX_VALUE) {
                req -= prod;
            }

            requested = req;

            // (5) ------------------------------------------
            if (nextSub != null && nextSub != this) {
                requestFrom = nextSub;
                requestAmount = req;
                CURRENT.compareAndSet(this, currentSub, nextSub);
            } else {
                // (6) --------------------------------------
                requestFrom = currentSub;
                requestAmount += downstreamReq;
                if (requestAmount < 0L) {
                    requestAmount = Long.MAX_VALUE;
                }
            }
        }

        if ((int)WIP.getAndAdd(this, -1) - 1 == 0) {
            break;
        }
    }



  1. First we check if the current instance holds the cancelled indicator (this). If so, we don't have to execute any of the logic as the arbiter has been cancelled by the downstream.
  2. We read out the current and queued state: the current outstanding requested amount, the request amount from the downstream if any, the produced item count of the previous source and the potential next Flow.Subscription instance. While it is safe to atomically swap in 0 for both the downstreamRequested and produced values, swapping in null unconditionally may overwrite the cancelled indicator, and then setSubscription wouldn't cancel its argument.
  3. If there was an asynchronous request() call, we add the downstreamReq amount to the current requested amount, capped at Long.MAX_VALUE (unbounded indicator).
  4. If there was a non-zero produced amount and the requested amount isn't Long.MAX_VALUE, we subtract the two. The new requested amount is then saved.
  5. If there was a new Flow.Subscription set via setSubscription, we indicate where to request from outside the loop and we indicate the whole current requested amount (now including any async downstream request and upstream produced count) should be used. The CAS will make sure the next Flow.Subscription only becomes the current one if there was no cancellation in the meantime.
  6. Otherwise, we target the current Flow.Subscription and add up the downstream's extra requests, capped at Long.MAX_VALUE. The reason for the accumulation is that the downstream may issue multiple requests (r1, r2) in quick succession, which makes the logic loop back again, now having r1 + r2 items outstanding from the downstream's perspective.
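
To see the accounting in action, here is a small, hypothetical demonstration, assuming the arbiter is assembled from the pieces above (the RecordingSubscription class is made up for this example and simply remembers the total amount requested from it): the downstream requests 10, the first source delivers 3 items before terminating, and after the switch the second source is asked only for the remaining 7.

import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo of the arbiter's request accounting.
final class ArbiterAccountingDemo {

    static final class RecordingSubscription implements Flow.Subscription {
        final AtomicLong requested = new AtomicLong();
        @Override public void request(long n) { requested.addAndGet(n); }
        @Override public void cancel() { }
    }

    public static void main(String[] args) {
        SubscriptionArbiter arbiter = new SubscriptionArbiter();

        arbiter.request(10);                          // downstream wants 10 items

        RecordingSubscription first = new RecordingSubscription();
        arbiter.setSubscription(first);               // first source attached
        System.out.println(first.requested.get());    // 10

        arbiter.setProduced(3);                       // first source emitted 3, then terminated

        RecordingSubscription second = new RecordingSubscription();
        arbiter.setSubscription(second);              // switch to the next source
        System.out.println(second.requested.get());   // 7, i.e., 10 - 3
    }
}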

Now that the infrastructure is ready, let's implement a couple of operators.


Concatenating an array of Flow.Publishers


Perhaps the simplest operator we could write on top of the SubscriptionArbiter is the concat() operator. It consumes one Flow.Publisher after another in a non-overlapping fashion until all of them have completed.


@SafeVarargs
public static <T> Flow.Publisher<T> concat(Flow.Publisher<? extends T>... sources) {
    return new ConcatPublisher<>(sources);
}


The ConcatPublisher itself is straightforward: create a coordinator, send it to the downstream and trigger the consumption of the first source:


     @Override
     public void subscribe(Flow.Subscriber<? super T> s) {
         ConcatCoordinator<T> parent = new ConcatCoordinator<>(s, sources);
         s.onSubscribe(parent);
         parent.drain();
     }

The ConcatCoordinator can be implemented as follows:


static final class ConcatCoordinator<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {
    
    final Flow.Subscriber<? super T> downstream;

    final Flow.Publisher<? extends T>[] sources;

    int index;

    int trampoline;
    static final VarHandle TRAMPOLINE =
        VH.find(MethodHandles.lookup(), ConcatCoordinator.class,
            "trampoline", int.class);

    long consumed;
    
    ConcatCoordinator(
           Flow.Subscriber<? super T> downstream,
           Flow.Publisher<? extends T>[] sources
    ) {
        this.downstream = downstream;
        this.sources = sources;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }

    void drain() {
        // TODO implement
    }
}


The ConcatCoordinator extends the SubscriptionArbiter, thus it is a Flow.Subscription as well and as such will be used as the connection object towards the downstream. It also implements Flow.Subscriber because we'll use the same instance to subscribe to all of the Flow.Publishers one after the other.

One may object that reusing the same Flow.Subscriber instance is not allowed by the Reactive Streams specification the Flow API inherited. However, the specification actually just discourages the reuse and otherwise mandates external synchronization so that the onXXX methods are invoked in a serialized manner. We'll see that the trampolining in the operator, along with the arbiter itself, ensures exactly that property. Of course, we could just new up a Flow.Subscriber for each source, but that Flow.Subscriber would itself be nothing more than a delegator to the coordinator instance (there is no need for per-source state in it); combining the two just saves on allocation and indirection.

The fields are interpreted as follows:


  • downstream is the Flow.Subscriber that receives the signals.
  • sources is the array of Flow.Publishers that will be consumed one after the other.
  • index points at the current Flow.Publisher and gets incremented once that source completes.
  • trampoline is the work-in-progress indicator for the drain loop; the name was chosen to avoid clashing with the arbiter's own wip field in this blog for better readability. In practice, since they are in different classes, one can name them both wip.
  • consumed tracks how many items the current source has produced (and the coordinator has consumed). We'll use this to update the arbiter upon the completion of the current source instead of doing it for each item received, because that saves a lot of overhead and we don't really care about each individual item's reception.


The coordinator's onXXX methods are relatively trivial at this point:


   @Override
    public void onSubscribe(Flow.Subscription s) {
        setSubscription(s);
    }

    @Override
    public void onNext(T item) {
        consumed++;
        downstream.onNext(item);
    }

    @Override
    public void onError(Throwable throwable) {
        downstream.onError(throwable);
    }

    @Override
    public void onComplete() {
        drain();
    }
   


We save the Flow.Subscription into the arbiter, write through the item or throwable and call the drain method upon normal completion.

What's left is the drain() method itself:


    void drain() {
        // (1) -------------------------------------------------------
        if ((int)TRAMPOLINE.getAndAdd(this, 1) == 0) {
            do {

                // (2) -----------------------------------------------
                if (isCancelled()) {
                    return;
                }

                // (3) -----------------------------------------------
                if (index == sources.length) {
                    downstream.onComplete();
                    return;
                }

                // (4) -----------------------------------------------
                long c = consumed;
                if (c != 0L) {
                    consumed = 0L;
                    setProduced(c);
                }

                // (5) -----------------------------------------------
                sources[index++].subscribe(this);

            // (6) ---------------------------------------------------
            } while ((int)TRAMPOLINE.getAndAdd(this, -1) - 1 != 0);
        }
    }

Again, not really a complicated method, but as usual, the difficulty may come from understanding why such short code is actually providing the required behavior and safeguards:


  1. We know that drain() is only invoked from the subscribe() or onComplete() methods. This standard lock-free trampolining check ensures only one thread is busy setting up the consumption of the next (or the first) source. In addition, since only a guaranteed one-time per source onComplete() can trigger the consumption of the next, we don't have to worry about racing with onNext in this operator. (However, an in-flow concatMap is a different scenario.) This setup also defends against increasing the stack depth due to tail-subscription: a trampoline > 1 indicates we can immediately subscribe to the next source.
  2. In case the downstream cancelled the operator, we simply quit the loop.
  3. In case the index is equal to the number of sources, it means we reached the end of the concatenation and can complete the downstream via onComplete().
  4. Otherwise, we indicate to the arbiter the number of items consumed from the previous source so it can update its outstanding (current) request amount. Note that consumed is not concurrently updated because onNext and onComplete (and thus drain) on the same source can't be executed concurrently.
  5. We then subscribe this to the current source and move the index forward by one so that it points at the next source.
  6. Finally, if there was no synchronous or racy onComplete in the meantime, we quit the loop; otherwise we resume with the subsequent source.


One can add a few features and safeguards to this coordinator, such as delaying errors till the very end and ensuring the index-th sources entry is not null. These are left as an exercise to the reader.
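
As a quick usage sketch (range(start, count) stands for the bounded synchronous source operator built in an earlier post of this series and is an assumption of this fragment):

// Hypothetical usage fragment; range() is assumed from earlier posts in this series.
concat(range(1, 3), range(10, 2))
    .subscribe(new Flow.Subscriber<Integer>() {
        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(Integer item) { System.out.print(item + " "); }
        @Override public void onError(Throwable throwable) { throwable.printStackTrace(); }
        @Override public void onComplete() { System.out.println("| done"); }
    });

// expected console output: 1 2 3 10 11 | done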


Repeat


How can we turn this into a repeat operator where the source is resubscribed on successful completion? Easy: drop the index and have only a single source Flow.Publisher to be worked on!


public static <T> Flow.Publisher<T> repeat(
        Flow.Publisher<T> source, long max) {
    return new RepeatPublisher<>(source, max);
}

// ... subscribe() has the same pattern.

static final class RepeatCoordinator<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {

    final Flow.Publisher<T> source;

    long max;

    // ... the downstream, trampoline and consumed fields, the constructor
    // and the onXXX methods are the same as in ConcatCoordinator

    final void drain() {
        if ((int)TRAMPOLINE.getAndAdd(this, 1) == 0) {
            do {

                if (isCancelled()) {
                    return;
                }

                if (--max < 0L) {
                    downstream.onComplete();
                    return;
                }

                long c = consumed;
                if (c != 0L) {
                    consumed = 0L;
                    setProduced(c);
                }

                source.subscribe(this);

            } while ((int)TRAMPOLINE.getAndAdd(this, -1) - 1 != 0);
        }
    }
}


Given that repeating indefinitely is usually not desired, we limit the resubscriptions to a number of times specified by the user. Since there is only one source Flow.Publisher, no indexing into an array is needed and we only have to decrement the counter to detect the condition for completing the downstream.
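
For example, given the --max < 0L check above, max counts the total number of subscriptions to the source (range() is again assumed from earlier posts in this series):

// Hypothetical fragment; with max = 3 the source is subscribed three times in total,
// so repeat(range(1, 2), 3) emits 1, 2, 1, 2, 1, 2 and then completes.
Flow.Publisher<Integer> repeated = repeat(range(1, 2), 3);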


Retry


How about retrying a Flow.Publisher in case it failed with an onError? Easy: have onError call drain() and onComplete call downstream.onComplete() straight:


public static <T> Flow.Publisher<T> retry(
        Flow.Publisher<T> source, long max) {
    return new RetryPublisher<>(source, max);
}

// ... subscribe() has the same pattern.

static final class RetryCoordinator<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {

    final Flow.Publisher<T> source;

    long max;

    // ... the downstream, trampoline and consumed fields, the constructor,
    // onSubscribe and onNext are the same as in ConcatCoordinator

    @Override
    public void onError(Throwable throwable) {
        if (--max < 0L) {
            downstream.onError(throwable);
        } else {
            drain();
        }
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }

    final void drain() {
        if ((int)TRAMPOLINE.getAndAdd(this, 1) == 0) {
            do {

                if (isCancelled()) {
                    return;
                }

                long c = consumed;
                if (c != 0L) {
                    consumed = 0L;
                    setProduced(c);
                }

                source.subscribe(this);

            } while ((int)TRAMPOLINE.getAndAdd(this, -1) - 1 != 0);
        }
    }
}


There are two slight changes in retry(). First, when the retry count runs out, the latest Flow.Publisher's error is delivered to the downstream from within onError and no further retry can happen. Second, since onComplete now terminates the downstream directly, the drain logic no longer has to complete the downstream itself once the allowed retries have been used up.


On error, resume with another Flow.Publisher


Now that we've seen multi-source switchover and single-source continuation, switching to an alternative or "fallback" Flow.Publisher should be straightforward to set up: keep the main and the fallback Flow.Publishers at hand (a 2 element array would work just as well), then make sure onError triggers the switch.


public static <T> Flow.Publisher<T> onErrorResumeNext(
        Flow.Publisher<T> source, Flow.Publisher<T> fallback) {
    return new OnErrorResumeNextPublisher<>(source, fallback);
}

// ... subscribe() has the same pattern.

static final class OnErrorResumeNextCoordinator<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {

    final Flow.Publisher<T> source;

    final Flow.Publisher<T> fallback;

    boolean switched;

    // ... the downstream, trampoline and consumed fields, the constructor,
    // onSubscribe and onNext are the same as in ConcatCoordinator

    @Override
    public void onError(Throwable throwable) {
        if (switched) {
            downstream.onError(throwable);
        } else {
            switched = true;
            drain();
        }
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }

    final void drain() {
        if ((int)TRAMPOLINE.getAndAdd(this, 1) == 0) {
            do {

                if (isCancelled()) {
                    return;
                }

                long c = consumed;
                if (c != 0L) {
                    consumed = 0L;
                    setProduced(c);
                }

                if (switched) {
                    fallback.subscribe(this);
                } else {
                    source.subscribe(this);
                }

            } while ((int)TRAMPOLINE.getAndAdd(this, -1) - 1 != 0);
        }
    }
}

Here, we have two states: switched == false indicates we are consuming the main source. If that fails, we set switched = true and the drain loop will subscribe to the fallback Flow.Publisher. However, if the fallback fails as well, onError now sees switched == true and, instead of draining (and thus retrying the fallback Flow.Publisher again), it simply terminates with the Throwable the fallback emitted.


Conclusion


In this post, the subscription arbitration concept was presented which allows us to switch between non-overlapping Flow.Publisher sources when one terminates (completes or fails with an error) while maintaining the link of cancellation between the individual Flow.Subscriptions as well as making sure backpressure is properly transmitted and preserved between them.

When combined with trampolining logic, such arbitration allowed us to implement a couple of standard ReactiveX operators such as concat, repeat, retry and onErrorResumeNext, while only applying small changes to the methods and algorithms in them.

Note however, that even if the arbiter can be reused for in-flow operators such as concatMap (concatenate Flow.Publishers generated from upstream values), repeatWhen (repeat if a companion Flow.Publisher signals an item) and retryWhen, one can no longer use a single Flow.Subscriber to subscribe to both the main flow and the inner/companion flows at the same time. We will explore these types of operators in a later post.

The arbitration has its own limit: it can't support live switching between sources, i.e., when onNext may be in progress while the switch has to happen. If you are familiar with the switchMap operator, this is what can happen during its execution. We'll look into this type of operator in a subsequent post.

But for now, we'll investigate a much lighter set of operators in the next post: limiting the number of items the downstream can receive and skipping a certain number of items from the upstream, both based on counting items and on per-item predicate checks, i.e., the take() and skip() operators.
