Introduction
If you are following events around Android development, or just happen to follow all things reactive, there was a "big" announcement from Google: they've released their reactive programming library targeting Android specifically: Agera. Of course, one has to look into the details to get an accurate picture.
"By Google" means a team in Google working on Google Play Movies. Certainly its sounds more amplified to say Google than the full path to the team. I happen to do this as well when someone asks where I work: in a lab at the Hungarian Academy of Sciences instead of at the Engineering and Management Intelligence Research Laboratory at the Institute for Computer Science and Control of the Hungarian Academy of Sciences. (Plus, you don't get tired and lost while I'm emitting these words :)
It doesn't really matter who released it, all that matters what they released and how it relates to the well established reactive libraries, RxJava, Reactor and Akka-Streams, altogether.
The Core API
The Agera library is built around the valueless Observer pattern: Observables take Updatables and signal change via update() calls. It is then the responsibility of those Updatables to figure out what changed. This is practically a zero argument reactive dataflow which relies on side-effects per update().interface Updatable { void update(); } interface Observable { void addUpdatable(Updatable u); void removeUpdatable(Updatable u); }
They look innocent and reactive, right? Unfortunately, they've run into the issue with the original java.util.Observable and the other addListener/removeListener based reactive APIs (which I categorized as 0th generation).
Agera Observable
The problem with this pair of methods is that every Observable who adds behavior over an incoming Updatable has to remember the original Updatable in some whay for the case when the same Updatable is removed:
public final class DoOnUpdate implements Observable { final Observable source; final Runnable action; final ConcurrentHashMap<Updatable, DoOnUpdatable> map; public DoOnUpdate(Observable source, Runnable action) { this.source = source; this.action = action; this.map = new ConcurrentHashMap<>(); } @Override public void addUpdatable(Updatable u) { DoOnUpdatable wrapper = new DoOnUpdatable(u, action); if (map.putIfAbsent(u, wrapper) != null) { throw new IllegalStateException("Updatable already registered"); } source.addUpdatable(wrapper); } public void removeUpdatable(Updatable u) { DoOnUpdatable wrapper = map.remove(u); if (wrapper == null) { throw new IllegalStateException("Updatable already removed"); } source.removeUpdatable(wrapper); } static final class DoOnUpdatable { final Updatable actual; final Runnable run; public DoOnUpdatable(Updatable actual, Runnable run) { this.actual = actual; this.run = run; } @Override public void update() { run.run(); actual.update(); } } }
This causes a contention point between independent downstream Updatables at every stage of a pipeline.
True, a similar contention point can be found with RxJava's Subjects and ConnectableObservables, but chained operators after them don't have the contentions. Unfortunately, the Reactive-Streams spec, in its current version, mandates something similar from Publishers. Now RxJava 2.x, Rsc and Reactor completely ignored this, turning out to be over-restrictive in practice, and we are pushing back instead to lighten the spec.
The second problem, although minor, is that you can't add the same Updatable multiple times. First because you can't distinguish between the different "subscriptions" via Map and second the spec mandates throwing an exception. Usually, this rarely happens because most end-consumers are solo.
The third problem is a bigger issue: throwing when the Updatable is no longer registered with the Observable. This creates an unfortunate race condition between end-consumers triggering removal while some intermediate operator such as take also triggers it; one of them will get an exception. This is why modern reactive libraries have idempotent cancellation.
The fourth problem is that in theory, addUpdatable and removeUpdatable can race with each other: some downstream operator would want to disconnect before an upstream operator has actually called addUpdatable. A possible outcome is that the removeUpdate chain throws yet addUpdatable succeeds, causing the signals to flow anyway and causing an unwanted retention of all associated objects.
Agera Updatable
Let's see the API from the consumer's perspective. Updatable is a single method functional interface which makes it easy to attach a listener to an Observable:Observable source = ... source.addUpdatable(() -> System.out.println("Something happened"));
Simple enough, now let's remove our listener:
source.removeUpdatable(() -> System.out.println("Something happened"));
Which yields a nice Exception: the two lambdas are not the same instance/reference. This is a very common problem with addListener/removeListener based APIs. The solution is to store the lambda in a reference and use that when needed:
Updatable u = () -> System.out.println("Something happened"); source.addUpdatable(u); // ... source.removeUpdatable(u);
A small inconvenience indeed, but it gets worse. What if you have many Observables and many Updatables? You have to remember who is registered with who, and keep references to them in some fields. One of the great ideas of the original Rx.NET design was to reduce this necessity to a single reference:
interface Removable extends Closeable { @Override void close(); // remove the necessity of try-catch around close() } public static Removable registerWith(Observable source, Updatable consumer) { source.addUpdatable(consumer); return () -> source.removeUpdatable(consumer); }
Of course, we have to consider idempotence of calling close() here as well:
public static Removable registerWith(Observable source, Updatable consumer) { source.addUpdatable(consumer); final AtomicBoolean once = new AtomicBoolean(); return () -> { if (once.compareAndSet(false, true)) { source.removeUpdatable(consumer); } }); }
Agera MutableRepository
The Agera MutableRepository holds a value and signals update() to registered Updatables if the value changes. This somewhat resembles to the BehaviorSubject we have, with the distinction that the new value doesn't flow to the consumers (remember, update() has no arguments) but has to be get() from the repository:MutableRepositoryrepo = Repositories.mutableRepository(0); repo.addUpdatable(() -> System.out.println("Value: " + repo.get()); new Thread(() -> { repo.accept(1); }).start();
When created via the factory method, it has the interesting property that the observation of the update() happens on the Looper where the repository has been created. (Looper is like a per-thread trampoline scheduler/Executor that let's one execute code on a specific thread, such as the Android main thread).
This out-of-band property creates an interesting case:
Set<Integer> set = new HashSet<>(); MutableRepositoryrepo = Repositories.mutableRepository(0); repo.addUpdatable(() -> set.add(repo.get())); new Thread(() -> { for (int i = 0; i < 100_000; i++) { repo.accept(i); } }).start(); Thread.sleep(20_000); System.out.println(set.size());
Assuming 20 seconds is enough, what is the final size of the Set? One would expect it contains all 100.000 integers. In reality, the value could be anywhere between 1 and 100.000! The reason for this is because the accept() and get() run concurrently and if the consumer is slower, accept() simply overwrites the current value in the repository.
In some cases, this may be acceptable (i.e., similar to when onBackpressureDrop is applied in RxJava), sometimes its not and you may end up spending a lot of time hunting for lost values.
Error handling
Being asynchronous usually means you have asynchronous errors. RxJava and the others compose nicely in this regard: somebody errors out, the whole processing graph is cleaned up automatically unless the programmer wishes otherwise by suppressing, replacing or retrying the flow. The error and cleanup can be very complicated in some cases, but we library developers put in a lot of effort so you don't have to worry about it most of the time.The Agera base API doesn't handle error by itself, you have to do it out-of-band just like with values. If you have multiple services composed via Agera, you have to establish the same error-management "framework" similar to how you'd have to do it in callback-hell situations. Very cumbersome and error-prone by itself due to concurrency and terminal state considerations.
Termination
Again, Agera doesn't have a notion for a completed stream - you have to figure out when that happens on your own. This might not be an issue in GUI cases where your consumer starts with your activity and ends with it as well and signals are delivered continuously. However, asynchronous background Observables now have to somehow tell or specify how many signals they will emit and how will you know the update() signal didn't happen because there is no data available.How to design a modern zero-parameter reactive API
First of all, perhaps you shouldn't bother with one and just use an existing library for this:rx.Observable<Void> signaller = ... rx.Observer<Void> consumer = ... Subscription s = signaller.subscribe(consumer); // ... s.unsubscribe();
You get all the infrastructure, operators and performance from them at basically no additional cost. Better yet, if you generally want to deal with signals of values, you can use the appropriate type instead of Void.
If an existing library feels to cumbersome to learn due to a lot of operators, you can perhaps fork it, delete the unnecessary stuff and use that. Of course, now you have to keep up with bugfixes and performance enhancements.
If forking and pruning doesn't sound attractive, you can develop your own library on top of the Reactive-Streams specification; Publisher<Void>, Subscriber<Void> and all the things between them you need. You get practically free interop with other Reactive-Streams libraries and consumers, plus, you can test your solution via its Test Compatibility Kit (TCK).
Of course, writing a reactive library is hard, writing a reactive library over Reactive-Streams is even harder. As a final resort, you may decide to write a barebone API from scratch.
If you really want to do a zero-argument reactive flow, here are a few tips you should consider:
1) Don't have separate addListener and removeListener. A single entry point simplifies the development of intermediate operators:
interface Observable { Removable register(Updatable u); } interface Removable { void remove(); }
2) Consider injecting the cancellation/remove support instead of returning a cancellation token or remover action:
interface Observable { void register(Updatable u); } interface Updatable { void onRegister(Removable remover); void update(); } // or interface Updatable { void update(Removable remover); }3) Consider adding error signal delivery at least:
Certainly, this complicates the lives of the library writers but can save a lot of on the side of your library's users.
interface Updatable { void onRegister(Removable remover); void update(); void error(Throwable ex); }
4) Consider offer asynchronous boundary as an option in the sequence.
I.e., with the MutableRepository example, you may want to react to the new value on the caller's thread before moving back to the main thread. This means observeOn and perhaps subscribeOn if you intend to have cold sources.
Conclusion
Writing a reactive library is not an easy task and one can fall into a lot of mistakes if one is not familiar with the history and evolution of field. In many companies, the "not invented here" or "we can do better" is so strong they rather start from scratch than learn/build upon somebody else's working solution.(Funny thing, I sometimes offer RxJava for an in-house project and I'm still getting raised eyebrows, even though it practically being "developed here", mostly.)
You may ask, why do I care what Google/Agera does? Aren't I confident in RxJava? Of course I am confident and Agera's existence doesn't really strike me.
However, my experience shows, if you have big name banner over your head, unchallenged self-confidence and sub-par outcome may be forced upon an entire community. I don't really want to give out ideas here but imagine the next Android version would mandate Agera, in its current form, to be the standard for asynchrous programming!
(In addition, interop is inevitable at some point and I really don't want to get complaints on the main RxJava issue list if they down work together properly.)
Let me finish with a wisdom I came up with (as there are now 2 cases to back it up):
You want to write a reactive library? Please don't (just yet)!
I enjoy your articles.
VálaszTörlésHave you looked at http://reactivesocket.io/ ? It looks a promising for building distributed reactive apps, with TCP and Aeron between Java components, plus websocket options to the browser. Would you recommend using http://reactivesocket.io/ ?
I'm aware of ReactiveSocket but haven't looked at the Java code, only the protocol, which is pretty much like Aeron or HTTP/2 again. I had no time or a compelling use case up until now to look into the reactive-io problem on my own.
VálaszTörlésThanks for this comprehensive overview of Agera. I'm trying to understand, are you advocating the use of Agera for developers at all, or are you suggesting to stick to RxJava at the moment? All this stuff can be extremely confusing for beginners, who don't know which library to choose for which projects...
VálaszTörlésStick with RxJava as long as you can. If you are forced to work with Agera, use my RxAgera library to convert it to RxJava.
TörlésThank you very much for this article, very informative.
VálaszTörlésI need to clarify something,
VálaszTörlésis Reactive Stream API that rxjava2 implemented in io.reactivex.Flowable, has something to do with the application protocol that the application utilize?
such as, when http 1.0 server that doesn't support chunked transfer encoding is used by the app, is Flowable still relevant?
what onBackpressureDrop() is did, is it stop calling read() or just close the connection, and reestablished it?
thanks..
RxJava and network IO are on two different levels. You can hold off a server by not consuming its data on the network layer, thus you get byte-stream level backpressure. So stop calling read() and the TCP protocol will ensure you are not flooded. Reopening a connection may require resume support. I wouldn't worry about such details as there is Retrofit for network operations which supports RxJava.
Törlésso, RxJava is just a means to control inter thread stream or in another word just within a single machine?
Törlés