Introduction
I have the feeling many would like to bury Subjects and I'm yet going to do a multi-post series about them.
Some consider them the mutable state of the reactive world, which I don't disagree with, but then they continue and say in big letters: don't use them while showing some more examples of using Observable.create():
Observable.create(s -> { int i = 0; while (true) { s.onNext(i++); } }).subscribe(System.out::println);
What about unsubscription and backpressure? No amount of operators can fix that for this source, yet you can apply onBackpressureXXX strategies on a Subject at any time.
Subjects are by no means bad or broken, but as with other components of the reactive paradigm, one must learn when and how to use them. For those who are lecturing about them, they should reconsider when and how they introduce Subjects to their audience. I suggest introducing them after introducing regular fluent operators but before talking about create().
In this series, I'm going to try and introduce Subjects, detail their requirements and structure and show how one can build its own custom Subject.
Imperative eventing
Let's assume you'd like to emit your own events, but you can't be sure when those events will be fired or how many of them will be there. Clearly, just() and from() can't help here and you don't want to create() an Observable which spins on some state either.The best would be an object that is both Observable, so clients can chain onto it, and Observer so you can emit values and terminal events as well. The combination of these two is what's called Subject (or Processor in Reactive-Streams naming).
You can think of them as a way of multicasting your values to interested parties without the hassle to handle thread-safety for the incoming Subscribers.
Subject changeEvents = ... changeEvents.subscribe(System.out::println); changeEvents.subscribe(System.out::println); changeEvents.onNext("Added"); Thread.sleep(10); changeEvents.onNext("Removed"); Thread.sleep(10); changeEvents.onCompleted();
Subjects are generified and sometimes you want them to emit the same type as received or emit a completely different type. In C# you can define two kinds of it with different arities:
interface ISubject<T>: IObserver<T>, IObservable<T> { } interface ISubject<T, U>: IObserver<T> IObservable<U> { }
however, Java's type erasure doesn't let you do this, therefore, RxJava went with the two-parameter version of it:
abstract class Subject<T, R> extends Observable<R> implements Observer<T> { }
Since RxJava's fluent API entry point is a class itself, in order to retain the composition possibility, Subject has to extend Observable and implement Observer. Perhaps a more interesting option would be to extend Subscriber instead of Observer to gain some resource-management and backpressure options, but Java doesn't let you extend more than one base class either and having the composable methods available is more important.
So in the example above, you'd have:
Subject<String, String> changeEvents = ...
Subjects are also considered hot observables: they 'operate' even if you don't have a Subscriber listening into them, similar to how a radio station doesn't stop playing music if you turn off your radio; who is there is there, who isn't there isn't.
For the example, such a Subject in RxJava is called a PublishSubject:
Subject<String, String> changeEvents = PublishSubject.create();
Certainly, having a create() factory method helps to avoid the repetition of the type parameters, but why can't one just call new PublishSubject<>() like in Rx.NET?
The reason is how the fluent API is made possible by an Observable class. If you recall, you have to provide an OnSubscribe instance when you create an Observable via its create() method which internally will remember that instance to call call() on it whenever a Subscriber subscribes through subscribe().
Unlike cold Observables, Subjects have to track their clients (so everyone gets the same data) which tracking has to happen both inside the OnSubscribe and in the Subject body itself. Unfortunately, Java doesn't allow an inner class inside the constructor to access the parent class' fields, therefore, the shared state between the two has to be extracted into a separate class and presented to both of them. This is non-trivial therefore the whole Subject instantiation and setup process is hidden behind such factory methods like PublishSubject.create(). (Once we get to the Subject-construction, I'll explain this with the help of examples.)
Flavors
Sometimes, you don't just want to dispatch your events as you see fit and don't care about your subscriber audience.For example, you are a TV network and just throwing out a large amount of TV series and their episodes on a weekly basis. However, your clients can't keep up with the sheer amount but they don't want to skip any of the episodes either. Therefore, their smart TV or the cable provider itself offers the capability to cache these episodes, starting at some point, and allow the subscriber to watch it in sequence in its own pace and without leaving out any.
In the programming world, you'd want to emit events and allow clients to run at a different pace but still receive all of your events, perhaps even after you stopped generating those events: a late Subscriber comes in and you'd like to replay all events that has been accumulated during the active times.
This is called a ReplaySubject.
By default, you can create an unbounded ReplaySubject which will cache everything it receives and replays it to Subscribers, including any terminal events.
However, some use cases require limiting the retention time or amount of the cached elements so later Subscribers don't get everything from the very beginning. The RxJava API offers three additional replay modes:
- createWithSize(n) will retain at most n elements,
- createWithTime(t, u) will retain elements that are younger that than t and
- createWithTimeAndSize(n, t, u) will retain at most n elements which are also younger than t.
This could be enough, but there exist further cases that warrant for an unique Subject implementation.
For example, if one performs an asynchronous computation and wants to emit just a single value followed by a completion event. ReplaySubject does work but is too verbose and the overhead might be inacceptable for such a simple task. Therefore, RxJava offers another Subject called AsyncSubject. It remembers the very last element it received and once onCompleted is called, all currently listening and future listeners will receive just that single value followed by the completion event. But unlike ReplaySubject, if one calls onError on an AsyncSubject, any previously received value is ignored and all Subscribers will just receive the Throwable from then on.
The final case that is covered in RxJava via a Subject variant is when one would like to have a single value stored. Subscribers to it should immediately receive this value and any subsequent value in case a new value is onNext'd on the Subject. It is called BehaviorSubject and some also call it the reactive property. Again, a ReplaySubject of size 1 would also do, but unlike ReplaySubject, sending an onCompleted() will evict the single stored value and subsequent Subscribers will only receive just the onCompleted (or onError) event and not any value. You can create a BehaviorSubject with or without an initial value.
A reactive list
After such dry text, let's see how we can put these subjects into use. Imagine you want to build a list class that sends out notifications whenever its contents change. We want to send out events in case of addition, removal or update to some element and have- a notification channel for the type of change,
- a notification channel of the value causing the change, which also remembers the last value that was involved in a change and
- a notification channel that holds onto the last 10 elements added to the list.
I'll call this class ReactiveList:
public final class ReactiveList<T> { public enum ChangeType { ADD, REMOVE, UPDATE // (1) }; final List<T> list = new ArrayList<>(); // (2) final PublishSubject<Changetype> changes = PublishSubject.create(); // (3) final BehaviorSubject<T> changeValues = BehaviorSubject.create(); final ReplaySubject<T> latestAdded = ReplaySubject.createWithSize(10); public Observable<ChangeType> changes() { // (4) return changes; } public Observable<T> changeValues() { return changeValues; } public Observable<T> latestAdded() { return latestAdded; } public void add(T value) { // implement } public void remove(T value) { // implement } public void replace(T value, T newValue) { // implement } }
Our ReactiveList consist of the following notable elements:
- We define an enum for the three kinds of change events: add, remove and update.
- The data will be stored in a regular j.u.List instance.
- We specify the Subjects for the various event outputs and
- we specify methods that return an Observable of those outputs.
// ... public void add(T value) { list.add(value); changes.onNext(ChangeType.ADD); changeValues.onNext(value); latestAdded.onNext(value); } public void remove(T value) { if (list.remove(value)) { changes.onNext(ChangeType.REMOVE); changeValues.onNext(value); } } public void replace(T value, T newValue) { int index = list.indexOf(value); if (index >= 0) { list.set(index, newValue); changes.onNext(ChangeType.UPDATE); changeValues.onNext(newValue); } } }
Simply put, we perform the relevant changes and call onNext on the subjects to propagate the relevant change information.
A more reactive list
What if we want the inputs to the ReactiveList to be reactive as well by providing an Observer surface for them instead?For simplicity, I'll drop the replace() functionality but add a new method list() that will return a snapshot Observable of the contents of the list at the call time.
Now that we go for full reactivity, one has to consider concurrent calls to the Subjects. Subjects implement the Observer interface and thus inherit the requirement that onXXX methods have to be called in a sequential fashion and not concurrently. We can, of course use what we learned from serialized access, but such behavior is so often required the Subject has a dedicated operator for it, toSerialized() which returns a Subject that can be accessed fully concurrently.
public class MoreReactiveList<T> { public enum ChangeType { ADD, REMOVE }; final List<T> list = new ArrayList<>(); final Subject<ChangeType, ChangeType> changes; // (1) final Subject<T, T> changeValues; final Observer<T> addObserver; // (2) final Observer<T> removeObserver; public Observable<ChangeType> changes() { return changes; } public Observable<T> changeValues() { return changeValues; } public Observable<T> list() { // (3) List<T> copy = new ArrayList<>(); synchronized (list) { copy.addAll(list); } return Observable.from(copy); } public Observer<T> adder() { // (4) return addObserver; } public Observer<T> remover() { return removeObserver; } void onAdd(T value) { // (5) synchronized (list) { list.add(value); } changes.onNext(ChangeType.ADD); changeValues.onNext(value); } void onRemove(T value) { synchronized (list) { if (!list.remove(value)) { return; } } changes.onNext(ChangeType.REMOVE); changeValues.onNext(value); } void clear() { synchronized (list) { list.clear(); } } public MoreReactiveList() { // implement } }
The data types and structure has changed a little:
- Since we are going to serialize the Subjects, we lose their concrete type and have to specify them as Subject<T, R>.
- We will have a single set Observers for both add- and remove-channels.
- Here we have the new a list() method that creates a snapshot of the underlying list and returns an Observable of it. Note the synchronization around the list itself: the class will be manipulated concurrently and we need to ensure thread safety.
- We return the final Observer instances for the add- and remove-channels through the methods.
- The actual list manipulation has been moved to the package-private onAdd and onRemove methods.
Now let's see the missing MoreReactiveList() constructor's logic:
// ... public MoreReactiveList() { changes = PublishSubject.<ChangeType>create() .toSerialized(); // (1) changeValues = BehaviorSubject.<T>create() .toSerialized(); addObserver = new SerializedObserver<>( // (2) Observers.create( this::onAdd, t -> { clear(); changes.onError(t); changeValues.onError(t); }, () -> { clear(); changes.onCompleted(); changeValues.onCompleted(); } )); removeObserver = new SerializedObserver<>( // (3) Observers.create( this::onRemove, t -> { clear(); changes.onError(t); changeValues.onError(t); }, () -> { clear(); changes.onCompleted(); changeValues.onCompleted(); } )); } }
It works as follows:
- We setup the output channels similar to ReactiveList but this time, we serialize the access to them.
- The addObserver is a serialized observer that forwards its onNext method to the onAdd method. If it receives a terminal event, the list is cleared and the event is propagated to both output channels.
- The removeObserver is built up similarly to (2) with the exception that it forwards to onRemove.
Let's try it out:
MoreReactiveList<Long> list = new MoreReactiveList<>(); Observable.timer(0, 1, TimeUnit.SECONDS) .take(10) .subscribe(list.adder()); Observable.timer(4, 1, TimeUnit.SECONDS) .take(10) .subscribe(list.remover()); list.changes().subscribe(System.out::println); list.changes() .flatMap(e -> list.list().toList()) .subscribe(System.out::println); list.changeValues.toBlocking().forEach(System.out::println);
In the example, we create a list and two timers which run 4 seconds apart and emit items every second. We use the first timer to add to the list and the second to remove from the list. We then observe the change types, we print the current state of the list at when the contents change and we blockingly observe the change-causing values themselves until completion.
Conclusion
In this blog post, I've introduced the Subjects and their RxJava flavors, then shown two reactive objects that make use of their capabilities.In the next post, I'll explain the requirements, structures and algorithms that are necessary to implement a custom Subject.
You say, I quote, "What about unsubscription and backpressure? No amount of operators can fix that for this source, yet you can apply onBackpressureXXX strategies on a Subject at any time."
VálaszTörlésI can see that for a sequence of events triggered at once. But what about a single callback? Is using an inline Subscriber ok for wrapping a simple callback returning a single value or an error? Cause this is a widely used practice on Rx-all-the-things libraries.
Do you know of any guide on how one could wrap that kind of API with Observable.create() by correctly handling back pressure and all the stuff?
Thank you!
For single valued sources, use fromCallable, which also let's you throw a checked exception.
VálaszTörlésAs for the guide, I suggest you read my blog from the very beginning.
VálaszTörlésfromCallable() suppose you have a synchronous method to call, what If the only thing I have is an async method taking a callback?
VálaszTörléssomeSDK.doStuffInBackground(callback);
and this is not available:
someSDK.doStuffNow();
I plan to read all your blog but you write a lot (thank you by the way) so I'm not there quite yet, trying to read the things I need the most right now first. If you could point out to your article where you talk about creating Observables I'd really appreciate it (I couldn't find that reading by a quick look it seems like you talk mostly of advanced stuff like operators and schedulers and I don't plan to create any of them in the immediate future but I'll have to wrap existing API and thus creating Observables).
Thanks again
Right. RxJava doesn't have support for those directly and you have to wrap such callbacks manually:
VálaszTörlésOnSubscribe o = s -> {
AutoCloseable c = sdk.doInBackground(
event -> s.setProducer(new SingleProducer<>(event.getData()));
s.add(Subscriptions.create(() -> { try { c.close(); } catch (Exception ex) { } }));
};
As for the entry point, unless you already know about backpressure and tooling, you can't just create an Observable that conforms to the RxJava protocol.
Thank you David, I elaborated from your input and tried to build something generic to wrap those kind of APIs.
VálaszTörlésI created a gist:
https://gist.github.com/danielesegato/c9861bc813982f248f2244ed2563ef89
would love to have your feedback on it. Does it handle Backpressure correctly?
Hi David, you said, "What about unsubscription and backpressure? No amount of operators can fix that for this source, yet you can apply onBackpressureXXX strategies on a Subject at any time.", couldn't we apply `onBackpressureXXX` to the Observable created by `Observable.create`?
VálaszTörlésHi David, you said, "What about unsubscription and backpressure? No amount of operators can fix that for this source, yet you can apply onBackpressureXXX strategies on a Subject at any time.", couldn't we apply `onBackpressureXXX` to the Observable created by `Observable.create`?
VálaszTörlésYou can apply onBackpressureDrop or onBackpressureLatest but can't cancel the sequence properly and will run in a tight loop indefinitely.
TörlésHi David, you said, "What about unsubscription and backpressure? No amount of operators can fix that for this source, yet you can apply onBackpressureXXX strategies on a Subject at any time.", couldn't we apply onBackpressureXXX to the Observable created by Observable.create?
VálaszTörlésThanks for the article Dávid. Is it possible to use .retry with subjects? If so, how do you prevent the subject from emitting errors when retry resubscribes to its observable?
VálaszTörlésNo. When subjects fail, that leaves them in permanent failed state. Any subsequent observer will receive that failure, making a no-arg retry() resubscribe indefinitely. If you know you deal with a subject-driven straight chain, don't use retry, otherwise use bounded retry.
Törlés