2015. június 13., szombat

The Reactive-Streams API (part 1)

Introduction


The Reactive-Streams API, heavily influenced by RxJava, is (in my opinion) the most beautiful API describing (a)synchronous, cancellable and backpressure-able dataflows to date, replacing the original and limited IObservable/IObserver approach invented by Erik Meijer.

Back in April, I spent a week and tried porting RxJava 1.x over this new API. At first, the biggest hurdle was the lack of resource-management of the new Subscriber interface and how RxJava's Producer and Subscription interfaces were merged into a single Subscription interface. However, once I started implementing various infrastructure supporting classes, I started appreciating the new design, and perhaps the most important outcome is how the entire backpressure awareness just clicked into the right place in my mind.

With this new insight, I started looking at RxJava's operators and Producer implementations and I was suddenly able to discover bugs and optimization possibilities quite easily, not to mention, start a blog series about how things (should) work in RxJava.

In this blog series, I'm going to talk about this new reactive-streams API and its relation to current RxJava 1.x constructs.

Package and naming confusions

Knowing RxJava in and out, perhaps the first stumbling block about reactive-streams (RS) is how concepts are named differently.

rx.Observable is called org.reactivestreams.Publisher and is now an interface. Since Java doesn't and probably won't support extension methods, having a Publisher interface returned as a source doesn't really help with composing operators over it. Luckily, the current RxJava fluent approach can still remain as is by extending this new interface.

Unsubscription through the rx.Subscription and backpressure through the rx.Producer interfaces are now somewhat unified into a single interface, named org.reactivestreams.Subscription. The new interface uses cancel() instead of unsubscribe() and doesn't have (nor really needs) the means to check for a isUnsubscribed() state. Since RS doesn't specify any means of resource management, one will need to constantly wrap the new cancel() method into a resource management instance to be able to use it with the containers. Due to this name confusion, RxJava 2.0 will most likely change the resource management classes and interfaces into the C#-esque Disposables.

There is no simple rx.Observer equivalent, but the rx.Subscriber looks somewhat similar to org.reactivestreams.Subscriber. Instead of it being unsubscribable directly, the new Subscriber receives a 'control' object that lets it cancel the connection and request more values. In addition, instead of having two methods, onStart() and setProducer() - which pose quite a confusion about how one should jump-start a Subscriber in RxJava -, there is only a single onSubscribe() method taking an RS Subscription. The former calls to the protected rx.Subscriber.request(n) now have to store this RS Subscription instance and call rs.Subscription.request(n) on it.

There were some debates about how the completion signal's method should be called and was settled on onComplete() instead of RxJava's and Rx.NET's onCompleted() name. Hopefully, our IDE's auto-complete feature will save us in this regard.

Finally, RS specifies a combined interface of rs.Publisher and rs.Subscriber by the name of Processor, which resembles to the Subjects API inside RxJava. Since this Processor is an interface, the only change to rx.Subject will be to implement it.


Java 9+ Flow

There are some clashes between RxJava and RS, but you might have heard Doug Lea wants to include the reactive-streams idiom in Java 9 under container object java.util.concurrent.Flow. The naming matches RS but the packages are different. I was and I'm still skeptical about this move:
  • This "common platform" won't be that common: only Java 9+ users will benefit from it. Having RS, which is Java 6+ and thus Android-friendly, is a better option; most projects already include many other libraries and plus 2 (RS + RxJava 2.0) is not an issue.
  • Java 9 won't use it internally at all. No new async NIO, networking, File I/O and UI events based on the Observer++ pattern seems to be planned. Not to mention, without fluent API support, most users can't go zero-dependency with it. The situation can be remedied if Flow is backported to previos Java JDKs (with the same package location) in an update. Still, without extension methods in Java, users have to always rely on extra libraries and instance wrappings.
  • I'm not sure the whole reactive paradigm is final in terms of capturing properties: for example, Applied Duality's AsyncObservable capturing the latency of onXXX methods might be some day incorporated in RS 2.0 (although I don't really see the need for it). Swapping out the dependencies to RS 2.0 and RxJava 3.0 might be much simpler than changing JDK's Flow API. On a historical note, Rx.NET got its IObservable/IObserver included in the BCL, and I think they painted themselves into a corner and in order them to join the RS-paradigm, they now have to figure out a way to have the new set of interface structures live alongside the old one, similar to how pre-generics and post-generics collections ended up. Note that I assume here they accept RS as the new basis for Rx.NET 3.0 and don't try to do something more complicated.
 

 Critique of RS

I'm completely satisfied with the four new interfaces of RS, but less so with the textual specification about them. I believe it is over-restrictive and prevents some sensible API implementations.

No exceptions other than NullPointerException

Although methods are allowed to throw NPE if parameters of various methods are null, however, the components can end up in invalid state or the Subscription.request() could be called with an invalid non-positive value. The current RS 1.0 specification doesn't allow handing these cases the way most Java APIs do by throwing IllegalArgumentException or IllegalStateException.

One can argue that such exceptions should be sent through the onError() method, but I'll show potential situation that makes this impossible if adhering to the specification.

Let's assume one has a Subscriber and accidentally subscribes it to two different Publishers. By definition, this is not allowed and the onSubscribe() method should reject a second Subscription:

    // ...
    Subscription subscription;
    @Override
    public void onSubscribe(Subscription s) {
        if (subscription != null) {
            s.cancel();
            onError(new IllegalStateException("§x.y: ..."));
            return;
        }
        this.subscription = s;
        s.request(Long.MAX_VALUE);
    }

This may appear to work, but the moment the first Publisher goes async, two problems arise:
  • The second onSubscribe() call's onError() is now, potentially, concurrently invoked with the first Publisher's onNext() call, which is correctly forbidden by the specification.
  • Storing the first subscription can't be plain and requires an AtomicReference in case there is a subscription race due to multiple uses, adding overhead to a Subscriber implementation.
Therefore, the only safe way to implement onSubscribe() could be something like this:

    // ...
    final AtomicReference subscription = ...;
    @Override
    public void onSubscribe(Subscription s) {
        if (!subscription.compareAndSet(null, s)) {
            s.cancel();
            new IllegalStateException("§x.y: ...").printStackTrace();
            return;
        }
        s.request(Long.MAX_VALUE);
    }

and hope the standard out or log is monitored pr one needs to serialize out the subscriber's onXXX() methods all the time.

Instead of this, I'd simply throw the IllegalArgumentException and make the subscribe() fail.

Illegal request amount

Requests through the Subscriptions should be positive, otherwise, the subscriber should be notified via onError().

    // ...
    @Override
    public void request(long n) {
        if (n <= 0) {
            subscriber.onError(new IllegalArgumentException("§x.y: ..."));
            return;
        }
        // ...
    }


Again, the problem comes when request() is invoked asynchronously, for example, due to an observeOn() trying to replenish its input queue while the Subscription is still producing values through onNext(). The onError() is now racing with onNext() again, which is forbidden.
The resolution and my suggestions are similar to the IllegalStateException: one can print/log the error or instead throw it back at the caller.


The request() can't throw NPE

Even though various onXXX() methods can at least throw a NullPointerException, the Subscription.request() can't. One has to try and bounce such exception back to the Subscriber and if that fails as well, there is nowhere it could go but to standard out or log.


Processors require Subscriptions

The RS is heavily biased towards cold observable sequences where having a Subscription is a correct interoperation element. However, hot Observables, such as RxJava Subjects may or may not receive Subscriptions before their onNext() is called.

For example, if one uses a PublishSubject to multicast an Observable by calling Observable.subscribe(PublishSubject), the PublishSubject will receive a Subscription. However, if one uses PublishSubject to multicast mouse movement events, there is no way it can be cancelled (or backpressured) and thus having a Subscription there is unnecessary.

Therefore, one either has to use some dummy Subscription or, as I see, make the call to onSubscribe() optional for Processors.


Processors have to support backpressure

Backpressure support might be impossible and infeasible with Processors: mouse moves can't be backpressured, therefore, a Subject would need to buffer and/or drop overflown values. Implementing it for AsyncSubject is trivial, but other Subject types have to be turned into some kind of hybrid ReplaySubject. (Naturally, PublishSubject can by default drop unrequested values for its subscribers as if they weren't there, but the continuous value delivery guarantee of Subjects is too valuable to be subverted in my opinion.)

A conforming implementation would impose quite an overhead for all subscribers by preventing 'write-through' and potentially lock-stepping them and slowing the processing down to the slowest requestor.

Instead, I'd weaken the related specification points and allow optional backpressure support for Processors.


Subscriptions will be conservative regardless

The specification suggests that Subscriptions can be implemented without thinking about concurrency too often. If you remember about the requirements of Producers in RxJava, I mentioned the requirement of thread-safety and reentrant-safety.

Reentrant-safety will still be critical part of the Subscription.request() implementation to avoid unnecessary recursion and to handle the case when the onNext() calls request() before it processes the current value which would trigger another (potentially) recursive call to onNext() ad infinitum.

The need for thread-safety can be 'proven' by the behavior of the following chain. Let's assume a chain is subscribed on the main thread, outside any RxJava schedulers and starts producing values through an observeOn() operator. The consumer of the sequence is then periodically requesting more data which request() call is forwarded to the source still emitting from the main thread. At this point, two threads are in or enter the request() method and without proper atomics/synchronization, the reentrancy check and proper request accounting wouldn't work. Now one would think the observeOn() could serialize access to its Subscription (the main source) but since the source runs outside the Schedulers, such serialization would be no-op from the downstream's perspective and ineffective from the upstream's perspective.

Since the source Subscriber can't possibly know the call pattern to its request() method, it has to be conservative and use one of the approaches I blogged about in respect to Producers. Naturally, this also implies that the cancel() method has to touch a volatile variable in any implementations.

Conclusion

These concerns didn't just arise while writing this post, but I've already tried to notify the RS designers about them. Unfortunately, my concerns didn't got through and my code didn't win arguments. We could argue about my issues endlessly, but instead, I'd rather write code and prove my points there.

Regardless, I find RS to be a good basis to build upon with some slight phylosophical adjustments:
  • Error handling: to paraphrase the latest Godzilla movie: "Let them throw!".
  • Processors have the option to do backpressure and cancellation on their Subscriber side.
  • Most non-trivial Subscriptions have to be conservative and made threadsafe.
In the next post, I'll start with the two most basic Producers of RxJava, namely the SingleProducer and SingleDelayedProducer and show how they can be made into an RS SingleSubscription and SingleDelayedSubscription.

4 megjegyzés:

  1. I am so appreciate to you for these acticles

    VálaszTörlés
  2. Here is an experimental untested backport of the Java 9 Flow / SubmissionPublisher:

    https://sourceforge.net/p/streamsupport/code/ci/default/tree/src/experimental/java/java8/util/concurrent/

    VálaszTörlés
  3. Thanks. I've been thinking about a benchmark that compares SubmissionPublisher with some RxJava constructs.

    VálaszTörlés
  4. Well, the backport (if it works at all) would certainly be slower than the real Java 9 thing. So, a performance comparison might be not that meaningful.

    But anyway, let me know when you want a pre-built jar file with all dependencies.

    VálaszTörlés