Introduction
I've recently come across an interesting presentation from EclipseCon titled Asynchronous Event Streams – when java.util.stream met org.osgi.util.promise! (video, specification). It appears the OSGi folks want to solve the async problem as well and came up with an API to do that. It promises (in)finite stream processing, error handling and backpressure. As it turns out, nothing is ever simple with this kind of problem, so let's see what's going on.

The Interfaces
Unfortunately, the code behind Asynchronous Event Streams (AsyncES) doesn't seem to exist beyond the text file linked above, so let's extract the interfaces from the documentation instead. As with Reactive-Streams (RS), AsyncES consists of a few basic interfaces representing a source and a consumer.
We have the PushStream and PushStreamFactory interfaces (omitted here), which contain the familiar map, flatMap, etc. methods and factory methods for PushStreams respectively. Strangely, PushStream extends Closeable and AutoCloseable. Since there is nothing in the spec indicating a hot source, I'd assume PushStreams are always cold and closing them makes no sense.
The core interface is the PushEventSource:
@FunctionalInterface
public interface PushEventSource<T> {
    Closeable open(PushEventConsumer<? super T> consumer) throws Exception;
}
If you remember IObservable/IObserver, the pattern should look familiar. The PushEventSource is the way of specifying a "generator" that will emit values of T to a consumer.
The method specification has two important differences compared to Publisher: the "connection" method returns a Closeable and may throw a checked Exception.
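To make the generator pattern concrete, here is a minimal, hypothetical synchronous source built on the interface above; the PushEvent.data() and PushEvent.close() factory methods are assumptions (they also appear later in this post) and the values emitted are arbitrary:

// Hypothetical synchronous source: emits 1..5 then a close event.
// PushEvent.data()/PushEvent.close() are assumed factory methods.
PushEventSource<Integer> range = consumer -> {
    boolean[] closed = { false };
    for (int i = 1; i <= 5 && !closed[0]; i++) {
        long backpressure = consumer.accept(PushEvent.data(i));
        if (backpressure < 0L) {
            closed[0] = true;             // ABORT (or any negative value)
        } else if (backpressure > 0L) {
            Thread.sleep(backpressure);   // naive reading of the delay contract
        }
    }
    if (!closed[0]) {
        consumer.accept(PushEvent.close());
    }
    // By the time open() returns, the synchronous loop has already finished,
    // so this Closeable is effectively useless for cancelling the emission.
    return () -> closed[0] = true;
};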
Unfortunately, this pattern has problems similar to what we had with RxJava in the early days:
- Given a synchronous source, it may be very hard to cancel the connection from within the consumer unless the Closeable is propagated around (more on this later).
- The method throws a checked exception which is generally inconvenient and also seems unnecessary because the exception could be delivered to the consumer.
In RS, it is the choice of the consumer whether or not to expose cancellation and the cancellation itself works for synchronous and asynchronous sources alike.
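For contrast, here is a minimal Reactive-Streams consumer sketch: the Subscriber receives a Subscription in onSubscribe and can cancel it whenever it likes, regardless of whether the source is synchronous or asynchronous (the take-3 logic is just an arbitrary example):

// Reactive-Streams consumer that requests 3 elements and cancels after receiving them.
Subscriber<Integer> take3 = new Subscriber<Integer>() {
    Subscription s;
    int remaining = 3;

    @Override
    public void onSubscribe(Subscription s) {
        this.s = s;
        s.request(3);          // the consumer decides how much it wants
    }

    @Override
    public void onNext(Integer t) {
        System.out.println(t);
        if (--remaining == 0) {
            s.cancel();        // cancellation is entirely the consumer's choice
        }
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }
};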
Next comes the consumer, PushEventConsumer. At first glance, we don't have per-event-type methods, just a single accept method that takes a PushEvent, may throw an Exception and returns some value:

@FunctionalInterface
public interface PushEventConsumer<T> {
    long ABORT = -1L;
    long CONTINUE = 0L;

    long accept(PushEvent<T> e) throws Exception;
}
Having only a single consumer method should remind us of the Notification objects of RxJava and indeed, PushEvent is an interface representing such a container object.
The second interesting thing is that this method may throw an Exception. It is sometimes convenient that lambdas in an IO-related stream can throw directly without wrapping, but PushEventConsumer is an end consumer, not a source or intermediate operator; where would this Exception propagate, especially in an async environment?
The most interesting part is the return type itself. The specification indicates this is the source of backpressure and cancellation support (a small consumer sketch follows the list below):
- Negative values are considered an indication of cancellation or abort.
- Zero indicates the sender can call the method immediately with new data.
- A positive value indicates the sender should wait the specified number of milliseconds before sending any new data.
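A hypothetical consumer wired to this contract could look like the following sketch; the getType()/getData() accessors and the EventType constant are assumptions based on how PushEvent is used later in this post:

// Hypothetical consumer demonstrating the return-value contract.
PushEventConsumer<String> printer = event -> {
    if (event.getType() != PushEvent.EventType.DATA) {
        return PushEventConsumer.ABORT;     // negative: terminal event, stop sending
    }
    String value = event.getData();
    System.out.println(value);
    return value.length() > 16
        ? 100L                              // positive: wait ~100 ms before the next event
        : PushEventConsumer.CONTINUE;       // zero: the next event may come immediately
};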
In RS, backpressure works via a co-routine-like protocol: unless the downstream requests anything, no value will be emitted or generated; the consumer may take any amount of time to process what it requested and issue more requests at its own convenience.
However, AsyncES returns a delay value after which the source can emit the next value. But how does one know in advance how long the accept call will take? In addition, if the upstream is synchronous, does this mean that a positive return value should trigger a Thread.sleep()? Probably not. Without a reference implementation, I can only speculate that value generation uses some form of recursive scheduling via an Executor: the source schedules the emission of a PushEvent immediately once the consumer arrives; in that Runnable, accept is run and, based on the return value, a new Runnable is submitted, this time with a delay.
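Here is a rough sketch of that speculated trampolining, again purely hypothetical: a source backed by an Iterator where each value is delivered in its own Runnable and the next emission is scheduled according to the returned delay.

// Speculative sketch: recursive scheduling of accept() calls on a ScheduledExecutorService.
static <T> void emit(ScheduledExecutorService exec, Iterator<T> it,
        PushEventConsumer<T> consumer) {
    exec.execute(() -> {
        try {
            if (!it.hasNext()) {
                consumer.accept(PushEvent.close());
                return;
            }
            long backpressure = consumer.accept(PushEvent.data(it.next()));
            if (backpressure < 0L) {
                return;                                   // ABORT: stop emitting
            }
            if (backpressure == 0L) {
                emit(exec, it, consumer);                 // CONTINUE: schedule the next value now
            } else {
                exec.schedule(() -> emit(exec, it, consumer),
                        backpressure, TimeUnit.MILLISECONDS);
            }
        } catch (Exception ex) {
            ex.printStackTrace();                         // nowhere obvious to propagate this
        }
    });
}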
This approach and the interface itself have several drawbacks:
- The delay amount is a fixed unit (of milliseconds).
- If the Executor approach is used, it means for each source value, we have a wrapper PushEvent and a Runnable (which is quite an overhead).
- Since the ABORT value is returned synchronously, cancellation doesn't compose. If we implemented a library based on this structure with our RxJava lift() approach (in which consumers call other consumers' methods), the moment the accept() call of the downstream consumer has to happen on a different thread, there is no way to get its result back to the original thread:
ExecutorService exec = ...

PushEventOperator<T, T> goAsync = downstreamConsumer -> {
    return e -> {
        exec.execute(() -> {
            long backpressure = downstreamConsumer.accept(e);
        });
        return backpressure; // ???
    };
};
Even if we used an AtomicLong, the execution happens elsewhere and unless the upstream emits a value, there is no way to return the downstream's cancellation indicator, or in fact, its backpressure-delay requirement.
- The final problem: what if the given delay amount is not enough for the consumer? This model indicates the event will be sent regardless. There are only a few things the consumer can do: accept the value and return a larger delay, abort and crash with an overflow error, or do unbounded buffering.
I believe these problems are concerning, to say the least. It is no accident RxJava and Reactive-Streams look the way they do today: finding a workable, composable asynchronous data-delivery approach is a non-trivial task at best.
However, since this blog is about RxJava, why don't we try to build a bridge between an AsyncES PushEventSource and an Observable?
RxJava - AsyncES bridge
Let's start with a bridge that, given an Observable (or in fact any RS Publisher), turns it into a PushEventSource:

public static <T> PushEventSource<T> from(
        Publisher<? extends T> publisher,
        Scheduler scheduler) {
    return c -> {
        // implement
    };
}
Now let's see the missing implementation, chunk by chunk:
CompositeDisposable cd = new CompositeDisposable();

Scheduler.Worker w = scheduler.createWorker();
cd.add(w);
We create the usual CompositeDisposable and a Worker and bind them together to form the basis of our cancellation support.
publisher.subscribe(new Subscriber<T>() {
    Subscription s;

    @Override
    public void onSubscribe(Subscription s) {
        // implement
    }

    @Override
    public void onNext(T t) {
        // implement
    }

    @Override
    public void onError(Throwable t) {
        // implement
    }

    @Override
    public void onComplete() {
        // implement
    }
});
Next, we subscribe to the Publisher with a Subscriber whose events we will translate, but before that, the open method we are implementing here via a lambda has to return a Closeable:
return cd::dispose;
So far, nothing special in the structure: lambdas and functional interfaces, just "following the types". Next, let's see the first event in a reactive stream:
@Override
public void onSubscribe(Subscription s) {
    this.s = s;
    cd.add(s::cancel);
    if (!cd.isDisposed()) {
        s.request(1);
    }
}
We save the Subscription first, add it to the CompositeDisposable for mass-cancellation support and, unless it is disposed, request exactly 1 element from the upstream. But why? The reason is that we don't really know how many elements the PushEventConsumer wants, only the pacing between elements. Therefore, the safest approach is to request one element upfront and request more after the delay returned by the accept() method:
@Override
public void onNext(T t) {
    long backpressure;
    try {
        backpressure = c.accept(PushEvent.data(t));       // (1)
    } catch (Exception e) {
        onError(e);                                       // (2)
        return;
    }
    if (backpressure <= PushEventConsumer.ABORT) {        // (3)
        cd.dispose();
    } else if (backpressure == PushEventConsumer.CONTINUE) {  // (4)
        s.request(1);
    } else {
        w.schedule(() -> s.request(1),                    // (5)
            backpressure, TimeUnit.MILLISECONDS);
    }
}
- We call the accept method and wrap the value into an instance of the aforementioned PushEvent (not detailed here, think of a rx.Notification).
- In case the accept method throws, we call onError with the exception.
- If the backpressure result is the ABORT value (or any negative number), we dispose the composite, which cancels the Subscription and disposes the Worker.
- If the backpressure result is CONTINUE (0), we can synchronously request one more element.
- Otherwise, we have to schedule the request of a single element after the given number of milliseconds.
The onError and onComplete events are pretty simple: dispose the composite and call the accept method with a wrapped error/close indicator:
@Override
public void onError(Throwable t) {
    cd.dispose();
    try {
        c.accept(PushEvent.error(t));
    } catch (Exception ex) {
        RxJavaPlugins.onError(ex);
    }
}

@Override
public void onComplete() {
    cd.dispose();
    try {
        c.accept(PushEvent.close());
    } catch (Exception ex) {
        RxJavaPlugins.onError(ex);
    }
}
And that's it for the Publisher -> PushEventSource conversion. In fact, there is no problem with this direction (apart from the slight overhead of wrapping values and requesting one by one).
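As a quick sanity check, the bridge might be used like this (assuming the RxJava 2.x developer-preview Observable used throughout this post, which implements the Reactive-Streams Publisher; the range and the pacing value are arbitrary):

// Hypothetical usage of the from() bridge: stream 1..5 into a printing consumer.
PushEventSource<Integer> source =
        from(Observable.range(1, 5), Schedulers.computation());

Closeable connection = source.open(event -> {
    System.out.println(event);    // DATA and CLOSE events arrive here
    return 10L;                   // ask for ~10 ms pacing between elements
});

Thread.sleep(200);                // let the asynchronous delivery run
connection.close();               // no further effect once the sequence has completed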
All we need now is the reverse conversion PushEventSource -> Publisher. Let's start with the method:
public static <T> Observable<T> to(
        PushEventSource<? extends T> source,
        long backpressure) {
    return Observable.create(s -> {
        // implement
    });
}
It appears the dual of a Scheduler is a backpressure amount in milliseconds. As I mentioned above, there is no clear way of communicating how long the upstream should wait until the downstream can process the next element, or more specifically, how long to wait before the Subscriber issues a new request().
Now let's see the implementation, part by part:
CompositeDisposable cd = new CompositeDisposable();

AtomicLong requested = new AtomicLong();

s.onSubscribe(new Subscription() {
    @Override
    public void request(long n) {
        BackpressureHelper.add(requested, n);
    }

    @Override
    public void cancel() {
        cd.dispose();
    }
});
First, we create a composite again and an AtomicLong to hold the downstream request amount. Next, we create a Subscription and wire up the request (accumulate requests via helper) and cancel (dispose the composite) methods.
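BackpressureHelper.add is an RxJava-internal utility; essentially it adds the request in a CAS loop and caps the sum at Long.MAX_VALUE, which is treated as the unbounded amount. A rough equivalent of what it does:

// Sketch of a capped request accumulator (roughly what BackpressureHelper.add does):
// adds n but never lets the counter overflow past Long.MAX_VALUE ("unbounded").
static long addCapped(AtomicLong requested, long n) {
    for (;;) {
        long current = requested.get();
        if (current == Long.MAX_VALUE) {
            return Long.MAX_VALUE;                  // already unbounded
        }
        long next = current + n;
        if (next < 0L) {                            // overflowed
            next = Long.MAX_VALUE;
        }
        if (requested.compareAndSet(current, next)) {
            return current;                         // the value before the addition
        }
    }
}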
Next we open a connection to the source:
try {
    Closeable c = source.open(new PushEventConsumer<T>() {
        @Override
        public long accept(PushEvent<T> c) throws Exception {
            // implement
        }
    });

    cd.add(() -> {
        try {
            c.close();
        } catch (IOException ex1) {
            RxJavaPlugins.onError(ex1);
        }
    });
} catch (Exception ex2) {
    s.onError(ex2);
}
We create the consumer (detailed below), then add a Disposable to the composite that will call close() on the Closeable returned by the open method. If the open method crashes, we simply emit its Exception to the Subscriber.
Finally, let's implement the accept method which should dispatch the PushEvents:
if (cd.isDisposed()) {                                  // (1)
    return ABORT;
}
switch (c.getType()) {                                  // (2)
case DATA:
    s.onNext(c.getData());
    for (;;) {
        long r = requested.get();                       // (3)
        if (r == 0) {
            return backpressure;                        // (4)
        }
        if (requested.compareAndSet(r, r - 1)) {
            return r > 1L ? 0 : backpressure;           // (5)
        }
    }
case ERROR:                                             // (6)
    cd.dispose();
    s.onError(c.getFailure());
    return ABORT;
case CLOSE:
    cd.dispose();
    s.onComplete();
    return ABORT;
}
return 0L;
- In case the composite is disposed (i.e., downstream cancellation), we return the ABORT value which should terminate the sequence. Note that this can only happen if the upstream actually sends an event, not sooner.
- We dispatch on the PushEvent type and if it's a DATA element, we emit the contained data value.
- Once the value has been emitted, we have to decrement the requested counter in a CAS loop.
- If the counter is already zero, return with the backpressure value.
- Otherwise use a CAS to decrement by one and return the backpressure if we reached zero this way or zero to indicate the upstream can immediately send a new value.
- For the failure and close events, we call the appropriate onXXX method and dispose the composite.
There is an unavoidable problem with this implementation: what if the backpressure amount is not enough? This implementation just writes through for simplicity, so you have to apply one of the onBackpressureXXX policies on the created Observable. Implementing a version with the BackpressureStrategy mentioned in an earlier post is left to the reader.
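For example, assuming RxJava's standard onBackpressureXXX operators, a converted sequence could be protected like this (source and the 10 ms pacing value are placeholders):

// Hypothetical usage: convert with a 10 ms pacing hint and buffer any overflow.
Observable<Integer> bounded =
        to(source, 10)
        .onBackpressureBuffer()     // or onBackpressureDrop()/onBackpressureLatest()
        .observeOn(Schedulers.computation());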
Now that we have the conversion methods, it should be relatively straightforward to implement PushStream's methods on top of RxJava 2.x (or any other Reactive-Streams compliant library).
You can find the example code, involving our favorite flatMap operator, on my gist page.
Conclusion
Designing async APIs is a non-trivial task at best and there have been many attempts to do so, the latest of them being the Asynchronous Event Streams RFC of OSGi.
As we saw, the underlying idea of using time to exert backpressure can be easily emulated by the Reactive-Streams APIs (especially RxJava), so technically, if one wants such a backpressure scheme, one can go ahead with the Reactive-Streams compliant libraries with ease. The opposite direction requires more consideration and overflow management.
After this exercise, I have doubts that Asynchronous Event Streams will work out in their current form.