Sunday, March 5, 2017

Java 9 Flow API: asynchronous integer range source

Introduction


Java 9 is becoming more reactive by introducing the Reactive-Streams interfaces under the parent class java.util.concurrent.Flow, enabling a new standard interoperation between future libraries built on top.

There is almost no documentation beyond an underwhelming Oracle document and the SubmissionPublisher class' JavaDoc about how to write Publishers, Subscriptions and Subscribers under the Flow API. Plus, the Oracle document practically concludes with "see RxJava".

Indeed, replacing the imports of org.reactivestreams.* with java.util.concurrent.Flow.* in RxJava 2's sources gets one a fully fledged reactive library, but there seems to be one crucial expectation with components built on the Flow API: they have to be asynchronous at every stage. I could argue that the underlying concepts work totally fine in synchronous mode, but who am I to question the established definitions?

Oh well, if the constraint is to be asynchronous, then let's do it in an asynchronous way.

To see what it takes, we could start with a relatively simple source: an asynchronous integer range.

Since both Java 9 and the IDE support are in a non-final state, I recommend IntelliJ 2017.1 EAP for this "exercise".


Asynchronous integer range source


Unfortunately, Java 9 won't introduce any standard fluent API entry point with all the well-loved map(), filter(), flatMap() etc. operators; one has to build individual Publishers and compose them stage by stage.

This involves creating a parent Publisher class with the following typical pattern to host the input parameters of the flow to be observed:


import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong; // needed by the nested RangeSubscription

public final class FlowRange implements Flow.Publisher<Integer> {
    final int start;
    final int end;
    final Executor executor;

    public FlowRange(int start, int count, Executor executor) {
        this.start = start;
        this.end = start + count;
        this.executor = executor;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        // TODO implement
    }
} 

For brevity, the potential integer overflow of start + count was ignored here. Otherwise, so far nothing special.
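
If the overflow does matter, one possible guard is to compute the exclusive end with Math.addExact and to reject negative counts. This is a minimal sketch; the helper name checkedEnd is hypothetical and not part of the FlowRange class above.

```java
final class RangeBounds {

    // Hypothetical helper: returns the exclusive end of the range or fails fast.
    static int checkedEnd(int start, int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count >= 0 required but it was " + count);
        }
        // Math.addExact throws ArithmeticException if the int addition overflows
        return Math.addExact(start, count);
    }

    public static void main(String[] args) {
        System.out.println(checkedEnd(1, 5));
        try {
            checkedEnd(Integer.MAX_VALUE, 1);
        } catch (ArithmeticException ex) {
            System.out.println("overflow detected");
        }
    }
}
```

Note that this simple check also rejects a range whose last element is exactly Integer.MAX_VALUE, because the exclusive end no longer fits into an int; tracking the end as a long would be one way around that.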

Generally in RxJava, if one writes a backpressure-enabled source, one has to implement something on top of the Subscription interface. For intermediate operators (such as map()), one usually has to implement a Subscriber + Subscription wrapper together. Since the integer range is a plain source, we have to take the Flow.Subscription route.

The general pattern with that is to repeat the input parameters along with the actual Flow.Subscriber that will receive the notifications:


    // inside FlowRange

    static final class RangeSubscription
    extends AtomicLong                                          // (1)
    implements Flow.Subscription,
               Runnable                                         // (2)
    {
        final Flow.Subscriber<? super Integer> actual;

        final int end;

        final Executor executor;

        int index;                                              // (3)
        boolean hasSubscribed;                                  // (4)

        volatile boolean cancelled;
        volatile boolean badRequest;                            // (5)

        RangeSubscription(
                Flow.Subscriber<? super Integer> actual, 
                int start, int end,
                Executor executor) {
             this.actual = actual;
             this.index = start;
             this.end = end;
             this.executor = executor;
        }

        @Override
        public void request(long n) {
            // TODO implement
        }

        @Override
        public void cancel() {
            // TODO implement
        }

        @Override
        public void run() {
             // TODO implement
        }
    }


There are a couple of things that need a bit of an explanation:


  1. In order to ensure no more than the requested amount is emitted, we have to track the downstream's request amounts. Generally, you'd want to use a volatile long requested field along with a VarHandle REQUESTED for fast atomics, but our range source has only the requested amount itself needing atomic support, hence extending AtomicLong is a cheap way to get those atomics.
  2. Since we have to be asynchronous when interacting with the actual Subscriber, task(s) have to be submitted to the Executor. We'd like to avoid creating excess amount of Runnables in general and in this particular case, we don't need to since all cross-thread communication is done via thread-safe fields.
  3. Speaking of thread-safety, the index field, that follows how many items have been emitted will be confined to the thread that runs the emission logic in run(). We initialize it to the start value of the range and we'll let it run until it reaches the end value.
  4. One of the implications of going fully async is that the call to onSubscribe() has to happen asynchronously as well, unlike what we can see in RxJava. This is a tradeoff between eager-cancellation and thread-confinement.
  5. This may seem to be an odd field. In the Reactive-Streams specification, calling request() with a non-positive value must be rewarded with an IllegalArgumentException that contains the rule number "3.9" and has to be sent via onError() downstream. Since calling the onXXX methods has to be serialized (no concurrent invocations), we have to communicate the violation in some way to the emitting thread. The easiest way is to use this volatile field.

So far, since we only have the skeleton definition of the integer range source, there is nothing too complicated or convoluted in the code.
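
For reference, the "volatile long requested field along with a VarHandle REQUESTED" alternative mentioned in (1) could look roughly like the sketch below. The class name RequestedHolder and its two helper methods are made up for illustration; only the VarHandle and MethodHandles calls are JDK 9 API.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

final class RequestedHolder {

    // The field that would replace the inherited AtomicLong state
    volatile long requested;

    static final VarHandle REQUESTED;

    static {
        try {
            REQUESTED = MethodHandles.lookup()
                    .findVarHandle(RequestedHolder.class, "requested", long.class);
        } catch (ReflectiveOperationException ex) {
            throw new ExceptionInInitializerError(ex);
        }
    }

    // Atomically adds n to requested and returns the previous value
    long getAndAddRequested(long n) {
        return (long) REQUESTED.getAndAdd(this, n);
    }

    // Atomically replaces expected with next; returns true on success
    boolean casRequested(long expected, long next) {
        return REQUESTED.compareAndSet(this, expected, next);
    }
}
```

The advantage of this form is that a class can host several such atomically updated fields, which extending AtomicLong cannot provide.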

However, we now have a few problems to solve when trying to implement the TODO marked methods:

  1. Unlike Scheduler.Worker, the Executor interface gives no guarantees that submitting two Runnables, one after the other from the same thread, will execute in the same order by the underlying thread(pool). Therefore, we need a way to make sure there is no concurrent execution happening when the downstream requests concurrently for example.
  2. The implementation of request() must be thread-safe, reentrant-safe and has to trigger emission of the requested amount of values on the given Executor. Bad requests should be also signalled through the Executor.
  3. Flow.Subscriber.onSubscribe() has to be called before any other signal is emitted on the given Executor as well.

To resolve these problems, maybe surprisingly, the core component we need is the request accounting (AtomicLong) itself: we cleverly use its value transitions along with the extra fields we see in the skeleton above. In outline:


  1. This is called trampolining in RxJava's terminology and we'll use the request amount's (atomic) transition from 0 to N (where N > 0L), at which point we will "schedule" the RangeSubscription itself via Executor.execute(). This transition guarantees that when the request amount is 0 there is no concurrent modification and notification happening and is safe to start a new run of emission.
  2. By using the same trampolining and atomics guarantees, calling request() is also thread-safe and reentrant-safe. Since the bad request may come from any thread as well, we have to set the badRequest flag and "imitate" a request(1) situation to get the emission thread going. Of course, the emission thread has to detect that this "1" is not a real downstream request by reading the badRequest flag first and signalling the required exception.
  3. For making sure onSubscribe() is always called first and exactly once, we have to check and store the hasSubscribed flag accordingly. Since this has to happen asynchronously and as of the consequence of subscribing to FlowRange, we will use the same request(1) call trick to avoid reentrancy problem from the real requests as well as jumping to the right thread via the Executor.
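
The 0 to N transition trick can be tried out in isolation. The following is a stripped-down sketch, not the final RangeSubscription: the class TrampolineDemo and its counters are hypothetical, overflow capping is left out, and the plain int counters are only safe because the demo runs on a single thread.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

final class TrampolineDemo extends AtomicLong implements Runnable {

    final Executor executor;

    int scheduled;   // how many times run() was handed to the Executor
    int emitted;     // how many "items" the drain loop produced

    TrampolineDemo(Executor executor) {
        this.executor = executor;
    }

    void request(long n) {
        // Only the 0 -> N transition schedules a new drain
        if (getAndAdd(n) == 0L) {
            scheduled++;
            executor.execute(this);
        }
    }

    @Override
    public void run() {
        long r = get();
        long e = 0L;
        for (;;) {
            while (e != r) {
                emitted++;
                e++;
                if (emitted == 1) {
                    request(2); // reentrant request, as a downstream's onNext might do
                }
            }
            r = get();
            if (e == r) {
                r = addAndGet(-e);
                if (r == 0L) {
                    break;
                }
                e = 0L;
            }
        }
    }

    public static void main(String[] args) {
        TrampolineDemo demo = new TrampolineDemo(Runnable::run);
        demo.request(1);
        System.out.println(demo.emitted + " items, scheduled " + demo.scheduled + " time(s)");
    }
}
```

With the synchronous Runnable::run executor, the reentrant request(2) does not start a second drain; the already running loop picks up the extra amount, so three items are emitted while run() is scheduled only once.
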

Now let's see what these look like in code. The subscribe() is straightforward based on (3):

    // ...

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        RangeSubscription sub = new RangeSubscription(subscriber, start, end, executor);
        sub.request(1);
    }

    // ...



Next, we have to deal with the requests (2):

    // ...

        @Override
        public void request(long n) {
            if (n <= 0L) {
                badRequest = true;
                n = 1;
            }
            for (;;) {
                long r = get();
                long u = r + n;
                if (u < 0L) {
                    u = Long.MAX_VALUE;
                }
                if (compareAndSet(r, u)) {
                    if (r == 0L) {
                        executor.execute(this);
                    }
                    break;
                }
            }
        }

    // ...


First, we check for non-positive request amounts and set the badRequest flag to notify the emitter thread about the problem. Then, we perform the typical, atomic request addition capped to Long.MAX_VALUE and in case the previous request was zero, we start the emission by submitting this to the Executor. If the previous request was non-zero, this atomic change will tell the emitter loop inside run() to keep looping a bit more.

The cancellation is trivial: set cancelled to true, since we don't have to execute any cleanup with this type of source. On the emitter thread, the emissions will stop reasonably quickly.
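
The capped addition can be looked at on its own. Below is a minimal sketch of the same loop as a static helper; the names CappedAdd and addCap are hypothetical and chosen only for this illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

final class CappedAdd {

    // Adds n (n > 0) to the counter, capping the sum at Long.MAX_VALUE.
    // Returns the value seen before the addition.
    static long addCap(AtomicLong requested, long n) {
        for (;;) {
            long r = requested.get();
            long u = r + n;
            if (u < 0L) {
                // the signed addition overflowed
                u = Long.MAX_VALUE;
            }
            if (requested.compareAndSet(r, u)) {
                return r;
            }
        }
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong(Long.MAX_VALUE - 1);
        addCap(requested, 10);
        System.out.println(requested.get() == Long.MAX_VALUE);
    }
}
```

Returning the previous value is what lets the caller detect the 0 to N transition and decide whether to schedule the emission.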

    // ...

        @Override
        public void cancel() {
            this.cancelled = true;
        }

    // ...


Finally, the most complicated part is the run() method responsible for emitting signals on the Executor's thread (1).

    // ...

        @Override
        public void run() {

            Flow.Subscriber<? super Integer> a = actual;

            if (!hasSubscribed) {                        // (1)
                hasSubscribed = true;
                a.onSubscribe(this);
                if (decrementAndGet() == 0) {            // (2)
                    return;
                }
            }

            long r = get();                              // (3)
            int idx = index;
            int f = end;
            long e = 0L;
            
            for (;;) {
                while (e != r && idx != f) {             // (4)
                    if (cancelled) {
                        return;
                    }
                    if (badRequest) {                    // (5)
                        cancelled = true;
                        a.onError(new IllegalArgumentException(
                             "§3.9 violated: non-positive request received"));
                        return;
                    }

                    a.onNext(idx);

                    idx++;                               // (6)
                    e++;
                }

                if (idx == f) {                          // (7)
                    if (!cancelled) {
                        a.onComplete();
                    }
                    return;
                }

                r = get();                               // (8)
                if (e == r) {
                    index = idx;
                    r = addAndGet(-e);
                    if (r == 0L) {
                        break;
                    }
                    e = 0L;
                }
            }
        }

    // ...


Let's see what's happening next to the notable lines:

  1. Once the run() is executing, the very first step is to make sure onSubscribe() is called exactly once.
  2. Decrementing the requested amount has two purposes here: first remove the virtual request(1) that came from the subscribe() method as the first signal to trigger the call to onSubscribe() itself. This decrement has to happen after the call to onSubscribe() because, as second, the downstream may now issue real requests on top. If it does, we need the correct amount later on. If there is no request, we can quit because there is no reason to emit anything at that point.
  3. We read out the current request amount, the index where we have to start or have left off in the previous emission loop and load the end value (exclusive) into a local variable since we are going to access it frequently.
  4. After the typical queue-drain loop pattern is entered, we loop until the emission count e and the initially known request amount r match or we reach the end of the range.
  5. Since a bad request triggers a virtual request(1) as well, we have to check the badRequest flag and signal the error instead of emitting a value (which was probably not requested by the downstream anyway) and quit the method.
  6. Once the current index value has been emitted, we move the emission count and the index itself forward.
  7. If the loop in (4) was stopped because we reached the end of the range, we emit the onComplete signal (unless cancelled in the meantime) and quit the method.
  8. Since atomic operations are expensive and it is very likely more requests arrive from downstream while the emission loop executes, we can avoid the atomic subtraction by first checking if the request amount has changed since the last time it was read in (3) and if so, just going another round and continue emitting. If it hasn't changed, we atomically subtract the emitted count. At this point, it is still possible a concurrent request() changes the amount and we have to resume the loop again, this time starting the emitted count from zero.

Now that we have the full source ready, let's test it!

Testing


Unfortunately, Java 9 doesn't offer any built-in, reusable consumer we could use to verify the FlowRange source; therefore, we have to manually build one from scratch. Depending on the convenience we'd like to have, the test consumer, let's call it TestFlowSubscriber, can be relatively simple:


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

public class TestFlowSubscriber<T>
implements Flow.Subscriber<T> {

    protected final List<T> values;

    protected final List<Throwable> errors;

    protected int completions;

    protected Flow.Subscription subscription;

    protected final CountDownLatch done;

    public TestFlowSubscriber() {
        this.values = new ArrayList<>();
        this.errors = new ArrayList<>();
        this.done = new CountDownLatch(1);
    }

    @Override
    public final void onSubscribe(
            Flow.Subscription subscription) {
        this.subscription = subscription;
        onStart();
    }

    public void onStart() {
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
        values.add(item);
    }

    @Override
    public void onError(Throwable throwable) {
        errors.add(throwable);
        done.countDown();
    }

    @Override
    public void onComplete() {
        completions++;
        done.countDown();
    }

    public final List<T> values() {
        return values;
    }

    public final List<Throwable> errors() {
        return errors;
    }

    public final int completions() {
        return completions;
    }

    public final boolean await(long timeout, TimeUnit unit) 
            throws InterruptedException {
        return done.await(timeout, unit);
    }
}



The TestFlowSubscriber offers these basic features:

  • Override onStart() to issue custom request amount upfront, the rest can be requested via this.subscription.request() from onNext() later on.
  • Override onNext(), onError() or onComplete() to perform custom actions instead/on top of saving the item, error or incrementing the completion counter.
  • Since sources are expected to be asynchronously emitting, the internal CountDownLatch's await() is exposed which waits until the source completes normally or with an error.
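
As an illustration of the first bullet, here is a hedged, self-contained sketch of a consumer that requests items one at a time. To keep it independent of FlowRange, it uses the JDK's SubmissionPublisher as a stand-in source and a trimmed-down subscriber instead of subclassing TestFlowSubscriber; the names OneByOneDemo and OneByOneSubscriber are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class OneByOneDemo {

    static final class OneByOneSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> values = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);          // custom upfront amount
        }

        @Override
        public void onNext(T item) {
            values.add(item);
            subscription.request(1);          // ask for the next item only now
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    static List<Integer> collect() {
        OneByOneSubscriber<Integer> ts = new OneByOneSubscriber<>();
        // close() at the end of the try block signals onComplete after the items
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            source.subscribe(ts);
            for (int i = 1; i <= 5; i++) {
                source.submit(i);
            }
        }
        try {
            if (!ts.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return ts.values;
    }

    public static void main(String[] args) {
        System.out.println(collect());
    }
}
```

The latch provides the happens-before edge that makes reading the plain ArrayList from the calling thread safe once await() has returned.
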
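
Now let's validate the FlowRange source via a JUnit 4 test case:

import static org.junit.Assert.*;

import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

public class FlowRangeTest {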
    @Test
    public void normal() {
        FlowRange source = new FlowRange(1, 5, Runnable::run);

        TestFlowSubscriber<Integer> ts = new TestFlowSubscriber<>();

        source.subscribe(ts);

        assertEquals(Arrays.asList(1, 2, 3, 4, 5), ts.values());
        assertEquals(1, ts.completions());
        assertTrue(ts.errors().isEmpty());
    }
}

Now wait a minute, I said asynchrony is required, yet this test uses Runnable::run as the Executor! This could be surprising to newcomers, but it is a pretty standard property of the design employed here (and in RxJava 2): asynchrony is orthogonal to the emission in some sense and, due to the trampolining/co-routine structure, it works both in synchronous and asynchronous mode!

Therefore, let's see a real asynchronous test case:


    // ...

    @Test
    public void async() throws InterruptedException {
        FlowRange source = new FlowRange(1, 5, ForkJoinPool.commonPool());
        TestFlowSubscriber<Integer> ts = new TestFlowSubscriber<>();

        source.subscribe(ts);

        assertTrue(ts.await(5, TimeUnit.SECONDS));
        assertEquals(Arrays.asList(1, 2, 3, 4, 5), ts.values());
        assertEquals(1, ts.completions());
        assertTrue(ts.errors().isEmpty());
    }

    // ...


Still works, great!

Conclusion


Java 9 becomes reactive, but documentation and guides are scarce at the moment. Since many developers on the desktop/server JVM are unaware of the state of the art of the reactive libraries available today, a new set of guides and posts written specifically from the Java 9 Flow API's perspective and terminology could help extend the JDK's own use of the reactive technology much earlier.

Don't underestimate the difficulty of building reactive components this way: the state/flow management can become quite complicated and it is often difficult to understand why tricks, such as reusing the requested amount in RangeSubscription, work. (However, if you have seen the typical concurrency-related source code in the JDK, such as SubmissionPublisher, I believe the style in this blog post and in RxJava in general is more comprehensible.)

In the next post of the Java 9 Flow API series, I'm going to show how one can implement asynchronous map() and filter() intermediate operators with it.

Tuesday, December 27, 2016

The Reactive Scrabble benchmarks

Introduction

In the past year, I've been posting benchmark results under the mysterious Shakespeare Plays (Reactive) Scrabble name. In this blog post, I'll explain what this benchmark is, where it comes from, how it works, what the intent is and how to apply it to your favorite and not-yet-benchmarked library.

History

The benchmark was designed and developed by Jose Paumard and the results were presented in his 2015 Devoxx talk (a bit long but worth watching). The benchmark measures how fast a certain data-processing library can find the most valuable word from a set of words taken from (one of) Shakespeare's works based on the rules and point schema of Scrabble. RxJava at the time was in its 1.0.x version and to my surprise, it performed poorly compared to Java 8 Streams:

https://youtu.be/fabN6HNZ2qY?t=8369

The benchmark, utilizing JMH, is completely synchronous; no thread hopping happens yet RxJava performs 10x slower, or more likely, it has 10x more overhead in the associated set of operators. In addition, Jose also added a parallel-stream version which runs the main "loop" in parallel before joining for the final result.

More disappointingly, the RxJava 2 developer preview at the time was terrible as well (relatively, measured on a weak CPU in February).

Therefore, instead of blaming the benchmark or the author, I set out on a quest to understand the benchmark's expectations and improve RxJava 2's performance and if possible, port that back to RxJava 1.

The original Stream-benchmark

Perhaps the easiest way to understand how the computation in the benchmark works is to look at the original, non-parallel Stream version of it. Since going sequential or parallel requires only a sequential() or parallel() operator on a Stream, they both extend an abstract superclass containing the majority of the code and only get specialized for the operation mode in two additional classes.

ShakespearePlaysScrabbleWithStreamBeta.java

I added the postfix "Beta" - meaning alternate version in this context - to distinguish it from a version that has a slight difference in one of the computation steps. I'll explain why when I describe the original RxJava benchmark down below.

The benchmark is built in a somewhat unconventional, perhaps over-functionalized manner and I had a bit of trouble putting the functionality back together in my head. It isn't that complicated though.

The inputs to the benchmark are hidden in a base class' fields: shakespeareWords (HashSet<String>), scrabbleWords (HashSet<String>), letterScores (int[]) and scrabbleAvailableLetters (int[]). shakespeareWords contains all the words, lowercased, of Shakespeare's work. scrabbleWords contains the words, lowercased, allowed by Scrabble itself. letterScores contains the scores of the letters a-z and scrabbleAvailableLetters (it seems to me) is there to limit the score if a particular letter appears multiple times in a word.

The benchmark, due to dependencies of one step on the other, is written in "backwards" order, starting with a function that finds the score of a letter. Given an English lowercase letter with code 97-122 ('a'-'z'), the function maps it to the 0-25 range and gets the score from the array.

IntUnaryOperator scoreOfALetter = letter -> letterScores[letter - 'a'];

The next function, given a histogram of letters in a word in the form of a Map.Entry (where the key is the letter and the value is the number of occurrence in the word), calculates a bounded score of that letter in the word.

ToIntFunction<Entry<Integer, Long>> letterScore =
    entry ->
        letterScores[entry.getKey() - 'a'] *
        Integer.min(
            entry.getValue().intValue(),
            scrabbleAvailableLetters[entry.getKey() - 'a']
        );

For that, we need the actual histogram of the letters in a word, which is computed by the following function:


Function<String, Map<Integer, Long>> histoOfLetters =
    word -> word.chars()
                .boxed()
                .collect(
                    Collectors.groupingBy(
                        Function.identity(),
                        Collectors.counting()
                    )
                );

This is where a particular dataflow library comes into play. Given a word as Java String, split it into individual characters and count how many of each character is in that word. For example, "jezebel" will count 1-j, 3-e, 1-z, 1-b and 1-l. In the Stream version, the IntStream of characters provided by String itself is converted into a boxed Stream<Integer> and grouped into a Map by a counting standard collector with no key mapping. Note that the return type of the function is Map and not Stream<Map>.
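
To see the histogram for the "jezebel" example, the function can be run on its own. The wrapper class HistogramDemo below is hypothetical; the function body mirrors the one shown above.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class HistogramDemo {

    static final Function<String, Map<Integer, Long>> histoOfLetters =
        word -> word.chars()
                    .boxed()
                    .collect(
                        Collectors.groupingBy(
                            Function.identity(),
                            Collectors.counting()
                        )
                    );

    public static void main(String[] args) {
        Map<Integer, Long> histo = histoOfLetters.apply("jezebel");
        // keys are character codes, so convert them back for printing
        histo.forEach((letter, count) ->
            System.out.println((char) letter.intValue() + " -> " + count));
    }
}
```

The keys of the resulting Map are the int character codes, which is why later functions subtract 'a' from entry.getKey() to index into the score arrays.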

The next function calculates the blank score of a character occurrence:


ToLongFunction<Entry<Integer, Long>> blank =
    entry ->
        Long.max(
            0L,
            entry.getValue() -
                scrabbleAvailableLetters[entry.getKey() - 'a']
        );

Given an entry from the histogram above, it yields a positive value if the particular letter occurs more often than its entry in the scrabbleAvailableLetters array allows, namely the excess count. For example, if the letter 'd' appears twice, the scrabbleAvailableLetters for it is 1 and this computes to 1. If the letter 'e' appears twice, the array entry for it is 12 and the function computes 0.
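
The two cases can be checked with a small sketch. The contents of scrabbleAvailableLetters are an assumption here (the real array lives in the benchmark's base class and is not shown in this post); the values were picked so that 'd' maps to 1 and 'e' to 12 as in the text, and the class name BlankDemo is hypothetical.

```java
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.ToLongFunction;

final class BlankDemo {

    // ASSUMED per-letter availability for a..z, not taken from the post
    static final int[] scrabbleAvailableLetters = {
        9, 2, 2, 1, 12, 2, 3, 2, 9, 1, 1, 4, 2,
        6, 8, 2, 1, 6, 4, 6, 4, 2, 2, 1, 2, 1
    };

    static final ToLongFunction<Entry<Integer, Long>> blank =
        entry ->
            Long.max(
                0L,
                entry.getValue() -
                    scrabbleAvailableLetters[entry.getKey() - 'a']
            );

    public static void main(String[] args) {
        // 'd' twice with 1 available: one occurrence is in excess
        System.out.println(blank.applyAsLong(Map.entry((int) 'd', 2L)));
        // 'e' twice with 12 available: nothing is in excess
        System.out.println(blank.applyAsLong(Map.entry((int) 'e', 2L)));
    }
}
```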


The next function combines the histoOfLetters with the blank function to compute the number of blanks in an entire word:

Function<String, Long> nBlanks =
    word -> histoOfLetters.apply(word)
                          .entrySet().stream()
                          .mapToLong(blank)
                          .sum();


Here the histogram of the letters in the given word is computed and returned in a Map, then each entry of this Map is streamed, mapped into the blank letter value and then summed up into a final value. (Honestly, I'm not familiar with the rules of Scrabble and these last two functions seem to be extra convolution to have the computation work harder.)

The follow-up function takes the result of nBlanks and checks if a word can be written with 2 or fewer blanks:


Predicate<String> checkBlanks = word -> nBlanks.apply(word) <= 2;


The next 2 functions pick the first 3 and last 3 letters of a word:

Function<String, IntStream> first3 = word -> word.chars().limit(3);

Function<String, IntStream> last3 = 
    word -> word.chars().skip(Integer.max(0, word.length() - 4));


These won't stay separated and are immediately combined back together:

Function<String, IntStream> toBeMaxed =
    word -> Stream.of(first3.apply(word), last3.apply(word))
                  .flatMapToInt(Function.identity());

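Practically, the first 3 and last 3 letters (with possible overlap for shorter words) are concatenated back into a single IntStream via flatMapToInt. Note that skipping word.length() - 4 characters, as written above, actually keeps the last four letters: "jezebel" will stream letter-by-letter as "jez" followed by "ebel", whereas a strict last-3 would yield "jezbel".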
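
The concatenation can be inspected by collecting the IntStream back into a String. In this sketch, the class ToBeMaxedDemo, the concat helper and the strictLast3 variant (which skips length - 3 characters) are hypothetical additions for comparison; first3 and last3 are copied from above.

```java
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class ToBeMaxedDemo {

    static final Function<String, IntStream> first3 =
        word -> word.chars().limit(3);

    // as in the benchmark: skips all but the last FOUR characters
    static final Function<String, IntStream> last3 =
        word -> word.chars().skip(Integer.max(0, word.length() - 4));

    // hypothetical variant that keeps exactly the last three characters
    static final Function<String, IntStream> strictLast3 =
        word -> word.chars().skip(Integer.max(0, word.length() - 3));

    // Concatenates head and tail like toBeMaxed and renders the result as text
    static String concat(String word, Function<String, IntStream> tail) {
        return Stream.of(first3.apply(word), tail.apply(word))
                     .flatMapToInt(Function.identity())
                     .collect(StringBuilder::new,
                              StringBuilder::appendCodePoint,
                              StringBuilder::append)
                     .toString();
    }

    public static void main(String[] args) {
        System.out.println(concat("jezebel", last3));
        System.out.println(concat("jezebel", strictLast3));
    }
}
```

For "jezebel" the benchmark's last3 contributes "ebel", so the merged stream reads "jezebel", while the strict variant gives "jezbel". Since only the maximum letter score of the merged stream is used afterwards, the difference rarely changes the result.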

Given the merged character stream, we compute the maximum score of the letters:

ToIntFunction<String> bonusForDoubleLetter =
     word -> toBeMaxed.apply(word)
                      .map(scoreOfALetter)
                      .max()
                      .orElse(0);


Note that IntStream.max() returns an OptionalInt, hence the orElse(0).

We then calculate the final score of a word:

Function<String, Integer> score3 =
    word ->
        2 * (score2.apply(word) + bonusForDoubleLetter.applyAsInt(word))
        + (word.length() == 7 ? 50 : 0);


This involves a bonus 50 points for words with length 7 and twice the score of the base word and the double letter bonus. Note that both score2 and bonusForDoubleLetter are evaluated once and multiplied by the literal two.
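
The Stream variant of score2 is not shown in this post. Judging from its RxJava counterpart further below, it presumably sums letterScore over the histogram of the word; the sketch below is written under that assumption. It uses plain static methods instead of function objects, the class name ScoreDemo is hypothetical, and both arrays are assumed values (standard English Scrabble letter scores, and availability counts chosen to agree with the 'd' and 'e' examples above).

```java
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class ScoreDemo {

    // ASSUMED: standard English Scrabble letter scores for a..z
    static final int[] letterScores = {
        1, 3, 3, 2, 1, 4, 2, 4, 1, 8, 5, 1, 3,
        1, 1, 3, 10, 1, 1, 1, 1, 4, 4, 8, 4, 10
    };

    // ASSUMED: per-letter availability for a..z
    static final int[] scrabbleAvailableLetters = {
        9, 2, 2, 1, 12, 2, 3, 2, 9, 1, 1, 4, 2,
        6, 8, 2, 1, 6, 4, 6, 4, 2, 2, 1, 2, 1
    };

    static Map<Integer, Long> histoOfLetters(String word) {
        return word.chars().boxed()
                   .collect(Collectors.groupingBy(
                       Function.identity(), Collectors.counting()));
    }

    // ASSUMED shape of score2: the sum of the bounded letter scores
    static int score2(String word) {
        return histoOfLetters(word).entrySet().stream()
            .mapToInt(e ->
                letterScores[e.getKey() - 'a'] *
                Integer.min(
                    e.getValue().intValue(),
                    scrabbleAvailableLetters[e.getKey() - 'a']))
            .sum();
    }

    static int bonusForDoubleLetter(String word) {
        return IntStream.concat(
                    word.chars().limit(3),
                    word.chars().skip(Integer.max(0, word.length() - 4)))
                .map(letter -> letterScores[letter - 'a'])
                .max()
                .orElse(0);
    }

    static int score3(String word) {
        return 2 * (score2(word) + bonusForDoubleLetter(word))
                + (word.length() == 7 ? 50 : 0);
    }

    public static void main(String[] args) {
        for (String word : new String[] { "jezebel", "quickly", "zephyrs", "equinox" }) {
            System.out.println(score3(word) + " = " + word);
        }
    }
}
```

Worked by hand for "jezebel": the letter scores sum to 8 + 3 * 1 + 10 + 3 + 1 = 25, the best letter in the merged stream is 'z' with 10, so the result is 2 * (25 + 10) + 50 = 120.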

Now we reach the actual "loop" for calculating the scores of each word in the Shakespeare word set:

Function<Function<String, Integer>, Map<Integer, List<String>>> buildHistoOnScore =
    score -> shakespeareWords.stream()
             .filter(scrabbleWords::contains)
             .filter(checkBlanks)
             .collect(
                 Collectors.groupingBy(
                     score,
                     () -> new TreeMap<Integer, List<String>>(Comparator.reverseOrder()),
                     Collectors.toList()
                 )
             );


This is an odd function because it takes another function, the score function, as input and returns a Map keyed by score whose values are the lists of words having that score. The body takes the set of shakespeareWords, streams it (the parallel version has parallelStream() here), keeps only the words that are in the allowed Scrabble word set, keeps only those that need at most two blanks, then groups the remaining words based on their computed score into a reverse-ordered TreeMap with Integer keys and List values - all with the help of standard Stream Collectors.
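
The grouping step can be tried with a toy input. In this sketch the class GroupByScoreDemo and its group method are hypothetical, the word list is made up, the blank check is left out, and the word length stands in for the real score function.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

final class GroupByScoreDemo {

    // Keeps the allowed words and groups them by score, highest score first
    static Map<Integer, List<String>> group(
            List<String> words,
            Set<String> allowed,
            Function<String, Integer> score) {
        return words.stream()
            .filter(allowed::contains)
            .collect(
                Collectors.groupingBy(
                    score,
                    () -> new TreeMap<Integer, List<String>>(Comparator.reverseOrder()),
                    Collectors.toList()
                )
            );
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> result = group(
            List.of("to", "be", "or", "not", "question"),
            Set.of("to", "be", "not", "question"),
            String::length);
        result.forEach((score, words) -> System.out.println(score + " = " + words));
    }
}
```

Because the TreeMap is created with Comparator.reverseOrder(), iterating its entries starts with the highest score, which is what makes the later limit(3) pick the top three.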

Finally, we are not interested in all words but only the top 3 scoring set of words:

List<Entry<Integer, List<String>>> finalList =
                buildHistoOnScore.apply(score3)
                    .entrySet()
                    .stream()
                    .limit(3)
                    .collect(Collectors.toList()) ;


We apply the score3 function to the "loop" and put the top 3 entries into the final list. If all went well, we should get the following entries:

120 = jezebel, quickly
118 = zephyrs
116 = equinox


The original RxJava benchmark


Even though RxJava and Java Streams have quite similar APIs and equivalent operators, the original RxJava benchmark was written with an odd set of helper components and changes to the pattern of the functions above.

The first oddity is the introduction of a functional style of counting: LongWrapper

interface LongWrapper {
    long get();

    default LongWrapper incAndSet() {
        return () -> get() + 1;
    }
}

LongWrapper zero = () -> 0;
LongWrapper one = zero.incAndSet();


(This over-functionalization is a recurring theme with Jose, see this year's JavaOne video for example.)
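
To illustrate what this style of counting does at runtime, here is a small sketch. The LongWrapper interface is copied from above; the surrounding class LongWrapperDemo and the countTo helper are hypothetical.

```java
final class LongWrapperDemo {

    interface LongWrapper {
        long get();

        default LongWrapper incAndSet() {
            // captures the current wrapper; get() here calls the enclosing instance
            return () -> get() + 1;
        }
    }

    // Counts up to n by stacking n lambdas on top of a constant zero
    static long countTo(int n) {
        LongWrapper w = () -> 0L;
        for (int i = 0; i < n; i++) {
            w = w.incAndSet();
        }
        // evaluating the result walks the whole chain of n wrappers
        return w.get();
    }

    public static void main(String[] args) {
        System.out.println(countTo(3));
    }
}
```

Each increment allocates a new object that wraps the previous one, and reading the value costs one call per increment, whereas a plain long counter would need neither.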

The second oddity is that many return types that were scalar in the Stream version above were turned into Observables - which adds unnecessary overhead and no operational benefit. So let's see what the original benchmark looks like with RxJava 1:

First, scoreOfALetter and letterScore now return a single-element Observable with the score value:
 
Func1<Integer, Observable<Integer>> scoreOfALetter = 
    letter -> Observable.just(letterScores[letter - 'a']) ;

Func1<Entry<Integer, LongWrapper>, Observable<Integer>> letterScore =
    entry ->
        Observable.just(
            letterScores[entry.getKey() - 'a'] *
            Integer.min(
                (int)entry.getValue().get(),
                scrabbleAvailableLetters[entry.getKey() - 'a']
            )
        ) ;


This has cascading effects as depending functions now have to deal with an Observable. RxJava doesn't have the direct means to stream the characters of a String so a helper indirection was introduced by reusing Stream's tools and turning that into an Iterable RxJava can understand: 

Func1<String, Observable<Integer>> toIntegerObservable =
    string -> Observable.from(
        IterableSpliterator.of(string.chars().boxed().spliterator())) ;

Building the histogram now uses the LongWrapper and RxJava's collect() operator to build the Map with it:

Func1<String, Observable<HashMap<Integer, LongWrapper>>> histoOfLetters =
    word -> toIntegerObservable.call(word)
            .collect(
                () -> new HashMap<>(),
                (HashMap<Integer, LongWrapper> map, Integer value) -> {
                    LongWrapper newValue = map.get(value) ;
                    if (newValue == null) {
                        newValue = () -> 0L ;
                    }
                    map.put(value, newValue.incAndSet()) ;
                }
             ) ;


Calculating blanks also returns an Observable instead of a scalar value:

Func1<Entry<Integer, LongWrapper>, Observable<Long>> blank =
    entry ->
        Observable.just(
            Long.max(
                0L,
                entry.getValue().get() -
                    scrabbleAvailableLetters[entry.getKey() - 'a']
            )
        ) ;

Func1<String, Observable<Long>> nBlanks =
    word -> histoOfLetters.call(word)
            .flatMap(map -> Observable.from(() -> map.entrySet().iterator()))
            .flatMap(blank)
            .reduce(Long::sum) ;

Func1<String, Observable<Boolean>> checkBlanks =
     word -> nBlanks.call(word)
                    .flatMap(l -> Observable.just(l <= 2L)) ;


Now calculating the scores:

Func1<String, Observable<Integer>> score2 =
     word -> histoOfLetters.call(word)
             .flatMap(map -> Observable.from(() -> map.entrySet().iterator()))
             .flatMap(letterScore)
             .reduce(Integer::sum) ;

Func1<String, Observable<Integer>> first3 =
     word -> Observable.from(
              IterableSpliterator.of(word.chars().boxed().limit(3).spliterator())) ;


Func1<String, Observable<Integer>> last3 =
    word -> Observable.from(
                IterableSpliterator.of(word.chars().boxed().skip(3).spliterator())) ;


Func1<String, Observable<Integer>> toBeMaxed =
    word -> Observable.just(first3.call(word), last3.call(word))
                .flatMap(observable -> observable) ;

Func1<String, Observable<Integer>> bonusForDoubleLetter =
    word -> toBeMaxed.call(word)
            .flatMap(scoreOfALetter)
            .reduce(Integer::max) ;

(Note that last3 returns the letters 4..n instead of the last 3 letters, not sure if this was intentional or not. Changing it to really return the last 3 letters has no measurable performance difference.)

Then we compute the final score per word:

Func1<String, Observable<Integer>> score3 =
    word ->
        Observable.just(
            score2.call(word),
            score2.call(word),
            bonusForDoubleLetter.call(word),
            bonusForDoubleLetter.call(word),
            Observable.just(word.length() == 7 ? 50 : 0)
        )
        .flatMap(observable -> observable)
        .reduce(Integer::sum) ;

Remember the "times 2" from the Stream benchmark? Here, both scores are streamed again instead of multiplying their result by 2 (via map). This inconsistency with the original Stream version alone is responsible for ~30% overhead in the original RxJava 1 benchmark. For comparison, when the same double-streaming is applied to the original Stream benchmark, its measured sample time goes from 27 ms/op up to 39 ms/op.

Lastly, the processing of the entire Shakespeare word set and picking the top 3:

Func1<Func1<String, Observable<Integer>>, Observable<TreeMap<Integer, List<String>>>>
buildHistoOnScore =
     score -> Observable.from(() -> shakespeareWords.iterator())
              .filter(scrabbleWords::contains)
              .filter(word -> checkBlanks.call(word).toBlocking().first())
              .collect(
                  () -> new TreeMap<Integer, List<String>>(Comparator.reverseOrder()),
                  (TreeMap<Integer, List<String>> map, String word) -> {
                      Integer key = score.call(word).toBlocking().first() ;
                      List<String> list = map.get(key) ;
                      if (list == null) {
                          list = new ArrayList<>() ;
                          map.put(key, list) ;
                      }
                      list.add(word) ;
                  }
               ) ;

List<Entry<Integer, List<String>>> finalList2 =
    buildHistoOnScore.call(score3)
    .flatMap(map -> Observable.from(() -> map.entrySet().iterator()))
    .take(3)
    .collect(
        () -> new ArrayList<Entry<Integer, List<String>>>(),
        (list, entry) -> {
            list.add(entry) ;
        }
    )
    .toBlocking()
    .first() ;


Here, we need to go blocking to get the first (and only) value of checkBlanks as well as the only List of results holding the top 3 scores and words. The collector function taking the TreeMap and the current word had to be explicitly typed because, for some reason, Eclipse can't properly infer the types in that expression.

The optimized version

The mistakes and drawbacks of the original RxJava version have been identified over a long period of time and the optimized benchmark is still "under optimization". Using the right operator for the right job is essential in synchronous processing: some operators are better suited for this type of work, while others carry more overhead due to their need to support an asynchronous operation mode. The other important thing is to know when to use a reactive type and when to stick to a scalar value.

As mentioned above, the original RxJava benchmark had a bunch of functions return an Observable with a scalar value for no apparent benefit. Changing these back to scalar functions, just like in the Stream version, helps avoid unnecessary indirection and allocation:


Func1<Integer, Integer> scoreOfALetter = letter -> letterScores[letter - 'a'];


Streaming the characters of a word is the hottest operation and is executed several tens of thousands of times. Instead of the Stream-Spliterator indirection, one can simply index-map a string into its characters:

word -> Observable.range(0, word.length()).map(i -> (int)word.charAt(i));


Instead of the convoluted LongWrapper and its lambda-capture overhead, we can define a simple mutable container for the histogram:

public final class MutableLong {

    public long value;

    public void incAndSet() {
        value++;
    }
}

Func1<String, Observable<HashMap<Integer, MutableLong>>> histoOfLetters =
     word -> toIntegerObservable.call(word)
             .collect(
                 () -> new HashMap<>(),
                 (HashMap<Integer, MutableLong> map, Integer value) -> {
                     MutableLong newValue = map.get(value) ;
                     if (newValue == null) {
                         newValue = new MutableLong();
                         map.put(value, newValue);
                     }
                     newValue.incAndSet();
                 }

              ) ;


The next optimization is the use of flatMapIterable: there is no need to get the iterator of an entrySet(); one can just iterate it directly since it already implements Iterable:

Func1<String, Observable<Long>> nBlanks =
    word -> MathObservable.sumLong(
                histoOfLetters.call(word)
                .flatMapIterable(map -> map.entrySet())
                .map(blank)
            ) ;


In addition, reduce() has some overhead because of constant boxing and unboxing of a sum or max value of the stream and can be replaced by a dedicated operator from the RxJavaMath library: MathObservable.sumLong().
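The boxing cost can be illustrated with a plain Java Stream analog (this is not the RxJavaMath implementation, only a sketch of the same idea; the class and method names are made up). The boxed reduction unboxes and re-boxes each partial sum, while the primitive variant converts each element once and sums in a long:

```java
import java.util.Arrays;
import java.util.List;

final class SumSketch {

    // Boxed reduction: each step unboxes both operands and boxes the partial sum again.
    static long boxedSum(List<Long> values) {
        return values.stream().reduce(0L, Long::sum);
    }

    // Primitive reduction: each element is unboxed once and accumulated as a long.
    static long primitiveSum(List<Long> values) {
        return values.stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        List<Long> blanks = Arrays.asList(1L, 0L, 2L);
        System.out.println(boxedSum(blanks) == primitiveSum(blanks));
    }
}
```

Both methods return the same value; the difference is only in how many intermediate Long objects may be created along the way.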

In synchronous scenarios, concat works better than merge/flatMap most of the time:

Func1<String, Observable<Integer>> toBeMaxed =
    word -> Observable.concat(first3.call(word), last3.call(word));

Func1<String, Observable<Integer>> score3 =
    word ->
        MathObservable.sumInteger(
            Observable.concat(
                score2.call(word).map(v -> v * 2),
                bonusForDoubleLetter.call(word).map(v -> v * 2),
                Observable.just(word.length() == 7 ? 50 : 0)
            )
        );


Note the use of map(v -> v * 2) to multiply the two score components instead of streaming them again.
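As a sanity check that the two formulations are equivalent, here is a small sketch with Java's IntStream standing in for the reactive types (the class and method names are illustrative only). Streaming the scores twice and summing gives the same total as summing once and doubling, but the first variant traverses the source twice:

```java
import java.util.stream.IntStream;

final class DoubleScoreSketch {

    // Streams the same scores twice, like the original score3 did.
    static int sumTwice(int[] scores) {
        return IntStream.concat(IntStream.of(scores), IntStream.of(scores)).sum();
    }

    // Streams the scores once and doubles the result, like map(v -> v * 2).
    static int sumThenDouble(int[] scores) {
        return IntStream.of(scores).sum() * 2;
    }

    public static void main(String[] args) {
        int[] scores = { 1, 4, 10 };
        System.out.println(sumTwice(scores) == sumThenDouble(scores));
    }
}
```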

In addition, there were several internal optimizations to RxJava to improve performance with this type of usage: concat(o1, o2, ...) received a dedicated operator instead of delegating to the Observable of Observables overload. The toBlocking().first() overhead has been improved as well. Currently, the optimized benchmark with RxJava 1.2.4 runs under 67 ms/op, the "Beta" benchmark runs under 100 ms/op and the original benchmark runs under 170 ms/op.

Benchmarking other libraries


Following similar patterns, other streaming libraries (synchronous and asynchronous) were benchmarked over the year. The following subsections summarize what it takes to have them do the Scrabble computation with the functional structures above, how they perform and why they run at the speed they do.

https://twitter.com/akarnokd/status/808995627237601280


Kotlin


Kotlin has its own, rich synchronous streaming standard library and performs quite well in the optimized benchmark: 20 ms/op. It requires a separate project because it is a completely separate JVM language, which works best under IntelliJ. I'm not deeply familiar with Kotlin, thus I'm not sure what makes it that much faster than Stream (or IxJava).

The streaming part of the language is certainly well optimized, but it is also possible that using HashMap with primitive types gets a custom implementation. The streaming standard library is very rich and the whole Scrabble logic could be expressed without building new operators or invoking external libraries.

IxJava

IxJava, short for Iterable eXtensions for Java, started out as a companion library to Reactive4Java, the first black-box re-implementation of the Rx.NET library on the JVM (2011). Since then, it has been rewritten from scratch based on the advanced ideas of RxJava 2. The optimized benchmark runs around 23 ms/op, 3 ms faster than the Stream version. Currently, this is the fastest Java library to do the Scrabble benchmark and has all the operators built in for the task. It features less allocation and less indirection, plus there are optimizations for certain shapes of inputs (constant-scalar, single-element sources).

RxJava 2

RxJava is the de-facto standard reactive library for Java 6+ and version 2 supports the Reactive-Streams initiative with its Flowable type. Version 2 was rewritten from scratch in late 2015 and was then drastically re-architected in mid 2016. The late 2015 version performed poorly with the Scrabble benchmark but was still 2 times faster than RxJava 1 at the time. Since the dataflow types in RxJava have to anticipate asynchronous and/or backpressured usage with largely the same code path, they have a noticeable overhead when using them in a pure synchronous manner.

Therefore, the Scrabble benchmark is implemented for the backpressure-enabled Flowable and the backpressure-lacking (but otherwise similarly advanced) Observable types. They perform 27.75 ms/op and 26.83 ms/op respectively. Unfortunately, the main RxJava library lacks dedicated operators for tasks such as streaming the characters of a String and summing up a stream of numbers, so these were implemented in the RxJava 2 Extensions companion library. The additional performance improvement over RxJava 1.x comes from the much leaner architecture with fewer indirections, fewer allocations, a dedicated concat(source1, source2, ...) operator, a very low overhead blockingFirst() and generally the operator fusion many operators participate in. In the late release-candidate phase, it was decided that certain operators return Single, Completable or Maybe instead of their own type. The change did not affect the benchmark result in any measurable way (but the code had to change to work with the new types of course).

In addition, the extension library features a ParallelFlowable type that allows parallel computations over a regular Flowable sequence, somewhat similar to parallel Streams. The parallelization happens over the set of Shakespeare words and requires a manual reduction back to a sequential reactive type:


Function<Function<String, Flowable<Integer>>, Flowable<TreeMap<Integer, List<String>>>>
buildHistoOnScore =
    score ->
        ParallelFlowable.from(Flowable.fromIterable(shakespeareWords))
        .runOn(scheduler)
        .filter(scrabbleWords::contains)
        .filter(word -> checkBlanks.apply(word).blockingFirst())
        .collect(
            () -> new TreeMap<Integer, List<String>>(Comparator.reverseOrder()),
            (TreeMap<Integer, List<String>> map, String word) -> {
                Integer key = score.apply(word).blockingFirst();
                List<String> list = map.get(key) ;
                if (list == null) {
                    list = new ArrayList<>() ;
                    map.put(key, list) ;
                }
                list.add(word) ;
            }
        )
        .reduce((m1, m2) -> {
            for (Map.Entry<Integer, List<String>> e : m2.entrySet()) {
                 List<String> list = m1.get(e.getKey());
                 if (list == null) {
                     m1.put(e.getKey(), e.getValue());
                 } else {
                     list.addAll(e.getValue());
                 }
            }
            return m1;
        });

The parallel version measures 7.23 ms/op compared to the Java parallel Streams version with 6.71 ms/op.
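The manual reduction step above can be isolated and tried without any reactive library. The following is a sketch only, with made-up class and method names; it uses Map.merge in place of the explicit null check but combines two per-rail histograms the same way, by adopting missing keys and concatenating the word lists of shared keys:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class MergeSketch {

    // Folds m2 into m1 and returns m1; the lists in m1 must be mutable.
    static TreeMap<Integer, List<String>> merge(
            TreeMap<Integer, List<String>> m1,
            TreeMap<Integer, List<String>> m2) {
        for (Map.Entry<Integer, List<String>> e : m2.entrySet()) {
            // absent key: the list from m2 is adopted as is; present key: lists are concatenated
            m1.merge(e.getKey(), e.getValue(), (a, b) -> { a.addAll(b); return a; });
        }
        return m1;
    }

    public static void main(String[] args) {
        TreeMap<Integer, List<String>> left = new TreeMap<>(Comparator.reverseOrder());
        left.put(10, new ArrayList<>(Arrays.asList("alpha")));
        TreeMap<Integer, List<String>> right = new TreeMap<>(Comparator.reverseOrder());
        right.put(10, new ArrayList<>(Arrays.asList("beta")));
        right.put(12, new ArrayList<>(Arrays.asList("gamma")));
        System.out.println(merge(left, right));
    }
}
```

Because the reducer mutates and returns its first argument, it is only safe when each intermediate map is owned by a single reduction step, which is the case when every parallel rail collects into its own TreeMap.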

Reactor 3

Pivotal's Reactor-Core library is practically RxJava 2 under a different company banner, with implementation differences due to being Java 8+. It was originally contributed by me and, as of today, the relevant components of Reactor 3 required by the Scrabble benchmark still use my algorithms. The few implementation differences come from the use of atomic field updaters instead of atomic classes (such as AtomicInteger), which reduces the allocation amount even further. Unfortunately, even though the same field updaters are available for Java 6 and Android, certain devices don't play nicely with the underlying reflection mechanics. The optimized benchmark code uses a custom implementation for streaming the characters of a word and finding the sum/max of a sequence.

Given this difference, Reactor measures 27.39 ms/op, putting it between RxJava 2's Observable and Flowable, somewhat expectedly.

Reactor 3 has direct support for converting to its parallel type, ParallelFlux, which is also practically the same as RxJava 2 Extensions' ParallelFlowable. The ParallelFlux benchmark clocks in at 8.53 ms/op; however, that roughly 1 ms difference to RxJava 2 is certainly odd and it is unclear where it comes from.

Guava

Google Guava is a library with lots of features, among other things offering a sub-library with fluent-API support via FluentIterable. It has a limited set of streaming operators and the implementation has some unnecessary overhead in it. The design reminds me of RxJava 1's Observable where there is a mandatory indirection to an inner type.

Given the limited API, the optimized benchmark code uses custom operators such as streaming the characters of a word, custom sum/max and custom collect() operators, all written with the vocabulary of FluentIterable by me. Therefore, the measured 35.98 ms/op is not entirely the achievement of the library authors.

Interestingly, the backpressure-enabled, async capable Flowable/Flux outperforms the sync-only and thus theoretically lower overhead FluentIterable.


Ix.NET


When Rx.NET was developed several years ago, its authors also implemented its dual, the Interactive eXtensions, building a rich API over their standard, synchronous streaming IEnumerable interface.

Ix.NET is well optimized, most likely due to the nice language features (yield return) and great compiler (state machine building around yield return). Even though .NET supports "primitive specialization", their JIT compiler is not a runtime optimizing compiler and this is likely why the ported Scrabble benchmark measures only 45.4 ms/op.

Unfortunately, there were some missing operators from Ix.NET I had to write manually, such as the now-typically needed streaming of characters and the reduce() operator to support sum/max. (There is no need for custom sum because of the primitive specialization of reduce() provided automatically.)


Reactor.NET

About a year ago, there was a non-zero chance I had to learn and include C# development in my professional line of work. Unfortunately, Rx.NET was and still is an old library with a significant performance overhead due to its synchronous ties, namely returning an IDisposable from Subscribe() instead of injecting it via an OnSubscribe() like all the other 3rd generation (inspired) libraries do. When 3.x didn't change the architecture, I decided instead of battling them over advancing, I could just roll my own library. Since in early 2016 I was involved with Pivotal's Reactor and its third-the-size API surface, I started working on Reactor-Core.NET with all the 4th generation goodies RxJava 2 and Reactor now feature. Unfortunately, the risk of me doing C# faded and I took over leading RxJava, sending this project into sleep.

Regardless, enough operators were implemented already, so the Scrabble benchmark for it is available and measures 80.51 ms/op. It may be partly due to the .NET platform and also due to a less-than-optimal implementation for streaming characters.


JOOLambda

Back in Java land, this library is part of the JOOx family of extension libraries, which support JVM operations ranging from JDBC-based database interactions to extending the standard Java Stream with features. This, unfortunately, means wrapping a Stream or their Seq type and thus adding a level of indirection. This wouldn't be much of a problem but the API lacks operators that stay in the Seq type for tasks such as collect or sum/max. Therefore, these operators had to be emulated with other operators. A second unfortunate property of JOOLambda is the difficulty of extending it (even non-fluently). I couldn't find any way of implementing my own operator directly (as with the Rx-style and Ix-style APIs) and the closest thing wanted me to implement 70+ standard Stream operators again.

I believe it is still interesting to show how a convenient collect() operator can be implemented if there is no reduce() or even scan() to help us:


Function<String, Seq<HashMap<Integer, MutableLong>>> histoOfLetters =
    word -> {
        HashMap<Integer, MutableLong> map = new HashMap<>();
        return charSeq.apply(word)
               .map(value -> {
                    MutableLong newValue = map.get(value) ;
                    if (newValue == null) {
                        newValue = new MutableLong();
                        map.put(value, newValue);
                    }
                    newValue.incAndSet();
                    return map;
               })
               .skip(Long.MAX_VALUE)
               .append(map);
        };

First, the resulting HashMap is instantiated, knowing that this function will be invoked sequentially, non-recursively thus there won't be any clash between computations of different words. Second, we stream the characters of the word, map each character into the histogram inside the map. We need only a single element of the HashMap but there is no takeLast() operator to ignore all but the very last time the map is forwarded. Instead, we skip all elements and concatenate the single HashMap again to the now empty Seq.
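The same skip-everything-then-append trick can be tried with nothing but java.util.stream, which may help to see why it works. This is a sketch with made-up names, using Stream.concat as a stand-in for Seq's append and a Long counter in place of MutableLong:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class SkipAppendSketch {

    static Stream<Map<Integer, Long>> histogram(String word) {
        // created when the function is called, not when the stream is consumed
        Map<Integer, Long> map = new HashMap<>();

        Stream<Map<Integer, Long>> perLetter = word.chars().boxed()
                .map(c -> { map.merge(c, 1L, Long::sum); return map; }) // side effect fills the map
                .skip(Long.MAX_VALUE);                                  // drops every emitted map

        // the first part is empty by now but has run its side effects; emit the map once
        return Stream.concat(perLetter, Stream.of(map));
    }

    public static void main(String[] args) {
        System.out.println(histogram("hello").collect(Collectors.toList()));
    }
}
```

The emulation relies on the mapping step still running for every letter even though skip discards all of its outputs, and on the appended element being consumed only after the first part has finished.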

Summing up values is no less convoluted with JOOL:


Function<String, Seq<Integer>> score2 =
    word -> {
        int[] sum = { 0 };
        return histoOfLetters.apply(word)
               .flatMap(map -> Seq.seq(map.entrySet()))
               .map(letterScore)
               .map(v -> sum[0] += v)
               .skip(Long.MAX_VALUE)
               .append(0)
               .map(v -> sum[0]);
    };

We set up a single-element array to be the accumulator for the summing, stream the histogram and sum up the letter scores into this array. We then skip all of it and concatenate 0, followed by mapping (this zero) to the contents of the sum array. Note that append(sum[0]) would be evaluated at assembly time (before the sum actually happens), yielding the initial zero every time.
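The assembly-time pitfall is easy to reproduce with plain Java Streams. In this sketch (names are illustrative, Stream.concat stands in for append), eagerAppend reads sum[0] while the pipeline is being built, whereas lazyAppend defers the read to a map step that runs only after the summing part has been consumed:

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class AssemblyTimeSketch {

    // sum[0] is read while building the pipeline, so the appended value is the initial 0.
    static int eagerAppend(int... values) {
        int[] sum = { 0 };
        return Stream.concat(
                    IntStream.of(values).boxed().map(v -> sum[0] += v).skip(Long.MAX_VALUE),
                    Stream.of(sum[0]))
                .collect(Collectors.toList())
                .get(0);
    }

    // The appended 0 is mapped to sum[0] only when it is consumed, after the summing ran.
    static int lazyAppend(int... values) {
        int[] sum = { 0 };
        return Stream.concat(
                    IntStream.of(values).boxed().map(v -> sum[0] += v).skip(Long.MAX_VALUE),
                    Stream.of(0).map(v -> sum[0]))
                .collect(Collectors.toList())
                .get(0);
    }

    public static void main(String[] args) {
        System.out.println(eagerAppend(1, 2, 3)); // prints 0
        System.out.println(lazyAppend(1, 2, 3));  // prints 6
    }
}
```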

The code measures 86-92 ms/op; however, this might not be that bad because while writing this post, I noticed a missing optimization that adds unnecessary burden to a core computation - my bad. No worries, I'll remeasure everything again next year since some libraries have since updated their code.


Cyclops-React

This is an odd library, developed mainly by one person. Looking at the GitHub site, I'm sure it used to say Reactive-Streams in the title. I came across this library a month or so back when the author published an extensive post about its benefits: extending Java Stream with missing features and reactive concepts. When I see "library" and "Reactive-Streams" I jump - writing a reactive library is a very difficult task. It turns out the library's mention of "Reactive-Streams" was a bit misleading. It is no more reactive than IxJava, which is a completely synchronous streaming API, with the exception that there is a wrapper/converter to a Reactive-Streams Publisher. IxJava has such a converter as well, but only via various other reactive libraries: Flux.fromIterable() and Flowable.fromIterable().

That aside, it is still a kind of dataflow library and as such can be benchmarked with Scrabble. Cyclops-React builds on top of JOOLambda and my first naive implementation performed similarly to JOOLambda (to be precise, I measured Cyclops-React first, then JOOLambda to see where the poor performance might come from).

Cyclops-React at the time didn't have any collect()/reduce() operators but it has scan (called scanLeft) and takeLast (called takeRight), allowing me to build the necessary computation steps:


Function<String, ReactiveSeq<HashMap<Integer, MutableLong>>> histoOfLetters =
    word ->  toIntegerIx.apply(word)
             .scanLeft(new HashMap<Integer, MutableLong>(), (map, value) -> {
                 MutableLong newValue = map.get(value) ;
                 if (newValue == null) {
                     newValue = new MutableLong();
                     map.put(value, newValue);
                 }
                 newValue.incAndSet();
                 return map;
             })
             .takeRight(1);


From an allocation perspective, this is very similar to JOOLambda's workaround since the HashMap is instantiated when the outer function is called and not for the consumer of the aggregation like with RxJava's collect() operator. One convenience though is the takeRight(1) that picks the very last value of the map (as scan emits it every time a new source value comes up).
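The scan-then-take-last pattern can be shown with an eager, list-based stand-in for the lazy sequence (a sketch only; these helper methods are hypothetical and are not the Cyclops-React API, and emitting the seed as the first element is an assumption of this sketch). Every emitted state is the same mutable map instance, which is why only the last emission matters:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

final class ScanSketch {

    // Emits the seed and then the accumulator after each source element.
    static <T, R> List<R> scanLeft(Iterable<T> source, R seed, BiFunction<R, T, R> f) {
        List<R> out = new ArrayList<>();
        R acc = seed;
        out.add(acc);
        for (T t : source) {
            acc = f.apply(acc, t);
            out.add(acc);
        }
        return out;
    }

    // Keeps the last n elements of the list.
    static <R> List<R> takeRight(List<R> list, int n) {
        return new ArrayList<>(list.subList(Math.max(0, list.size() - n), list.size()));
    }

    static Map<Character, Integer> histogram(String word) {
        List<Character> letters = new ArrayList<>();
        for (char c : word.toCharArray()) {
            letters.add(c);
        }
        List<Map<Character, Integer>> states =
            ScanSketch.<Character, Map<Character, Integer>>scanLeft(
                letters,
                new HashMap<>(), // shared, mutable accumulator
                (map, c) -> { map.merge(c, 1, Integer::sum); return map; });
        return takeRight(states, 1).get(0);
    }

    public static void main(String[] args) {
        System.out.println(histogram("hello"));
    }
}
```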

The first benchmarks with version 1.0.3 yielded 108 ms/op. The diagram at the beginning of this section lists it twice. The author of Cyclops-React and I tried to work out a better optimization, but due to our different understanding of what the Scrabble benchmark represents, we didn't come to an agreement on the proper optimization (he practically wanted to remove ReactiveSeq, the base type of the library, and basically benchmark Java Stream again; I wanted to measure the overhead of ReactiveSeq itself).

Since then, version 1.0.5 has been released with library optimizations and my code runs under 54 ms/op while having the same structure as before. The author has also run a few Scrabble benchmarks of his own that show lower overhead, comparable to Stream now. If he achieved it by honoring the structure, that's fantastic. If he practically skipped his own type as the workhorse, that's bad.


Rx.NET

The first, modern reactive library was designed and developed more than 8 years ago at Microsoft. Since then, Rx.NET has become open source, had 3 major releases, and helps (drives?) famous technologies such as Cortana.

It's a bit sad it couldn't evolve beyond its 1st generation reactive architecture. First, it has heavily invested developers who are quite comfortable with how it is implemented, second, the .NET platform has absorbed its base interface types, IObservable and IObserver, that have the unfortunate design of requiring a synchronous IDisposable to be returned. Luckily, the 4th generation architecture works on the .NET platform and the community driven Reactive-Streams.NET initiative may give some hope there as well.

This unfortunate design remnant is visible in the Scrabble benchmark: 413 ms/op. The main overhead comes from the trampolining that range() and the enumerable-to-Observable conversion perform. This trampolining is necessary to solve the synchronous cancellation problem RxJava solved by having a stateful consumer with a flag and callback mechanism indicating cancellation (which led to the Subscription injection method in Reactive-Streams).

Interestingly, I've implemented a minimalist, non-backpressured type Ox, similar to RxJava 2's Observable type, and it measures 45 ms/op, practically on par with the Ix.NET benchmark.


Swave

Perhaps this library is the youngest of the "reactive" libraries. Its implementation resembles that of Akka-Stream with the graph-like internal workings, but it is not a native Reactive-Streams library. It has conversion from and to Publisher but the steps themselves aren't Publishers. This adds interoperation overhead. In addition, the library is part of the Yoda-family of reactive libraries; there is no retry. (Maybe because for retry to work, one needs to hold onto the chain that establishes the flow and allow resubscribing without the need for manually reassembling the entire flow.) The library is written entirely in Scala and I gave up on trying to call it from a Java project, hence a separate project for it.

The library itself appears to be developed by a single person and the documentation is lacking a bit at the moment - not that I can't find operators on my own, but a few times it was unclear whether I was fighting with the Scala compiler (through IntelliJ) or with this library (you know, when IntelliJ says all is okay but then the build fails with a compilation error due to implicits). The library, version 0.5 at least, didn't have collect, reduce, sum or max, but it does have takeLast and the emulations mentioned before work.

Nonetheless, I managed to port the benchmark to Scala and run it, getting a surprising 781 ms/op. Since I can't read Scala code, I can only speculate this comes from the graph-architecture overhead and/or some mandatory asynchronous-ness implicitly present.


Akka-Stream

I've read so much goodness about Akka-Stream, about the technologies and frameworks it supports, its advanced and high performance optimizations over the flow-graph, the vibrant community and developer base around it, the spearheading of the Reactive-Streams initiative itself, yet it constantly fails to deliver for me. In addition, I've recently found out Akka-Stream is just inspired by Reactive-Streams and the reason they provide a converter/wrapper to a Publisher instead of implementing it at every step is that working with Reactive-Streams' deferred nature is too hard. Also, I couldn't find any means for retrying an Akka-Stream Source so it could be yet another Yoda-library (so how does it support resilience then?).

At least Akka-Stream has a Java DSL so I could implement the Scrabble benchmark within the familiar Java context. The DSL doesn't have collect but supports reduce (thus sum and max require minimal work). Therefore, the collect operations were implemented with the same map+drop(Long.MAX_VALUE)+concat(map) emulation.

The benchmark results are "mind-blasting": 5563 ms/op, that is, it takes about 5.5 seconds to compute the Scrabble answer once. Since Akka-Stream is originally written in Scala, I don't know for sure the source of this overhead but I have a few ideas: the graph-overhead, the mandatory asynchronous nature and perhaps the "fusion optimization" they employ that wastes time trying to optimize a graph that can't be further optimized.

This problem seems to hit any use case that has flatMap in it - one of the most common operators involved in Microservices composition. Of course, one can blame the synchronous nature of the Scrabble use case, which is not the target for Akka-Stream; however, its interoperation capabilities through the Reactive-Streams Publisher show some serious trouble (ops/s, larger is better):


Here, the task is to deliver 1M elements (part of an Integer array) from one thread to another where the work is divided between Akka-Stream and RxJava 2: one delivers count number of 1M/count items, and the other flattens the latter sub-section back to a single stream at the other side. Surprisingly, using Rx as the driver or middle worker improves throughput significantly (but not always). This benchmark stresses mostly the optimizer of Akka-Stream. Do people flatMap with Akka-Stream at all and nobody noticed this?

Conclusion

Writing a reactive library is hard; writing a benchmark to measure those libraries is at best non-trivial. Figuring out why some of them are extremely fast while others are extremely slow requires mastery in both synchronous and asynchronous design and development.

Instead of getting mad at the Scrabble benchmark a year ago, I invested time and effort into improving and optimizing the libraries that I could affect and, thanks to it, those libraries are now considerably better at this benchmark and in general use due to the deep architectural and conceptual improvements.

I must warn the reader against interpreting the results of the Scrabble benchmarks as the ultimate ranking of the libraries. The fact that libraries perform as they do in this particular benchmark doesn't mean they perform the same in other situations with other types of tasks. The computation and/or IO overhead may hide the subtle differences in those cases, evening the field between them in the end.

Monday, May 2, 2016

Async Iterable/Enumerable vs. Reactive-Streams

Introduction


Backpressure is essential if one wants to avoid buffer bloat and excessive memory usage when two stages in a reactive pipeline consume events at different speeds. RxJava and Reactive-Streams developed a non-blocking, request-coordinating protocol to solve this problem, but you may have heard there are alternatives to it. One alternative that comes up from time to time is Async Iterables (Java terminology) or Async Enumerables (C# terminology).

In fact, Rx.NET has an Ix.NET (stands for Interactive Extensions) sub-project in which there is the Async Enumerables library. It solves this backpressure problem by having a Task (~ CompletableFuture, ~ Promise) returned from its MoveNext() (~ hasNext()) method and when that Task fires, you can consume the Current property (~ next() method). The backpressure behavior comes from the fact that you'd call MoveNext() again only after you processed the current element.

Unfortunately, I haven't found a Java implementation of IAsyncEnumerable (I haven't really looked beyond a few Google searches), so I decided to implement it on my own in Java 8, see what it takes to get data across with it and how performant it is compared to my current cutting-edge understanding of reactive flows: the Reactive-Streams-Commons library.


Base API


Since Async Enumerables are designed with deferred execution in mind, the base API consists of two interfaces:

interface IAsyncEnumerable<T> {
    IAsyncEnumerator<T> enumerator();
}

interface IAsyncEnumerator<T> {

    CompletionStage<Boolean> moveNext(CompositeSubscription cancel);

    T current();
}

The IAsyncEnumerable is the equivalent of Iterable and it hands out IAsyncEnumerators. IAsyncEnumerator has a moveNext method which returns a CompletionStage indicating if there is value available via current() (signals true) or the sequence ended (signals false). C# CancellationToken looks like our CompositeSubscription so I'm reusing it as the way for cancellation.

(Sidenote: I'm not sure how cancellation composes yet, the original Ix.NET IAsyncEnumerator is an IDisposable plus their Task can also be disposed, unlike CompletionStage. Luckily, I don't need this feature too extensively in this post.)
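To get a feel for the shape of this API, here is a minimal, self-contained sketch of a range source. It is an illustration under simplifying assumptions, not the implementation discussed in this post: the interfaces are renamed stand-ins that omit the CompositeSubscription parameter (so there is no cancellation), every stage is already completed, and the sum helper blocks with join() purely for demonstration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class AsyncRangeSketch {

    // Simplified stand-ins for IAsyncEnumerator/IAsyncEnumerable without a cancellation token.
    interface AsyncEnumerator<T> {
        CompletionStage<Boolean> moveNext();
        T current();
    }

    interface AsyncEnumerable<T> {
        AsyncEnumerator<T> enumerator();
    }

    // Deferred: each call to enumerator() starts a fresh, independent iteration.
    static AsyncEnumerable<Integer> range(int start, int count) {
        return () -> new AsyncEnumerator<Integer>() {
            int next = start;
            final int end = start + count;
            Integer current;

            @Override
            public CompletionStage<Boolean> moveNext() {
                if (next == end) {
                    return CompletableFuture.completedFuture(false); // sequence ended
                }
                current = next++;
                return CompletableFuture.completedFuture(true);      // value is ready
            }

            @Override
            public Integer current() {
                return current;
            }
        };
    }

    // Blocking consumption, for demonstration only.
    static int sum(AsyncEnumerable<Integer> source) {
        AsyncEnumerator<Integer> en = source.enumerator();
        int total = 0;
        while (en.moveNext().toCompletableFuture().join()) {
            total += en.current();
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(sum(range(1, 5))); // prints 15
    }
}
```

The consumer asks for the next element only after it has handled the current one, which is where the backpressure behavior comes from.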


Consuming an IAsyncEnumerable


Consuming such an IAsyncEnumerator is straightforward, although not as easy without C# async/await. If we are only interested in exactly one value, we can write:


IAsyncEnumerable<T> source = ...

IAsyncEnumerator<T> enumerator = source.enumerator();

enumerator.moveNext(new CompositeSubscription())
.whenComplete((b, e) -> {
    if (e != null) {
        e.printStackTrace();
    } else
    if (b) {
        System.out.println(enumerator.current());
    } else {
        System.out.println("Empty!");
    }
});

Of course, given the CompletionStage API, you are free to process the single result as you see fit.

Consuming more than one value from an IAsyncEnumerator is more involved. You have to recursively call moveNext until it errors or completes:


public <T> void consumeAll(IAsyncEnumerator<T> enumerator, CompositeSubscription csub) {
    if (csub == null) {
        csub = new CompositeSubscription();
    }

    CompositeSubscription fcsub = csub;

    enumerator.moveNext(fcsub)
    .whenComplete((b, e) -> {
        if (e != null) {
            e.printStackTrace();
        } else
        if (b) {
            System.out.println(enumerator.current());

            // go recursive            
            consumeAll(enumerator, fcsub);

        } else {
            System.out.println("Empty!");
        }
    });    
}

Unfortunately, there is a slight problem: if the CompletionStage is a synchronous stage, you may end up with a StackOverflowError because of the recursive call to consumeAll. Therefore, to be sure, we have to trampoline the call to consumeAll to ensure the stack depth doesn't grow too large:


public final class AsyncConsumer<T> implements Subscription {

    final Consumer<? super T> onNext;

    final Consumer<Throwable> onError;

    final Runnable onComplete;

    final IAsyncEnumerator<T> enumerator;

    final AtomicInteger wip;

    final Queue<CompletionStage<Boolean>> queue;

    final CompositeSubscription csub;

    final CountDownLatch cdl;

    public AsyncConsumer(
         IAsyncEnumerator<T> enumerator,
         Consumer<? super T> onNext,
         Consumer<Throwable> onError,
         Runnable onComplete
    ) {
         this.enumerator = enumerator;
         this.onNext = onNext;
         this.onError = onError;
         this.onComplete = onComplete;
         this.wip = new AtomicInteger();
         this.queue = new SpscLinkedArrayQueue<>(16);
         this.csub = new CompositeSubscription();
         this.cdl = new CountDownLatch(1);
    }

    public void consumeAll() {
         if (csub.isUnsubscribed()) {
             cdl.countDown();
             return;
         }
         CompletionStage<Boolean> stage = enumerator.moveNext(csub);
         queue.offer(stage);
         if (wip.getAndIncrement() == 0) {
             do {
                 stage = queue.poll();
                 stage.whenComplete((b, e) -> {
                     if (csub.isUnsubscribed()) {
                         cdl.countDown();
                         return;
                     } else
                     if (e != null) {
                         onError.accept(e);
                         cdl.countDown();
                     } else
                     if (b) {
                         onNext.accept(enumerator.current());
                         consumeAll();
                     } else {
                         onComplete.run();
                         cdl.countDown();
                     }
                 });
             } while (wip.decrementAndGet() != 0);
         }
    }

    public void await() throws InterruptedException {
        cdl.await();
    }

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return cdl.await(timeout, unit);
    }

    @Override
    public void unsubscribe() {
         csub.unsubscribe();
    }

    @Override
    public boolean isUnsubscribed() {
         return csub.isUnsubscribed();
    }
}

Looks intriguing. Apart from having the callbacks for the signal types and the enumerator instance, we need the work-in-progress wip counter and a queue just like with our typical queue-drain approach back in reactive-streams land. For cancellation, we have the CompositeSubscription and for blocking waits, we have a CountDownLatch. The consumeAll() method moves the enumerator one element forward and the trampoline loop makes sure there is only one whenComplete() active at a time. Inside the handler, we call the appropriate functional interface and in case of a value signal, we call the consumeAll recursively. The trampolining will make sure we don't get reentrant behavior whether or not consumeAll is called synchronously or asynchronously.
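
To isolate the trampolining idea from the rest of AsyncConsumer, here is a minimal single-threaded sketch. The class TrampolineSketch and its depth counters are made up for illustration and are not part of the code above; a plain Runnable stands in for the whenComplete() callback. It shows how the wip counter turns a reentrant call into a mere enqueue, so the nesting depth stays at one no matter how many steps run:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration only.
final class TrampolineSketch {
    final AtomicInteger wip = new AtomicInteger();
    final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();

    int remaining;   // how many steps are still to be executed
    int depth;       // current nesting depth of step()
    int maxDepth;    // deepest nesting observed

    TrampolineSketch(int steps) {
        this.remaining = steps;
    }

    // Enqueue one step; only the caller that moves wip from 0 to 1 drains the queue.
    void schedule() {
        queue.offer(this::step);
        if (wip.getAndIncrement() == 0) {
            do {
                queue.poll().run();
            } while (wip.decrementAndGet() != 0);
        }
    }

    // Stand-in for the whenComplete() handler that asks for the next item.
    void step() {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        if (--remaining > 0) {
            schedule(); // reentrant: only enqueues, the outer loop runs it later
        }
        depth--;
    }

    static int maxDepthFor(int steps) {
        TrampolineSketch t = new TrampolineSketch(steps);
        t.schedule();
        return t.maxDepth;
    }
}
```

Calling TrampolineSketch.maxDepthFor(1_000_000) completes without a StackOverflowError because each step returns to the drain loop before the next one starts.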


Writing an IAsyncEnumerable source


Of course, we need some data source to work with. Perhaps the most basic and most standard source is the range() operator in all reactive libraries - range is the counted for-loop of the reactive world.


public final class AsyncRange implements IAsyncEnumerable<Integer> {
    final int start;
    final int count;

    public AsyncRange(int start, int count) {
        this.start = start;
        this.count = count;
    }

    @Override
    public IAsyncEnumerator<Integer> enumerator() {
        return new AsyncRangeEnumerator(start, count);
    }

    static final class AsyncRangeEnumerator implements IAsyncEnumerator<Integer> {
        final long end;
        long index;

        static final CompletionStage<Boolean> TRUE = 
            CompletableFuture.completedFuture(true);

        static final CompletionStage<Boolean> FALSE = 
            CompletableFuture.completedFuture(false);

        public AsyncRangeEnumerator(int start, int count) {
            // widen first so that start == Integer.MIN_VALUE doesn't overflow
            this.index = (long)start - 1;
            this.end = (long)start + count;
        }

        @Override
        public CompletionStage<Boolean> moveNext(CompositeSubscription csub) {
            long i = index + 1;
            if (i == end) {
                return FALSE;
            }
            index = i;
            return TRUE;
        }

        @Override
        public Integer current() {
            return (int)index;
        }
    }
}

The operation itself is pretty synchronous. The way I understand it, CompletableFuture is like an AsyncSubject, and because we only ever return a constant true or false stage, we can use shared, already completed instances (which should be stateless and non-interfering). Because moveNext() is called before current(), we start the index at start - 1 and increment it by one in moveNext(). If the incremented value equals end, we return the false stage. Otherwise, we update the index field and return the true stage.
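
To make the synchronous nature of such shared stages concrete, here is a small sketch; the class name CompletedStageSketch is made up for illustration. It relies on the behavior, as I understand the OpenJDK implementation, that a non-async continuation attached to an already completed CompletableFuture runs right away on the attaching thread. This is exactly the situation that made the trampoline in AsyncConsumer necessary:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical helper for illustration only.
final class CompletedStageSketch {
    // One shared, already completed stage, as in AsyncRangeEnumerator
    static final CompletionStage<Boolean> TRUE =
        CompletableFuture.completedFuture(true);

    // Attaches a continuation and reports whether it ran on the calling thread.
    static boolean runsOnCallerThread() {
        Thread caller = Thread.currentThread();
        Thread[] seen = new Thread[1];
        TRUE.whenComplete((b, e) -> seen[0] = Thread.currentThread());
        // If the callback already ran inline, seen[0] is set by now.
        return seen[0] == caller;
    }

    // The shared instance can be consumed any number of times.
    static int countDeliveries(int consumers) {
        int[] count = new int[1];
        for (int i = 0; i < consumers; i++) {
            TRUE.whenComplete((b, e) -> {
                if (e == null && b) {
                    count[0]++;
                }
            });
        }
        return count[0];
    }
}
```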


Going asynchronous


Of course, we are here for the asynchronous possibility; therefore, let's write the observeOn and subscribeOn operators. The first one makes sure the default continuations on the CompletionStage<Boolean> happen on a specific thread, whereas the second makes sure the actual call to moveNext() happens on a specific thread (so you can do blocking IO in moveNext() or in enumerator()). Plus, we are proficient in writing these operators, aren't we?

observeOn

If you have some unfamiliar operator to implement, the best advice I got from Erik Meijer's Channel 9 videos is: follow the types. We know we have an upstream source and some source of asynchrony. For simplicity, let's use Executor since CompletionStage XXXAsync methods take them verbatim.


public final class AsyncObserveOn<T> implements IAsyncEnumerable<T> {
    final IAsyncEnumerable<T> source;

    final Executor executor;

    public AsyncObserveOn(IAsyncEnumerable<T> source, Executor executor) {
        this.source = source;
        this.executor = executor;
    }

    @Override
    public IAsyncEnumerator<T> enumerator() {
        return new AsyncObserveOnEnumerator<>(source.enumerator(), executor);
    }

    // ... 
}

A very familiar pattern so far. The real work, however, happens inside AsyncObserveOnEnumerator:


    static final class AsyncObserveOnEnumerator<T> implements IAsyncEnumerator<T> {

        final IAsyncEnumerator<T> enumerator;

        final Executor executor;

        public AsyncObserveOnEnumerator(IAsyncEnumerator<T> enumerator, Executor executor) {
            this.enumerator = enumerator;
            this.executor = executor;
        }
        
        @Override
        public CompletionStage<Boolean> moveNext(CompositeSubscription csub) {
            return enumerator.moveNext(csub).thenApplyAsync(v -> v, executor);
        }

        @Override
        public T current() {
            return enumerator.current();
        }
    }

I admit I'm not too familiar with CompletionStage, so it appeared to me that thenApplyAsync is the closest thing to having the value delivery moved to a specific executor. Otherwise, this looks quite straightforward and much shorter than our Rx-style observeOn().
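
As a quick check of what thenApplyAsync does with an already completed upstream stage, the following sketch (ObserveOnSketch and the thread name "observe-on-worker" are made up for illustration) records the thread that executes the mapping function. The thread is captured inside the async function itself, because a plain, non-async continuation attached afterwards may run on the attaching thread if the stage happens to be complete by then:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper for illustration only.
final class ObserveOnSketch {
    // Returns the name of the thread that ran the thenApplyAsync function.
    static String continuationThread() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "observe-on-worker");
            t.setDaemon(true);
            return t;
        });
        try {
            return CompletableFuture.completedFuture(true)
                // the function is submitted to the executor even though
                // the upstream stage is already complete
                .thenApplyAsync(v -> Thread.currentThread().getName(), executor)
                .join();
        } finally {
            executor.shutdown();
        }
    }
}
```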

subscribeOn

Since we can't know whether an IAsyncEnumerable performs synchronous work in its moveNext() method, we need a way, via subscribeOn, to make sure moveNext() is called on some other thread:


public final class AsyncSubscribeOn<T> implements IAsyncEnumerable<T> {
    final IAsyncEnumerable<T> source;

    final Executor executor;

    public AsyncSubscribeOn(IAsyncEnumerable<T> source, Executor executor) {
        this.source = source;
        this.executor = executor;
    }

    @Override
    public IAsyncEnumerator<T> enumerator() {
        AxSubscribeOnEnumerator<T> enumerator = new AxSubscribeOnEnumerator<>(executor);
        executor.execute(() -> {
            IAsyncEnumerator<T> ae = source.enumerator();
            enumerator.setEnumerator(ae);
        });
        return enumerator;
    }

    // ... 
}

Instead of calling source.enumerator() directly, we offload the call to the executor. When the task runs, it obtains a valid IAsyncEnumerator from the source, which is now deferred from the consumer's perspective, yet we still have to return an IAsyncEnumerator ourselves right away. The difficulty is how to allow calling moveNext() when we don't have the upstream's enumerator yet. Luckily, CompletionStage will come to our rescue:


    static final class AxSubscribeOnEnumerator<T> implements IAsyncEnumerator<T> {

        final Executor executor;
        
        final CompletableFuture<IAsyncEnumerator<T>> onEnumerator;
        
        public AxSubscribeOnEnumerator(Executor executor) {
            this.executor = executor;
            this.onEnumerator = new CompletableFuture<>();
        }
        
        void setEnumerator(IAsyncEnumerator<T> enumerator) {
            onEnumerator.complete(enumerator); 
        }
        
        @Override
        public CompletionStage<Boolean> moveNext(CompositeSubscription token) {
            return onEnumerator.thenComposeAsync(ae -> ae.moveNext(token), executor);
        }

        @Override
        public T current() {
            IAsyncEnumerator<T> ae = onEnumerator.getNow(null);
            return ae != null ? ae.current() : null;
        }
        
    }

We set up an onEnumerator CompletableFuture, which will be completed via setEnumerator upon receiving the actual upstream IAsyncEnumerator. The big trick is how we use composition over this deferred onEnumerator value to call moveNext() on the upstream's enumerator, on the executor, once it becomes available. The operator thenComposeAsync is basically flatMap. The method current() needs some extra logic: we try to get the upstream's enumerator and if it's not yet available, we simply return null. Nobody should call current() before the corresponding CompletionStage has fired anyway.
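
The deferred composition can be tried out in isolation. In the sketch below, DeferredComposeSketch is a made-up name and a plain Supplier stands in for the upstream IAsyncEnumerator, so this is only an approximation of the operator above. It shows that the composed stage can be created before the source exists and only fires once the source has been supplied:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical helper for illustration only.
final class DeferredComposeSketch {
    static boolean deferredMoveNext() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for onEnumerator: the "enumerator" is just a Supplier
            // whose get() plays the role of moveNext().
            CompletableFuture<Supplier<CompletionStage<Boolean>>> onSource =
                new CompletableFuture<>();

            // "moveNext" is requested before the source is available
            CompletableFuture<Boolean> result =
                onSource.thenComposeAsync(s -> s.get(), executor);
            boolean pendingBefore = !result.isDone();

            // Stand-in for setEnumerator(): supplying the source releases the call
            onSource.complete(() -> CompletableFuture.completedFuture(true));

            return pendingBefore && result.join();
        } finally {
            executor.shutdown();
        }
    }
}
```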


Benchmark


Now that we have the three most basic operators available, let's benchmark our IAsyncEnumerable implementations against the cutting edge equivalent in Reactive-Streams-Commons. For the source code, please refer to the benchmark implementation in my repository. For convenience, I've implemented the operators above in a fluent way where the base type is Ax - Async Extensions.

Results of the throughput benchmark (bigger is better): (i7 4790, Windows 7 x64, Java 8u92)


The benchmark range is just the basic range(1, count), the rangeAsync is a range(1, count).observeOn(executor) and the rangePipeline is range(1, count).subscribeOn(executor1).observeOn(executor2). In the columns, ax is my implementation of IAsyncEnumerable, px is the Reactive-Streams-Commons (Rsc) Publisher Extensions fluent API entry point. Since Rsc uses operator fusion, rangeAsync() is run with and without operator fusion enabled (the others don't fuse in Rsc), the latter is in the pxf column.

Evaluation

Looks like Rsc outperforms the IAsyncEnumerable implementation considerably, both in synchronous and asynchronous use. Without an independent library to compare against, I can only speculate why IAsyncEnumerable has so much overhead. Naturally, my limited experience with CompletionStage could explain some of it, but I doubt that's the main reason. Since both libraries use the same single-threaded Executor in the benchmark, we can rule out the executor overhead itself.

What remains is the architectural and conceptual differences:


  • We have possibly one allocation of the CompletionStage plus a known continuation stage per value - Rsc doesn't allocate anything
  • CompletionStage is actually between hot and cold and acts like an AsyncSubject: when one attaches the continuation to it, it could be still running or already completed. Determining this and acting accordingly adds overhead - Rsc calls onNext as directly as possible
  • The longer the pipeline the more temporary CompletionStages get involved, which means allocation and individual task scheduling - Rsc exploits the emergent batching property of the streams over an async boundary.


Conclusion

I believe what we have here as IAsyncEnumerable is a corner case of the reactive-flow approach, where one has basically request(1) at each stage plus some allocation overhead, which gives it more overhead than the highly optimized flow approach.

It certainly looks simpler and implements shorter operators, but I have to ask, what's the benefit over the Reactive-Streams approach?

If somebody has tips for optimizing our IAsyncEnumerable implementation or can suggest an independent implementation, I'd be glad to benchmark and compare it and re-evaluate my position on the topic!