Sunday, March 5, 2017

Java 9 Flow API: asynchronous integer range source

Introduction


Java 9 is becoming more reactive by introducing the Reactive-Streams interfaces as nested interfaces of the java.util.concurrent.Flow class, enabling standard interoperation between future libraries built on top of them.

There is almost no documentation about how to write Publishers, Subscriptions and Subscribers under the Flow API beyond an underwhelming Oracle document and the SubmissionPublisher class' JavaDoc. Plus, the Oracle document practically concludes with "see RxJava".

Indeed, replacing the imports of org.reactivestreams.* with java.util.concurrent.Flow.* in RxJava 2's sources gets one a fully fledged reactive library, but there seems to be one crucial expectation with components built on the Flow API: they have to be asynchronous at every stage. I could argue that the underlying concepts work totally fine in synchronous mode, but who am I to question the established definitions?

Oh well, if the constraint is to be asynchronous, then let's do it in an asynchronous way.

To see what it takes, we could start with a relatively simple source: an asynchronous integer range.

Since both Java 9 and its IDE support are still in a non-final state, I recommend IntelliJ 2017.1 EAP for this "exercise".


Asynchronous integer range source


Unfortunately, Java 9 won't introduce any standard fluent API entry point with all the well-loved map(), filter(), flatMap() etc. operators, so one has to build individual Publishers and compose them stage by stage.

This involves creating a parent Publisher class with the following typical pattern to host the input parameters of the flow to be observed:


import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong; // needed by RangeSubscription below

public final class FlowRange implements Flow.Publisher<Integer> {
    final int start;
    final int end;
    final Executor executor;

    public FlowRange(int start, int count, Executor executor) {
        this.start = start;
        this.end = start + count;
        this.executor = executor;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        // TODO implement
    }
} 

For brevity, the potential integer overflow of start + count was ignored here. Otherwise, so far nothing special.
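If the overflow does matter, one way to guard against it is to do the addition in long arithmetic before narrowing the result. This is a minimal sketch, assuming a hypothetical helper named RangeBounds.checkedEnd (not part of the original FlowRange) and assuming that rejecting a negative count is acceptable:

```java
// Hypothetical helper, not part of the original FlowRange class.
final class RangeBounds {

    private RangeBounds() { }

    /**
     * Returns the exclusive end index start + count, or throws if the
     * arguments are invalid or the sum does not fit into an int.
     */
    static int checkedEnd(int start, int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count >= 0 required but it was " + count);
        }
        // Widen to long first so the addition itself cannot overflow.
        long end = (long) start + count;
        if (end > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("start + count overflows int");
        }
        return (int) end;
    }
}
```

The constructor could then assign this.end = RangeBounds.checkedEnd(start, count). Because end is exclusive and stored as an int, this sketch cannot express a range that includes Integer.MAX_VALUE itself; storing end as a long would lift that limit.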

Generally in RxJava, if one writes a backpressure-enabled source, one has to implement something on top of the Subscription interface. For intermediate operators (such as map()), one usually has to implement a Subscriber + Subscription wrapper together. Since the integer range is a plain source, we have to take the Flow.Subscription route.

The general pattern with that is to repeat the input parameters along with the actual Flow.Subscriber that will receive the notifications:


    // inside FlowRange

    static final class RangeSubscription
    extends AtomicLong                                          // (1)
    implements Flow.Subscription,
               Runnable                                         // (2)
    {
        final Flow.Subscriber<? super Integer> actual;

        final int end;

        final Executor executor;

        int index;                                              // (3)
        boolean hasSubscribed;                                  // (4)

        volatile boolean cancelled;
        volatile boolean badRequest;                            // (5)

        RangeSubscription(
                Flow.Subscriber<? super Integer> actual, 
                int start, int end,
                Executor executor) {
             this.actual = actual;
             this.index = start;
             this.end = end;
             this.executor = executor;
        }

        @Override
        public void request(long n) {
            // TODO implement
        }

        @Override
        public void cancel() {
            // TODO implement
        }

        @Override
        public void run() {
             // TODO implement
        }
    }


There are a couple of things that need a bit of an explanation:


  1. In order to ensure no more than the requested amount is emitted, we have to track the downstream's request amounts. Generally, you'd want to use a volatile long requested field along with a VarHandle REQUESTED for fast atomics, but our range source has only the requested amount itself needing atomic support, hence extending AtomicLong is a cheap way to get those atomics.
  2. Since we have to be asynchronous when interacting with the actual Subscriber, task(s) have to be submitted to the Executor. We'd like to avoid creating an excess of Runnables in general, and in this particular case we don't need to, since all cross-thread communication is done via thread-safe fields. The RangeSubscription itself serves as the one Runnable.
  3. Speaking of thread-safety, the index field, which tracks how far the emission has progressed, is confined to the thread that runs the emission logic in run(). We initialize it to the start value of the range and let it run until it reaches the end value.
  4. One of the implications of going fully async is that the call to onSubscribe() has to happen asynchronously as well, unlike what we can see in RxJava. This is a tradeoff between eager cancellation and thread confinement. The hasSubscribed flag records whether that call has already been made.
  5. This may seem to be an odd field. In the Reactive-Streams specification, calling request() with a non-positive value must be rewarded with an IllegalArgumentException that contains the rule number "3.9" and has to be sent via onError() downstream. Since calling the onXXX methods has to be serialized (no concurrent invocations), we have to communicate the violation in some way to the emitting thread. The easiest way is to use this volatile field.
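For comparison, the volatile field plus VarHandle alternative mentioned in (1) could look roughly like the sketch below. The class name RequestedHolder and the method addRequested are placeholders invented for this illustration, it requires Java 9 or later, and the rest of this post keeps using AtomicLong.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Placeholder class illustrating the "volatile field + VarHandle" pattern.
final class RequestedHolder {

    volatile long requested;

    static final VarHandle REQUESTED;

    static {
        try {
            REQUESTED = MethodHandles.lookup()
                    .findVarHandle(RequestedHolder.class, "requested", long.class);
        } catch (ReflectiveOperationException ex) {
            throw new ExceptionInInitializerError(ex);
        }
    }

    /** Atomically adds n and returns the previous amount (no overflow capping here). */
    long addRequested(long n) {
        return (long) REQUESTED.getAndAdd(this, n);
    }
}
```

The benefit of this layout is that several fields of the same object can each get their own atomic operations, which extending AtomicLong cannot offer.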

So far, with only the skeleton definition of the integer range source in place, there is nothing too complicated or convoluted in the code.

However, we now have a few problems to solve when trying to implement the TODO marked methods:

  1. Unlike Scheduler.Worker, the Executor interface gives no guarantee that two Runnables submitted one after the other from the same thread will be executed in that order by the underlying thread (pool). Therefore, we need a way to make sure there is no concurrent execution, for example, when the downstream requests concurrently.
  2. The implementation of request() must be thread-safe, reentrant-safe and has to trigger emission of the requested amount of values on the given Executor. Bad requests should be also signalled through the Executor.
  3. Flow.Subscriber.onSubscribe() has to be called before any other signal is emitted on the given Executor as well.

To resolve these problems, maybe surprisingly, the core component we need is the request accounting (AtomicLong) itself: we cleverly use its value transitions along with the extra fields we saw in the skeleton above. In short:


  1. This is called trampolining in RxJava's terminology. We'll use the request amount's (atomic) transition from 0 to N (where N > 0L), at which point we will "schedule" the RangeSubscription itself via Executor.execute(). This transition guarantees that when the request amount is 0, there is no concurrent modification or notification happening and it is safe to start a new run of emission.
  2. By using the same trampolining and atomics guarantees, calling request() is also thread-safe and reentrant-safe. Since the bad request may come from any thread as well, we have to set the badRequest flag and "imitate" a request(1) situation to get the emission thread going. Of course, the emission thread has to detect that this "1" is not a real downstream request by reading the badRequest flag first and signalling the required exception.
  3. To make sure onSubscribe() is always called first and exactly once, we have to check and store the hasSubscribed flag accordingly. Since this has to happen asynchronously and as a consequence of subscribing to FlowRange, we will use the same request(1) call trick to avoid reentrancy problems from the real requests as well as to jump to the right thread via the Executor.
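The 0 to N transition from (1) can be seen in isolation in the following sketch. The class TrampolineSketch and its processed counter are made up for illustration: the class merely counts work units instead of emitting to a Subscriber, and overflow capping as well as the onSubscribe() and badRequest handling are left out.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: counts "work units" instead of calling a Subscriber.
final class TrampolineSketch extends AtomicLong implements Runnable {

    private final Executor executor;

    // Touched only inside run(), which never executes concurrently with itself.
    private int processed;

    TrampolineSketch(Executor executor) {
        this.executor = executor;
    }

    void request(long n) {
        // Only the caller that moves the amount away from zero schedules the task;
        // everyone else just adds to the amount the running task will see.
        if (getAndAdd(n) == 0L) {
            executor.execute(this);
        }
    }

    @Override
    public void run() {
        long r = get();
        for (;;) {
            for (long i = 0L; i < r; i++) {
                processed++;
            }
            // Subtract what was handled; a non-zero result means more
            // requests arrived in the meantime, so keep looping.
            r = addAndGet(-r);
            if (r == 0L) {
                return;
            }
        }
    }

    // Intended to be read once the work has finished.
    int processed() {
        return processed;
    }
}
```

With an executor that merely queues tasks, calling request(2) and then request(3) enqueues a single task, and running that task processes all five units.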

Now let's see what these look like in code. The subscribe() method is straightforward, based on (3):

    // ...

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        RangeSubscription sub = new RangeSubscription(subscriber, start, end, executor);
        sub.request(1);
    }

    // ...



Next, we have to deal with the requests (2):

    // ...

        @Override
        public void request(long n) {
            if (n <= 0L) {
                badRequest = true;
                n = 1;
            }
            for (;;) {
                long r = get();
                long u = r + n;
                if (u < 0L) {
                    u = Long.MAX_VALUE;
                }
                if (compareAndSet(r, u)) {
                    if (r == 0L) {
                        executor.execute(this);
                    }
                    break;
                }
            }
        }

    // ...


First, we check for non-positive request amounts and set the badRequest flag to notify the emitter thread about the problem. Then, we perform the typical atomic request addition capped at Long.MAX_VALUE and, in case the previous request amount was zero, we start the emission by submitting this to the Executor. If the previous amount was non-zero, this atomic change tells the emitter loop inside run() to loop a bit more.
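The capping step in isolation, as a small sketch. RequestMath.addCap is a name chosen here for illustration, and both arguments are assumed to be non-negative:

```java
// Illustrative helper; assumes current >= 0 and n >= 0.
final class RequestMath {

    private RequestMath() { }

    /** Adds two non-negative longs, returning Long.MAX_VALUE instead of overflowing. */
    static long addCap(long current, long n) {
        long u = current + n;
        // Two non-negative longs can only produce a negative sum by overflowing.
        return u < 0L ? Long.MAX_VALUE : u;
    }
}
```

Without the cap, a downstream that calls request(Long.MAX_VALUE) twice would wrap the counter into negative territory and the emission loop would misbehave.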

Cancellation is trivial: we set cancelled to true, since we don't have to execute any cleanup with this type of source. On the emitter thread, the emissions will stop reasonably quickly.

    // ...

        @Override
        public void cancel() {
            this.cancelled = true;
        }

    // ...


Finally, the most complicated part is the run() method responsible for emitting signals on the Executor's thread (1).

    // ...

        @Override
        public void run() {

            Flow.Subscriber<? super Integer> a = actual;

            if (!hasSubscribed) {                        // (1)
                hasSubscribed = true;
                a.onSubscribe(this);
                if (decrementAndGet() == 0) {            // (2)
                    return;
                }
            }

            long r = get();                              // (3)
            int idx = index;
            int f = end;
            long e = 0L;
            
            for (;;) {
                while (e != r && idx != f) {             // (4)
                    if (cancelled) {
                        return;
                    }
                    if (badRequest) {                    // (5)
                        cancelled = true;
                        a.onError(new IllegalArgumentException(
                             "§3.9 violated: non-positive request received"));
                        return;
                    }

                    a.onNext(idx);

                    idx++;                               // (6)
                    e++;
                }

                if (idx == f) {                          // (7)
                    if (!cancelled) {
                        a.onComplete();
                    }
                    return;
                }

                r = get();                               // (8)
                if (e == r) {
                    index = idx;
                    r = addAndGet(-e);
                    if (r == 0L) {
                        break;
                    }
                    e = 0L;
                }
            }
        }

    // ...


Let's see what's happening next to the notable lines:

  1. Once the run() is executing, the very first step is to make sure onSubscribe() is called exactly once.
  2. Decrementing the requested amount has two purposes here. First, it removes the virtual request(1) that came from the subscribe() method as the first signal to trigger the call to onSubscribe() itself. Second, it has to happen after the call to onSubscribe() because the downstream may have issued real requests on top in the meantime; if it did, we need the correct amount later on. If there is no request, we can quit because there is no reason to emit anything at that point.
  3. We read out the current request amount, the index where we have to start or have left off in the previous emission loop and load the end value (exclusive) into a local variable since we are going to access it frequently.
  4. After the typical queue-drain loop pattern is entered, we loop until the emission count e and the initially known request amount r match or we reach the end of the range.
  5. Since a bad request triggers a virtual request(1) as well, we have to check the badRequest flag and signal the error instead of emitting a value (which was probably not requested by the downstream anyway) and quit the method.
  6. Once the current index value has been emitted, we move the emission count and the index itself forward.
  7. If the loop in (4) was stopped because we reached the end of the range, we emit the onComplete signal (unless cancelled in the meantime) and quit the method.
  8. Since atomic operations are expensive and it is very likely more requests arrive from downstream while the emission loop executes, we can avoid the atomic subtraction by first checking if the request amount has changed since the last time it was read in (3) and if so, just going another round and continue emitting. If it hasn't changed, we atomically subtract the emitted count. At this point, it is still possible a concurrent request() changes the amount and we have to resume the loop again, this time starting the emitted count from zero.

Now that we have the full source ready, let's test it!

Testing


Unfortunately, Java 9 doesn't offer any built-in, reusable consumer we could use to verify the FlowRange source; therefore, we have to build one manually from scratch. Depending on the convenience we'd like to have, the test consumer (let's call it TestFlowSubscriber) can be relatively simple:


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

public class TestFlowSubscriber<T>
implements Flow.Subscriber<T> {

    protected final List<T> values;

    protected final List<Throwable> errors;

    protected int completions;

    protected Flow.Subscription subscription;

    protected final CountDownLatch done;

    public TestFlowSubscriber() {
        this.values = new ArrayList<>();
        this.errors = new ArrayList<>();
        this.done = new CountDownLatch(1);
    }

    @Override
    public final void onSubscribe(
            Flow.Subscription subscription) {
        this.subscription = subscription;
        onStart();
    }

    public void onStart() {
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
        values.add(item);
    }

    @Override
    public void onError(Throwable throwable) {
        errors.add(throwable);
        done.countDown();
    }

    @Override
    public void onComplete() {
        completions++;
        done.countDown();
    }

    public final List<T> values() {
        return values;
    }

    public final List<Throwable> errors() {
        return errors;
    }

    public final int completions() {
        return completions;
    }

    public final boolean await(long timeout, TimeUnit unit) 
            throws InterruptedException {
        return done.await(timeout, unit);
    }
}



The TestFlowSubscriber offers these basic features:

  • Override onStart() to issue a custom request amount upfront; the rest can be requested via this.subscription.request() from onNext() later on.
  • Override onNext(), onError() or onComplete() to perform custom actions instead/on top of saving the item, error or incrementing the completion counter.
  • Since sources are expected to be asynchronously emitting, the internal CountDownLatch's await() is exposed which waits until the source completes normally or with an error.
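As an illustration of the first bullet, a consumer that asks for one item at a time might look like the sketch below. To keep the block self-contained, it implements Flow.Subscriber directly rather than extending TestFlowSubscriber; the class name OneByOneSubscriber is an assumption of this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical consumer that requests items one by one.
final class OneByOneSubscriber<T> implements Flow.Subscriber<T> {

    final List<T> values = new ArrayList<>();
    boolean completed;
    Throwable error;

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);      // initial demand: a single item
    }

    @Override
    public void onNext(T item) {
        values.add(item);
        subscription.request(1);      // ask for the next item only after consuming this one
    }

    @Override
    public void onError(Throwable throwable) {
        error = throwable;
    }

    @Override
    public void onComplete() {
        completed = true;
    }
}
```

With FlowRange, a request(1) issued from inside onNext() only bumps the amount that the already running emission loop will see; a request arriving after the loop has quit schedules the loop again on the Executor.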

Now let's validate the FlowRange source via a JUnit 4 test case:

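import static org.junit.Assert.*;

import java.util.Arrays;
import java.util.concurrent.*;

import org.junit.Test;

public class FlowRangeTest {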
    @Test
    public void normal() {
        FlowRange source = new FlowRange(1, 5, Runnable::run);

        TestFlowSubscriber<Integer> ts = new TestFlowSubscriber<>();

        source.subscribe(ts);

        assertEquals(Arrays.asList(1, 2, 3, 4, 5), ts.values());
        assertEquals(1, ts.completions());
        assertTrue(ts.errors().isEmpty());
    }
}

Now wait a minute, I said asynchrony is required, yet this test uses Runnable::run as the Executor! This could be surprising to newcomers, but it is a pretty standard property of the design employed here (and in RxJava 2): asynchrony is orthogonal to the emission in some sense and, due to the trampolining/co-routine structure, it works both in synchronous and asynchronous mode!
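That Runnable::run is a legitimate, if synchronous, Executor can be checked with a small sketch. ExecutorProbe is a made-up helper for this illustration; it reports whether a task handed to an Executor ended up running on the calling thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Made-up helper for illustration.
final class ExecutorProbe {

    private ExecutorProbe() { }

    /** Returns true if the executor ran the submitted task on the caller's thread. */
    static boolean runsOnCallerThread(Executor executor) {
        Thread caller = Thread.currentThread();
        CompletableFuture<Thread> ranOn = new CompletableFuture<>();
        executor.execute(() -> ranOn.complete(Thread.currentThread()));
        // join() blocks until the task has run, wherever that happens.
        return ranOn.join() == caller;
    }
}
```

ExecutorProbe.runsOnCallerThread(Runnable::run) yields true, whereas an executor that starts a new thread per task yields false. FlowRange itself does not care which kind it is given.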

Therefore, let's see a real asynchronous test case:


    // ...

    @Test
    public void async() throws InterruptedException {
        FlowRange source = new FlowRange(1, 5, ForkJoinPool.commonPool());
        TestFlowSubscriber<Integer> ts = new TestFlowSubscriber<>();

        source.subscribe(ts);

        assertTrue(ts.await(5, TimeUnit.SECONDS));
        assertEquals(Arrays.asList(1, 2, 3, 4, 5), ts.values());
        assertEquals(1, ts.completions());
        assertTrue(ts.errors().isEmpty());
    }

    // ...


Still works, great!

Conclusion


Java 9 becomes reactive, but documentation and guides are scarce at the moment. Since many developers on the desktop/server JVM are unaware of the state of the art of the reactive libraries available today, a new set of guides and posts written specifically from the perspective and terminology of the Java 9 Flow API could help extend the JDK's own use of reactive technology much earlier.

Don't underestimate the difficulty of building reactive components this way: the state/flow management can become quite complicated, and it is often difficult to understand why tricks such as reusing the requested amount in RangeSubscription work. (However, if you have seen the typical concurrency-related source code in the JDK, such as SubmissionPublisher, I believe the style in this blog post and in RxJava in general is more comprehensible.)

In the next post of the Java 9 Flow API series, I'm going to show how one can implement asynchronous map() and filter() intermediate operators with it.