Introduction
Backpressure is essential if one wants to avoid buffer bloat and excessive memory usage when two stages in a reactive pipeline consume events at different speeds. RxJava and Reactive-Streams developed a non-blocking, request-coordinating protocol to solve this problem, but you may have heard there are alternatives to it. One alternative that comes up from time to time is Async Iterables (Java terminology) or Async Enumerables (C# terminology).
In fact, Rx.NET has an Ix.NET (Interactive Extensions) sub-project that contains the Async Enumerables library. It solves the backpressure problem by having MoveNext() (~ hasNext()) return a Task (~ CompletableFuture, ~ Promise); when that Task fires, you can consume the Current property (~ next() method). The backpressure behavior comes from the fact that you'd call MoveNext() again only after you processed the current element.
Unfortunately, I haven't found a Java implementation of IAsyncEnumerable (I haven't really looked beyond a few Google searches), so I decided to implement it on my own in Java 8, see what it takes to get data across with it, and see how performant it is compared to my current cutting-edge understanding of reactive flows: the Reactive-Streams-Commons library.
Base API
Since Async Enumerables are designed with deferred execution in mind, the base API consists of two interfaces:

    interface IAsyncEnumerable<T> {
        IAsyncEnumerator<T> enumerator();
    }

    interface IAsyncEnumerator<T> {
        CompletionStage<Boolean> moveNext(CompositeSubscription cancel);

        T current();
    }

The IAsyncEnumerable is the equivalent of Iterable and it hands out IAsyncEnumerators. IAsyncEnumerator has a moveNext method which returns a CompletionStage indicating whether a value is available via current() (signals true) or the sequence has ended (signals false). C#'s CancellationToken looks like our CompositeSubscription, so I'm reusing it as the means of cancellation.
(Sidenote: I'm not sure how cancellation composes yet, the original Ix.NET IAsyncEnumerator is an IDisposable plus their Task can also be disposed, unlike CompletionStage. Luckily, I don't need this feature too extensively in this post.)
Consuming an IAsyncEnumerable
Consuming such an IAsyncEnumerator is straightforward, although not as easy as it would be with C# async/await. If we are only interested in exactly one value, we can write:

    IAsyncEnumerable<T> source = ...;

    IAsyncEnumerator<T> enumerator = source.enumerator();

    enumerator.moveNext(new CompositeSubscription())
    .whenComplete((b, e) -> {
        if (e != null) {
            e.printStackTrace();
        } else if (b) {
            System.out.println(enumerator.current());
        } else {
            System.out.println("Empty!");
        }
    });
Of course, given the CompletionStage API, you are free to process the single result as you see fit.
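As one illustration of "processing the single result as you see fit", here is a minimal, self-contained sketch that maps the Boolean signal into an Optional of the first element. To keep it runnable without RxJava, it uses a hypothetical SimpleAsyncEnumerator stand-in that drops the cancellation parameter; the first() and just() helpers are likewise made up for this example and are not part of the API above.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class FirstValueSketch {

    // Hypothetical stand-in for IAsyncEnumerator; the cancellation token is omitted.
    interface SimpleAsyncEnumerator<T> {
        CompletionStage<Boolean> moveNext();
        T current();
    }

    // Turns the "has value" signal into an Optional holding the first element.
    static <T> CompletionStage<Optional<T>> first(SimpleAsyncEnumerator<T> en) {
        return en.moveNext()
                .thenApply(has -> has ? Optional.of(en.current()) : Optional.<T>empty());
    }

    // Hypothetical single-element source used only for this demonstration.
    static <T> SimpleAsyncEnumerator<T> just(T value) {
        return new SimpleAsyncEnumerator<T>() {
            boolean consumed;
            T current;

            @Override
            public CompletionStage<Boolean> moveNext() {
                if (consumed) {
                    current = null;
                    return CompletableFuture.completedFuture(false);
                }
                consumed = true;
                current = value;
                return CompletableFuture.completedFuture(true);
            }

            @Override
            public T current() {
                return current;
            }
        };
    }

    public static void main(String[] args) {
        // join() blocks until the stage completes; acceptable in a demo, not in reactive code.
        Optional<String> v = first(just("hello")).toCompletableFuture().join();
        System.out.println(v);
    }
}
```

The same pattern works with thenAccept or handle if you prefer callbacks over a blocking join().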
Consuming more than one value from an IAsyncEnumerator is more involved. You have to recursively call moveNext until it errors or completes:
    public <T> void consumeAll(IAsyncEnumerator<T> enumerator,
            CompositeSubscription csub) {
        if (csub == null) {
            csub = new CompositeSubscription();
        }
        CompositeSubscription fcsub = csub;
        // pass the same token along so cancellation reaches every moveNext call
        enumerator.moveNext(fcsub)
        .whenComplete((b, e) -> {
            if (e != null) {
                e.printStackTrace();
            } else if (b) {
                System.out.println(enumerator.current());
                // go recursive
                consumeAll(enumerator, fcsub);
            } else {
                System.out.println("Empty!");
            }
        });
    }

Unfortunately, there is a slight problem: if the CompletionStage is a synchronous stage, you may end up with a StackOverflowError because of the recursive call to consumeAll. Therefore, to be sure, we have to trampoline the call to consumeAll to ensure the stack depth doesn't grow too large:

    public final class AsyncConsumer<T> implements Subscription {
        final Consumer<? super T> onNext;
        final Consumer<Throwable> onError;
        final Runnable onComplete;
        final IAsyncEnumerator<T> enumerator;
        final AtomicInteger wip;
        final Queue<CompletionStage<Boolean>> queue;
        final CompositeSubscription csub;
        final CountDownLatch cdl;

        public AsyncConsumer(
                IAsyncEnumerator<T> enumerator,
                Consumer<? super T> onNext,
                Consumer<Throwable> onError,
                Runnable onComplete) {
            this.enumerator = enumerator;
            this.onNext = onNext;
            this.onError = onError;
            this.onComplete = onComplete;
            this.wip = new AtomicInteger();
            this.queue = new SpscLinkedArrayQueue<>(16);
            this.csub = new CompositeSubscription();
            // a single countDown() releases anyone blocked in await()
            this.cdl = new CountDownLatch(1);
        }

        public void consumeAll() {
            if (csub.isUnsubscribed()) {
                cdl.countDown();
                return;
            }
            CompletionStage<Boolean> stage = enumerator.moveNext(csub);
            queue.offer(stage);
            // only the caller that moves wip from 0 to 1 runs the drain loop
            if (wip.getAndIncrement() == 0) {
                do {
                    stage = queue.poll();
                    stage.whenComplete((b, e) -> {
                        if (csub.isUnsubscribed()) {
                            cdl.countDown();
                        } else if (e != null) {
                            onError.accept(e);
                            cdl.countDown();
                        } else if (b) {
                            onNext.accept(enumerator.current());
                            // reentrant call: only enqueues and bumps wip
                            consumeAll();
                        } else {
                            onComplete.run();
                            cdl.countDown();
                        }
                    });
                } while (wip.decrementAndGet() != 0);
            }
        }

        public void await() throws InterruptedException {
            cdl.await();
        }

        public boolean await(long timeout, TimeUnit unit)
                throws InterruptedException {
            return cdl.await(timeout, unit);
        }

        @Override
        public void unsubscribe() {
            csub.unsubscribe();
        }

        @Override
        public boolean isUnsubscribed() {
            return csub.isUnsubscribed();
        }
    }
Looks intriguing. Apart from having the callbacks for the signal types and the enumerator instance, we need the work-in-progress wip counter and a queue just like with our typical queue-drain approach back in reactive-streams land. For cancellation, we have the CompositeSubscription and for blocking waits, we have a CountDownLatch. The consumeAll() method moves the enumerator one element forward and the trampoline loop makes sure there is only one whenComplete() active at a time. Inside the handler, we call the appropriate functional interface and in case of a value signal, we call the consumeAll recursively. The trampolining will make sure we don't get reentrant behavior whether or not consumeAll is called synchronously or asynchronously.
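To see the trampolining effect in isolation, here is a simplified, self-contained sketch of the wip-counter idea. It is not the AsyncConsumer above: the queue, cancellation, and callbacks are left out, moveNext is modeled as a plain Supplier, and the TrampolineSketch name and its helper are assumptions made for this example. The source completes every stage synchronously, which is exactly the case where naive recursion would overflow the stack.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

public class TrampolineSketch {
    final Supplier<CompletionStage<Boolean>> moveNext; // stand-in for enumerator.moveNext(csub)
    final AtomicInteger wip = new AtomicInteger();
    final AtomicLong consumed = new AtomicLong();

    TrampolineSketch(Supplier<CompletionStage<Boolean>> moveNext) {
        this.moveNext = moveNext;
    }

    void drain() {
        // If a drain loop is already running, just record the request and return;
        // the running loop will see the incremented counter and iterate again.
        if (wip.getAndIncrement() != 0) {
            return;
        }
        do {
            moveNext.get().whenComplete((hasValue, error) -> {
                if (error == null && hasValue) {
                    consumed.incrementAndGet();
                    drain(); // reentrant when the stage is already complete: returns immediately
                }
            });
        } while (wip.decrementAndGet() != 0);
    }

    // Consumes 'count' synchronously completed "true" signals followed by one "false".
    static long consumeSynchronousSource(int count) {
        int[] remaining = { count };
        CompletionStage<Boolean> yes = CompletableFuture.completedFuture(true);
        CompletionStage<Boolean> no = CompletableFuture.completedFuture(false);
        TrampolineSketch t = new TrampolineSketch(() -> remaining[0]-- > 0 ? yes : no);
        t.drain();
        return t.consumed.get();
    }

    public static void main(String[] args) {
        // One million synchronous elements complete without deep recursion.
        System.out.println(consumeSynchronousSource(1_000_000));
    }
}
```

Because the reentrant drain() call only increments the counter, the stack depth stays constant regardless of how many elements the source delivers synchronously.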
Writing an IAsyncEnumerable source
Of course, we need some data source to work with. Perhaps the most basic and most standard source is the range() operator in all reactive libraries - range is the counted for-loop of the reactive world.
    public final class AsyncRange implements IAsyncEnumerable<Integer> {
        final int start;
        final int count;

        public AsyncRange(int start, int count) {
            this.start = start;
            this.count = count;
        }

        @Override
        public IAsyncEnumerator<Integer> enumerator() {
            return new AsyncRangeEnumerator(start, count);
        }

        static final class AsyncRangeEnumerator
                implements IAsyncEnumerator<Integer> {
            final long end;
            long index;

            // shared, already completed stages for the two possible answers
            static final CompletionStage<Boolean> TRUE =
                    CompletableFuture.completedFuture(true);
            static final CompletionStage<Boolean> FALSE =
                    CompletableFuture.completedFuture(false);

            public AsyncRangeEnumerator(int start, int count) {
                this.index = start - 1;
                this.end = (long)start + count;
            }

            @Override
            public CompletionStage<Boolean> moveNext(
                    CompositeSubscription csub) {
                long i = index + 1;
                if (i == end) {
                    return FALSE;
                }
                index = i;
                return TRUE;
            }

            @Override
            public Integer current() {
                // index is a long to avoid overflow near Integer.MAX_VALUE
                return (int)index;
            }
        }
    }

The operation itself is pretty synchronous. The way I understand it, CompletableFuture is like an AsyncSubject and because we only return a constant true or false stage, we can use a shared and completed instance (which should be stateless and non-interfering). Because moveNext() is called before current(), we start the index from start - 1 and increment it by one in moveNext(). If the incremented index is equal to the end, we return the false stage. If it hasn't reached the end yet, we update the index field and return the true stage.
Going asynchronous
Of course, we are here for the asynchronous possibility, therefore, let's write the observeOn and subscribeOn operators. The first one makes sure default continuations on the CompletionStage<Boolean> happen on a specific thread, whereas the second makes sure the actual call to moveNext() happens on the specific thread (so you can do blocking IO in moveNext() or in enumerator()). Plus, we are proficient in writing these operators, aren't we?
observeOn
If you have some unfamiliar operator to implement, the best advice I got from Erik Meijer's Channel 9 videos is: follow the types. We know we have an upstream source and some source of asynchrony. For simplicity, let's use Executor since the CompletionStage xxxAsync methods accept one directly.

    public final class AsyncObserveOn<T> implements IAsyncEnumerable<T> {
        final IAsyncEnumerable<T> source;
        final Executor executor;

        public AsyncObserveOn(IAsyncEnumerable<T> source, Executor executor) {
            this.source = source;
            this.executor = executor;
        }

        @Override
        public IAsyncEnumerator<T> enumerator() {
            return new AsyncObserveOnEnumerator<>(source.enumerator(), executor);
        }

        // ...
    }

A very familiar pattern so far. The real work, however, happens inside AsyncObserveOnEnumerator:

    static final class AsyncObserveOnEnumerator<T> implements IAsyncEnumerator<T> {
        final IAsyncEnumerator<T> enumerator;
        final Executor executor;

        public AsyncObserveOnEnumerator(IAsyncEnumerator<T> enumerator,
                Executor executor) {
            this.enumerator = enumerator;
            this.executor = executor;
        }

        @Override
        public CompletionStage<Boolean> moveNext(CompositeSubscription csub) {
            // identity mapping whose only purpose is to hop to the executor
            return enumerator.moveNext(csub).thenApplyAsync(v -> v, executor);
        }

        @Override
        public T current() {
            return enumerator.current();
        }
    }

I admit I'm not too familiar with CompletionStage, so it appeared to me that thenApplyAsync is the closest thing to having the value delivery moved to a specific executor. Otherwise, this looks quite straightforward and much shorter than our Rx-style observeOn().
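The thread hop can be checked with a small, self-contained sketch. The class name, the "observe-thread" thread name, and the helper method are assumptions made for this example. Note that the sketch reads the thread name inside the function passed to thenApplyAsync, which is the only place the executor is guaranteed to be used: a continuation attached later without the Async suffix may run on the attaching thread if the stage has already completed by then.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ObserveOnSketch {

    // Returns the name of the thread that ran the function given to thenApplyAsync.
    static String threadOfAsyncStage() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "observe-thread"); // hypothetical name, for the demo only
            t.setDaemon(true);
            return t;
        });
        try {
            return CompletableFuture.completedFuture(true)       // upstream completes synchronously
                    .thenApplyAsync(v -> Thread.currentThread().getName(), executor)
                    .join();                                      // block only for demonstration
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread:  " + Thread.currentThread().getName());
        System.out.println("async function: " + threadOfAsyncStage());
    }
}
```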
subscribeOn
Since we can't be sure that an IAsyncEnumerable doesn't perform synchronous actions in its moveNext() method, we need a way, via subscribeOn, to make sure moveNext() is called on some other thread:

    public final class AsyncSubscribeOn<T> implements IAsyncEnumerable<T> {
        final IAsyncEnumerable<T> source;
        final Executor executor;

        public AsyncSubscribeOn(IAsyncEnumerable<T> source, Executor executor) {
            this.source = source;
            this.executor = executor;
        }

        @Override
        public IAsyncEnumerator<T> enumerator() {
            AxSubscribeOnEnumerator<T> enumerator =
                    new AxSubscribeOnEnumerator<>(executor);
            executor.execute(() -> {
                IAsyncEnumerator<T> ae = source.enumerator();
                enumerator.setEnumerator(ae);
            });
            return enumerator;
        }

        // ...
    }

Instead of calling source.enumerator() directly, we offload it to the executor. When the task executes, it hands us a valid IAsyncEnumerator, which is now deferred from the consumer's perspective, but we still have to return an IAsyncEnumerator ourselves right away. The difficulty now is how to allow calling moveNext() when we don't have the upstream's enumerator yet. Luckily, CompletionStage will come to our rescue:

    static final class AxSubscribeOnEnumerator<T> implements IAsyncEnumerator<T> {
        final Executor executor;
        final CompletableFuture<IAsyncEnumerator<T>> onEnumerator;

        public AxSubscribeOnEnumerator(Executor executor) {
            this.executor = executor;
            this.onEnumerator = new CompletableFuture<>();
        }

        void setEnumerator(IAsyncEnumerator<T> enumerator) {
            onEnumerator.complete(enumerator);
        }

        @Override
        public CompletionStage<Boolean> moveNext(CompositeSubscription token) {
            // runs ae.moveNext() on the executor once the enumerator is available
            return onEnumerator.thenComposeAsync(ae -> ae.moveNext(token), executor);
        }

        @Override
        public T current() {
            IAsyncEnumerator<T> ae = onEnumerator.getNow(null);
            return ae != null ? ae.current() : null;
        }
    }

We set up an onEnumerator CompletableFuture which will be completed via setEnumerator upon receiving the actual upstream IAsyncEnumerator. The big trick is how we use composition over this deferred onEnumerator value to call moveNext() on the upstream's enumerator, on the executor, once it is available. The operator thenComposeAsync is basically flatMap. The method current() needs some extra logic: we try to get the upstream's enumerator and if it's not yet available, we simply return null. One shouldn't call current() without the corresponding CompletionStage firing anyway.
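The deferred-composition trick can be tried out on its own with plain JDK types. In the following sketch, a Supplier of a CompletionStage stands in for the upstream enumerator's moveNext(); the class and method names are assumptions made for this example. The point is that the composed stage can be requested before the upstream exists and only completes after the deferred future is filled in.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class DeferredComposeSketch {

    static String run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Plays the role of onEnumerator: the upstream is not known yet.
            CompletableFuture<Supplier<CompletionStage<Integer>>> onSource =
                    new CompletableFuture<>();

            // "moveNext" is requested before the upstream arrives; nothing runs yet.
            CompletableFuture<Integer> result =
                    onSource.thenComposeAsync(s -> s.get(), executor);
            boolean doneBefore = result.isDone();

            // Plays the role of setEnumerator(): the composed call now runs on the executor.
            onSource.complete(() -> CompletableFuture.completedFuture(42));

            int value = result.join(); // block only for demonstration
            return doneBefore + ":" + value;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Here thenComposeAsync flattens the inner stage into the outer one, which is why it behaves like flatMap over a single deferred value.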
Benchmark
Now that we have the three most basic operators available, let's benchmark our IAsyncEnumerable implementations against the cutting-edge equivalent in Reactive-Streams-Commons. For the source code, please refer to the benchmark implementation in my repository. For convenience, I've implemented the operators above in a fluent way where the base type is Ax - Async Extensions.
Results of the throughput benchmark (bigger is better): (i7 4790, Windows 7 x64, Java 8u92)
The benchmark range is just the basic range(1, count), the rangeAsync is a range(1, count).observeOn(executor) and the rangePipeline is range(1, count).subscribeOn(executor1).observeOn(executor2). In the columns, ax is my implementation of IAsyncEnumerable, px is the Reactive-Streams-Commons (Rsc) Publisher Extensions fluent API entry point. Since Rsc uses operator fusion, rangeAsync() is run with and without operator fusion enabled (the others don't fuse in Rsc), the latter is in the pxf column.
Evaluation
Looks like Rsc outperforms the IAsyncEnumerable implementation considerably, both in synchronous and asynchronous use. Without an independent library to compare against, I can only speculate why IAsyncEnumerable has so much overhead. Naturally, my limited experience with CompletionStage could explain some of it, but I doubt that's the main reason. Since both libraries use the same single-threaded Executor in the benchmark, we can rule out the executor overhead itself. What remains are the architectural and conceptual differences:
- We have possibly one allocation of the CompletionStage plus a known continuation stage per value - Rsc doesn't allocate anything
- CompletionStage is actually between hot and cold and acts like an AsyncSubject, when one attaches the continuation to it, it could be still running or already completed - determining this and acting accordingly adds overhead - Rsc calls onNext as directly as possible
- The longer the pipeline the more temporary CompletionStages get involved, which means allocation and individual task scheduling - Rsc exploits the emergent batching property of the streams over an async boundary.
Conclusion
I believe what we have here as IAsyncEnumerable is a corner case of the reactive-flow approach where one has basically request(1) at each stage plus some allocation overhead, which makes it costlier than the highly optimized flow approach. It certainly looks simpler and its operators are shorter to implement, but I have to ask, what's the benefit over the Reactive-Streams approach?
If somebody has tips for optimizing our IAsyncEnumerable implementation or can suggest an independent implementation, I'd be glad to benchmark and compare it and re-evaluate my position on the topic!