Intoduction
RxJava is now out more than 3 years and lived through several significant version changes. In this blog post, I'll point out design and implementation decisions that I personally think wasn't such a good idea.
Don't get me wrong, it doesn't mean that RxJava is bad or I knew all along how to do it "properly". It was a learning process for all of us involved, but the question is, can we learn from those mistakes and do it better in the next major version?
Synchronous unsubscription
In the early days, RxJava mirrored the architecture of Rx.NET which consisted of two important interfaces, IObservable and IObserver, derived through dualizing the IEnumerable and IEnumerator. (This was also true for my own library, Reactive4Java).
If we look at IObservable, we find the subscribe() method that returns an IDisposable. This returned object allows one to dispose or cancel a running sequence. However, it has a critical problem I demonstrate with a minimalistic reactive program:
interface IDisposable { void dispose(); } interface IObserver<T> { void onNext(T t); } interface IObservable<T> { IDisposable subscribe(IObserver<T> observer); } IObservable<Integer> source = o -> { for (int i = 0; i < Integer.MAX_VALUE; i++) { o.onNext(i); } return () -> { }; }; IDisposable d = o.subscribe(System.out::println); d.dispose();
If we run this code, it starts to print a lot of numbers to the console, despite we called dispose on the returned object by the subscribe method. What's wrong?
The problem is that the source observable can only return its IDisposable object only after the for-loop finishes, but then it has nothing to do. The whole setup is synchronous and thus this structure can't be reasonably cancelled.
Although Rx is good at async processing, many steps in a typical pipeline is synchronous and is affected by this synchronous cancellation requirement. Since Rx.NET is at least 3 years older than RxJava, how could this shortcoming still be in today's Rx.NET?
The example code above is just the well known range() operator and if we run a similar code in C#, we find that it doesn't print or stops printing almost immediately. The secret is that Rx.NET's range() operator runs on async scheduler by default, so that for loop runs on a different thread and the operator can immediately return with a meaningful IDisposable. Therefore, the synchronous cancellation issue is averted, but I wonder, was it a conscious or unconscious decision to sidestep the underlying problem? Who knows.
If you look at the source code of Rx.NET's range, you'll find something more complicated. It uses a recursive scheduling technique to deliver each value to the observer. When I measured it, it could only sustain 1M ops/second on the same machine which could do 250M ops/second with RxJava while delivering 1M elements.
Now RxJava's range() never used a scheduler thus the synchronous cancellation problem was discovered and mitigated by introducing the Subscriber class. An instance can be checked if it still wants events or not. The example above can be rewritten so the for loop checks its subscriber and quits accordingly.
Observable<Integer> source = Observable.create(s -> { for (int i = 0; i < Integer.MAX_VALUE && !s.isUnsubscribed(); i++) { o.onNext(i); } }); Subscription d = o.subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer v) { System.out.println(v); unsubscribe(); } // ... });
The Subscriber acts like a take(1) and unsubscribes itself after the first item. This unsubscribe() call internally sets a volatile boolean flag that is read by isUnsubscribed() and the loop above is stopped. Note, however, that you still can't unsubscribe via the Subscription returned by subscribe() because the lambda with the for loop doesn't exit until its terminal condition is met.
It doesn't seem to solve our initial problem that well, does it? Luckily, this new structure has the property that it can be cancelled before or while the loop is running, the latter done from another thread of course:
Subscriber<Integer> s = new Subscriber<Integer>() { @Override public void onNext(Integer v) { System.out.println(); } // ... } Schedulers.computation().schedule(s::unsubscribe, 1, TimeUnit.SECONDS); source.subscribe(s);
The fact that you can basically inject the cancellation support upfront, before anything is even subscribed to, allows proper propagation of unsubscription even with the most complicated operators.
In addition, there is a deeper implication of this new structure. In the new Observable, the lambda doesn't return anything, but Observable.subscribe() does still return a Subscription which is practically the same Subscriber sent in as the parameter. (Technically, it is a bit more involved process; see Jake Wharton's excellent video talk on the subject).
The insight: you can't be fully reactive if you return something. Returning something implies synchronous behavior and the method has to provide some result, even though it can't at that moment. This is when one is forced to block or sleep until the real logic can produce the relevant object to be returned. I showed this in my post about the OSGi Asynchronous Event Streams initiative.
Resources of the Subscriber
The Subscriber class offers the ability to associate resources with it in the form of Subscription instances. When the operator is cancelled (or terminates), these resources are unsubscribed as well.This is quite a convenience for operator developers, however, has its own cost: allocation.
Whenever a Subscriber is instantiated with the default constructor, an inner SubscriptionList instance is also always created, whether or not the Subscriber is likely to hold resources or not. In the previous example, range() doesn't need resources thus the SubscriptionList is never really used.
On one hand, there exist many operators that don't manage resources so creating the extra container is wasteful. On the other hand, many operators do use resources and expect this convenience to be present.
In addition, you may recall that Subscriber has a constructor that takes another Subscriber and gives the option to share the underlying SubscriptionList. Certainly, this could help reduce the allocation count, but most operators that use resources themselves can't share the same underlying SubscriptionList as this would allow them to unsubscribe resources downstream (see pitfall #2). Thus, the current Subscriber structure is more of a burden, performance wise, than a win for operator writers.
You may now think, what's wrong with giving convenience tools to operator writers? I agree that operators implemented outside RxJava should get as much help as reasonable, however, I believe internal operators should take the diligence and have efficient implementations to begin with.
I've tried a few times to resolve this problem but given the architecture of 1.x, I have doubts it can be achieved. Fortunately, the Reactive-Streams' architecture and thus RxJava 2.x solves this problem by making the resource management the responsibility of the operators.
Subscriber request
If you look into how Subscriber is implemented, you'll see the protected final request() method. This is a convenience method that makes sure if there is a Producer set via setProducer, the request is forwarded to it or accumulated until one Producer arrives. Basically, this is an inlined producer-arbiter.One might think the method's implementation gives significant overhead to request management, but JMH benchmarks confirmed they don't really affect the overhead outside a small +/- 3% difference, that may also be due to noise.
The real problem with this method is that it has the same name as Producer.request, making it impossible to implement Producer when one extends Subscriber at the same time.
This has the unfortunate consequence that one usually needs an extra Producer object along with the main Subscriber if the operator does some request-manipulation.
This has the consequence of extra allocation during subscription time which affects GC the most with short-lived sequences. The second property is that it increases the call-stack depth and may prevent some JIT optimization.
Since Subscriber.request() is also part of the public API, it can't be renamed in 1.x to make room for Producer.request().
Again, the solution will come with 2.x: there, since Reactive-Streams Subscriber and Subscription are both interfaces, both can appear the same time, plus, a convenience request() method can be moved into a convenience implementation of Subscriber (i.e., AsyncSubscriber) without affecting the operator internals. (This also means it will be discouraged to use convenience Subscribers within operators.)
Lift
Along with backpressure, the method Observable.lift() is considered by many as the best addition to the library. It lets you step into the subscription process and given a Subscriber from downstream, you can return another Subscriber for upstream that does the business logic for that operator.It became so popular almost all instance operators of Observable are now using it.
Unfortunately, the convenience has a cost: allocation. For most operators, applying that operator to a sequence incurs 3 object allocations. To show this, let's unroll the application of the map() operator:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { OperatorMap<T, R> op = new OperatorMap<T, R>(func); return new Observable<R>(new OnSubscribe<R>() { @Override public void call(Subscriber<? super R> child) { Subscriber<? super T> parent = op.call(child); Observable.this.unsafeSubscribe(parent); } }); }
We have 1) the Operator instance, 2) the Observable instance and 3) the OnSubscribe instance for each application.
This may not of concern for direct sequences that use map, but imagine you have this 3 allocation a million times because you happen to flatMap something whose inner Observables have operators applied to them:
Observable.range(1, 1_000_000).flatMap(v -> Observable.just(v).observeOn(Schedulers.computation()).map(v -> v * v)) .subscribe(...);
The lift operator is practically an OnSubscribe instance that captures the upstream Observable and calls Operator.call with the downstream Subscriber. Clearly, one could just implement operators directly with OnSubscribe and have the upstream Observable as a parameter; the total instance sizes wouldn't change much but both the allocation count and stack depth get reduced.
The current lift structure has another adverse effect: it makes operator-fusion difficult to impossible in its current form because 1) it is an anonymous class and one can't discover its upstream Observable and Operator easily, and 2) even if made a named class, the two classes are hidden behind indirection and any discovery process now faces more overhead.
Luckily, the shortcomings mentioned so far can be remedied without affecting the public API, but requires diligence of writing and reviewing thousands of lines of code changes.
Unfortunately, when I implemented RxJava 2.0 developer-preview last September, I did not think of this overhead thus the current 2.x branch still uses lift() extensively.
However, there is light at the end of the tunnel: Project Reactor 2.5 doesn't go down on the lift() path and now has lower overhead than RxJava.
Create
Lately, I'm quite outspoken against Observable.create() and now I think it should be named something more scary so beginners avoid it and look for proper factory methods in Observable that do backpressure and unsubscription properly. I can see it as a tool for demonstating to one's audience how to enter the reactive world, but I'm convinced it should receive less spotlight in those presentations.
Regardless, the problem with create() is that it encourages creating 2 instances per Observable: 1) the Observable instance itself and the OnSubscribe holding the subscription logic.
The approach that one creates an Observable instance with create() was born from the encouragement: "composition over inheritance". From general design perspective, this sounds okay, but one has to note that in Java, composition means object allocation: outer objects and inner objects, and more inner-inner objects.
To avoid all these allocations, the solution would be to make Observable not hold an instance of OnSubscribe by default (but keep create() as the lambda-factory version) and operators (both source and intermediate) should extend Observable directly. All operator methods would still reside in Observable:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { return new ObservableMap<T, R>(this, func); }
Thus, without lift() and create(), map() would allocate a single Observable instance per application.
Such change, I believe, wouldn't affect the public API since Observable methods are static or final to begin with and operators would be still a subclass of Observable. The change also would help with operator-fusion because each upstream source can now be directly identified and its parameters exposed without indirection.
Again, Project Reactor 2.5 is ahead of RxJava and doesn't use the create() mechanics. Its operators are implemented extending a base class, Flux, the way suggested above.
Conclusion
Designing and implementing RxJava version was and is a learning process as well with unanticipated effects on complexity and performance.
You may think, why the hassle about structures and allocations that clearly work in their current form? Two reasons: the Cloud and Android/IoT. For the cloud, where billions of events happen, any inefficiency or unnecessary overhead is amplified along with the numbers. You may not easily calculate how much does that range-flatmap example above cost you on your laptop, but Cloud suppliers will make you pay for each second, gigabyte and gigahertz of using their service. For Android and IoT, the resource constraints of the devices and the expectancy of more and more features requires one - eventually - to budget memory usage, GC and battery life.
Nincsenek megjegyzések:
Megjegyzés küldése