Introduction
In this final part, I'm going to show how one can implement operators (source and transformative alike). Since the Completable API features no values but only the terminal onError and onCompleted events, there are far fewer meaningful operators possible than for the main Observable class; therefore, most of the examples will feature an existing Completable operator.
Empty
Our first operator will be the empty() operator that emits onCompleted once subscribed to. We implement it through the CompletableOnSubscribe functional interface:

    Completable empty = Completable.create(completableSubscriber -> {
        BooleanSubscription cancel = new BooleanSubscription();    // (1)
        completableSubscriber.onSubscribe(cancel);                 // (2)
        if (!cancel.isUnsubscribed()) {
            cancel.unsubscribe();                                  // (3)
            completableSubscriber.onCompleted();                   // (4)
        }
    });

The relief here is that there is no need for type parameters. The Completable API follows the concepts of the Reactive-Streams design where the way of letting the child subscribers cancel their upstream is enabled by calling an onSubscribe method with a Subscription instance.
For the case of empty, it means we have to create some instance of the Subscription interface: a BooleanSubscription that lets us examine whether the child has unsubscribed or not (1). Before we can emit any terminal event, we have to call onSubscribe and send the child subscriber this BooleanSubscription instance (2). This is a mandatory step and if omitted, we can expect NullPointerException from the various Completable operators at worst or non-functioning unsubscription when converted to Observable at best.
Similar to the Reactive-Streams spec, when a terminal event is emitted, the aforementioned Subscription has to be considered unsubscribed. We achieve this by manually unsubscribing our BooleanSubscription (3) and then calling the onCompleted() method (4).
This may look a bit complicated for such a simple operator. The rules, however, allow for a much simpler version:

    Completable empty = Completable.create(completableSubscriber -> {
        completableSubscriber.onSubscribe(Subscriptions.unsubscribed());
        completableSubscriber.onCompleted();
    });

Here, we send an already unsubscribed Subscription and call onCompleted immediately after. There are three reasons why this is acceptable: a) unsubscription should be considered best effort, meaning one must be prepared for events that slip through; b) there is a really small window between the onSubscribe and onCompleted calls, so isUnsubscribed would return false for most async usages anyway; and c) the Subscription should be considered unsubscribed just before onCompleted is called in any case.
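To make the signal order tangible without pulling in RxJava, here is a small dependency-free sketch. Cancellation and Listener are hypothetical stand-ins I made up for rx.Subscription and CompletableSubscriber; they are not RxJava types, and the sketch only models the "onSubscribe first, terminal event second" rule.

```java
import java.util.ArrayList;
import java.util.List;

final class EmptyProtocolSketch {

    /** Hypothetical stand-in for rx.Subscription. */
    interface Cancellation {
        void cancel();
        boolean isCancelled();
    }

    /** Hypothetical stand-in for CompletableSubscriber. */
    interface Listener {
        void onSubscribe(Cancellation c);
        void onCompleted();
        void onError(Throwable e);
    }

    /** Plays the role of Subscriptions.unsubscribed(): cancelled from the start. */
    static final Cancellation CANCELLED = new Cancellation() {
        @Override public void cancel() { }
        @Override public boolean isCancelled() { return true; }
    };

    /** The simplified empty source: hand over the token, then complete. */
    static void subscribeEmpty(Listener listener) {
        listener.onSubscribe(CANCELLED);
        listener.onCompleted();
    }

    /** Subscribes a recording listener and returns the signals in arrival order. */
    static List<String> recordEmpty() {
        List<String> events = new ArrayList<>();
        subscribeEmpty(new Listener() {
            @Override public void onSubscribe(Cancellation c) {
                events.add("onSubscribe(cancelled=" + c.isCancelled() + ")");
            }
            @Override public void onCompleted() { events.add("onCompleted"); }
            @Override public void onError(Throwable e) { events.add("onError"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(recordEmpty());
    }
}
```

The listener sees the token before the terminal event, and the token already reports itself as cancelled by the time onCompleted arrives, which is all the rule asks for.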
Empty delayed
Let's assume we want to emit the onCompleted event after some delay. We are going to need a Scheduler.Worker instance for it but we also have to ensure the delayed task can be cancelled if necessary.

    public static Completable emptyDelayed(
            long delay, TimeUnit unit, Scheduler scheduler) {
        return Completable.create(cs -> {
            Scheduler.Worker w = scheduler.createWorker();
            cs.onSubscribe(w);
            w.schedule(() -> {
                try {
                    cs.onCompleted();
                } finally {
                    w.unsubscribe();
                }
            }, delay, unit);
        });
    }

Luckily, Scheduler.Worker is a Subscription and thus we can directly send it to the child CompletableSubscriber before scheduling the call to onCompleted. In the scheduled task, we unsubscribe the worker after calling onCompleted, at which point it is clear the worker is not unsubscribed. The Reactive-Streams specification states that org.reactivestreams.Subscription should be considered cancelled at that point and it works there because there is no way to check if a Subscription is cancelled or not. However, RxJava lets you check it but it doesn't really make sense to check if your upstream considers you unsubscribed or not. The second reason for the shown order is that unsubscribing a Worker may cause unwanted interruptions down the line of onCompleted.
If we really want to make sure the Subscription the child receives is actually unsubscribed, we have to add a level of indirection via a MultipleAssignmentSubscription:

    Scheduler.Worker w = scheduler.createWorker();
    MultipleAssignmentSubscription mas =
            new MultipleAssignmentSubscription();
    mas.set(w);
    cs.onSubscribe(mas);
    w.schedule(() -> {
        mas.set(Subscriptions.unsubscribed());
        mas.unsubscribe();
        try {
            cs.onCompleted();
        } finally {
            w.unsubscribe();
        }
    }, delay, unit);

Instead of sending the Worker to the child, we wrap it inside the MultipleAssignmentSubscription and just before completing the child, we replace its content with an already unsubscribed Subscription and unsubscribe the whole container. The reason for this replacement, and the use of MultipleAssignmentSubscription is to avoid unsubscribing the worker too early; a SerialSubscription or a CompositeSubscription would not allow this.
Finally, one must be careful with the scheduling of the onCompleted call, especially with RxJava 2.0 Schedulers. 2.0 Schedulers allow direct scheduling, that is, you can schedule a task without the need to create a Worker and unsubscribe it after use. This reduces the overhead for most one-shot scheduled operators but has the (acceptable) property of not ensuring any ordering between scheduled tasks of the same Scheduler. Therefore, if one is inclined to implement the delayed empty with it, it may look like this:

    cs.onSubscribe(
        scheduler.scheduleDirect(cs::onCompleted, delay, unit));

There is, however, a race condition here: it is possible that the scheduled action, and the onCompleted call within it, gets executed before scheduleDirect returns, and thus the child subscriber will not have a Subscription set. A worse scenario is that both onXXX methods may run at the same time, which violates the sequential protocol of Completable. The solution, again, is to have some indirection:

    MultipleAssignmentDisposable mad =
            new MultipleAssignmentDisposable();
    cs.onSubscribe(mad);
    mad.set(
        scheduler.scheduleDirect(cs::onCompleted, delay, unit));

Remark: due to the naming conflict, 2.0 named its resource-handling interface Disposable instead of Subscription. I'll post an entire series about RxJava 2.0 soon.
This example should foreshadow one property of the Completable API: resource management is the responsibility of the operator itself and there is no convenient add(Subscription) available anymore (such as with rx.Subscriber). This small inconvenience requires you to use subscription containers whenever there is scheduling or multiple sources involved. The benefit of this setup is that if an operator doesn't need resource management, no such structure is created (unlike in rx.Subscriber), which saves on allocation cost and footprint and gives better performance overall.
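If you wonder why handing the container to the child first and filling it later closes the race, the following dependency-free sketch may help. SwapContainerSketch and its Cancellation interface are names I invented for illustration; the class only approximates what a multiple-assignment container does and is not the RxJava implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

final class SwapContainerSketch {

    /** Hypothetical stand-in for Subscription / Disposable. */
    interface Cancellation {
        void cancel();
    }

    /** Marker stored once the container itself has been cancelled. */
    private static final Cancellation CANCELLED = () -> { };

    private final AtomicReference<Cancellation> current = new AtomicReference<>();

    /**
     * Stores the resource without cancelling the previous one. If the
     * container is already cancelled, the incoming resource is cancelled
     * immediately instead of being stored.
     */
    void set(Cancellation next) {
        for (;;) {
            Cancellation c = current.get();
            if (c == CANCELLED) {
                next.cancel();
                return;
            }
            if (current.compareAndSet(c, next)) {
                return;
            }
        }
    }

    /** Cancels the container and whatever resource it currently holds. */
    void cancel() {
        Cancellation c = current.getAndSet(CANCELLED);
        if (c != null && c != CANCELLED) {
            c.cancel();
        }
    }

    /** The child cancels before the scheduled task's handle arrives. */
    static boolean lateResourceIsCancelled() {
        SwapContainerSketch container = new SwapContainerSketch();
        boolean[] taskCancelled = { false };
        container.cancel();
        container.set(() -> taskCancelled[0] = true);
        return taskCancelled[0];
    }

    /** The handle arrives first and the child cancels afterwards. */
    static boolean storedResourceIsCancelled() {
        SwapContainerSketch container = new SwapContainerSketch();
        boolean[] taskCancelled = { false };
        container.set(() -> taskCancelled[0] = true);
        container.cancel();
        return taskCancelled[0];
    }

    public static void main(String[] args) {
        System.out.println(lateResourceIsCancelled());
        System.out.println(storedResourceIsCancelled());
    }
}
```

Either way the scheduled task ends up cancelled, so the child can hold the container from the very first moment, no matter when the task handle becomes available.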
First completed
When I was in primary school, sometimes, the teacher would issue a challenge where the first one to complete it would win some small prize. This is a nice analogy to the amb() operator: the first source to complete wins. The first one to fail also wins, unlike in real life where failure is not an option.
Regardless, what does it take to write such an operator for Completable? Clearly, unlike Observable.amb, we can't really capture the information about which Completable source was the one that terminated first; therefore, there is no need for index trickery and the like: a simple AtomicBoolean is sufficient.

    public static Completable amb(Completable... students) {
        return Completable.create(principal -> {
            AtomicBoolean done = new AtomicBoolean();              // (1)

            CompositeSubscription all =
                    new CompositeSubscription();                   // (2)

            CompletableSubscriber teacher = new CompletableSubscriber() {
                @Override
                public void onSubscribe(Subscription s) {
                    all.add(s);                                    // (3)
                }

                @Override
                public void onCompleted() {
                    if (done.compareAndSet(false, true)) {         // (4)
                        all.unsubscribe();
                        principal.onCompleted();
                    }
                }

                @Override
                public void onError(Throwable e) {
                    if (done.compareAndSet(false, true)) {         // (5)
                        all.unsubscribe();
                        principal.onError(e);
                    }
                }
            };

            principal.onSubscribe(all);                            // (6)

            for (Completable student : students) {
                if (done.get() || all.isUnsubscribed()) {          // (7)
                    return;
                }
                student.subscribe(teacher);                        // (8)
            }
        });
    }

In the primary school example, there is only one teacher that listens to all students for the completion indicator. This is an interesting optimization in the Completable (and in Reactive-Streams) world and it does technically work: the teacher CompletableSubscriber is essentially stateless: it forwards its onXXX calls to other classes and doesn't have to remember who called its methods. I'm emphasizing technically because at this point in time, the Reactive-Streams specification forbids subscribing the same Subscriber instance to multiple Publishers, which in my opinion is overly restrictive: library writers, who know what they are doing, should be allowed to do this. Unsurprisingly, the compliant resolution is to move the creation of the teacher CompletableSubscriber into (8) without any changes to its internals; clearly, that should indicate it can be shared among the students. Nonetheless, let's see what each notable point does:
- We create the shared done indicator which is set to true once one of the students notifies the teacher about its completion (or failure).
- In case the head principal doesn't like the challenge, he/she can cancel the entire challenge.
- The teacher will register the Subscriptions given by the student Completables.
- and makes sure the first who signals the terminal event will also notify the principal about it (unfortunately, he/she won't know who was the first actually). At this point, there is no reason for the others to continue and will be unsubscribed from the challenge.
- In addition, if it turns out the challenge melts the brain of one of the students, for safety reasons, the challenge has to be cancelled and the head principal notified about the error.
- We allow the principal to tell each student to stop working on the challenge.
- For each student Completable, we "hand out" the challenge material and subscribe the teacher to each student's terminal event. It is possible that the challenge gets completed/failed/cancelled while we are still subscribing to students, at which point there is no reason to continue the process.
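The "first one wins" gate in (4) and (5) can be tried out in isolation. The sketch below uses only java.util.concurrent; FirstWinsSketch and raceOnce are hypothetical names and the threads merely play the role of students signalling at the same time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class FirstWinsSketch {

    /** Races the given number of threads for one flag; returns how many won. */
    static int raceOnce(int contenders) {
        AtomicBoolean done = new AtomicBoolean();
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);

        Thread[] threads = new Thread[contenders];
        for (int i = 0; i < contenders; i++) {
            threads[i] = new Thread(() -> {
                try {
                    start.await();                    // line up at the start
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // same gate as in the teacher's onCompleted / onError
                if (done.compareAndSet(false, true)) {
                    winners.incrementAndGet();
                }
            });
            threads[i].start();
        }

        start.countDown();                            // release everybody at once
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println(raceOnce(8));
    }
}
```

No matter how the threads interleave, compareAndSet(false, true) succeeds for exactly one of them, which is why the principal never receives two terminal events.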
When all completed
Staying with the school example, at other times the students are evaluated and such an evaluation procedure runs until all students have completed (or failed) their evaluation. If an accident happens, we may or may not stop the evaluation process and send the injured students to the ambulance in one batch.
I hope this operator setup sounds familiar; if not, here is the answer: merge and mergeDelayError respectively, depending on the "assembled" school policy.
Let's see the simplest case where we know exactly the number of students and any failure should stop the evaluation process:

    public static Completable merge(Completable... students) {
        return Completable.create(principal -> {
            AtomicInteger remaining =
                    new AtomicInteger(students.length);            // (1)

            CompositeSubscription all =
                    new CompositeSubscription();

            CompletableSubscriber evaluator = new CompletableSubscriber() {
                @Override
                public void onSubscribe(Subscription s) {
                    all.add(s);
                }

                @Override
                public void onCompleted() {
                    if (remaining.decrementAndGet() == 0) {        // (2)
                        all.unsubscribe();
                        principal.onCompleted();
                    }
                }

                @Override
                public void onError(Throwable e) {
                    if (remaining.getAndSet(0) > 0) {              // (3)
                        all.unsubscribe();
                        principal.onError(e);
                    }
                }
            };

            principal.onSubscribe(all);

            for (Completable student : students) {
                if (all.isUnsubscribed()
                        || remaining.get() <= 0) {                 // (4)
                    return;
                }
                student.subscribe(evaluator);
            }
        });
    }

The implementation looks quite similar to the amb() case but has a few differences:
- We need to count (down) atomically the number of students who completed the evaluation successfully.
- Once it reaches zero, the principal is notified about the completion of the entire evaluation.
- If one of the students signals an error, we set the remaining count to zero atomically and if it was previously non-zero, we cancel everybody and signal the error to the principal. Note that this can happen at most once because if there are multiple concurrent onError calls, only one of them will successfully replace a positive remaining value with a zero value. Any subsequent regular completion will just further decrement the remaining value.
- If there was an error or cancellation, the loop that subscribes the evaluator to the students should be stopped to not waste more time and resources.
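The counting rules in (2) and (3) are easy to get wrong, so here is a stripped-down, single-threaded sketch of just the counter logic. MergeGateSketch is a hypothetical helper of mine, and the downstream list stands in for the principal; it is not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class MergeGateSketch {

    private final AtomicInteger remaining;

    /** Records what the principal would receive (single-threaded demo only). */
    final List<String> downstream = new ArrayList<>();

    MergeGateSketch(int sources) {
        remaining = new AtomicInteger(sources);
    }

    /** Mirrors the evaluator's onCompleted: only the last completion gets through. */
    void sourceCompleted() {
        if (remaining.decrementAndGet() == 0) {
            downstream.add("onCompleted");
        }
    }

    /** Mirrors the evaluator's onError: only the first failure gets through. */
    void sourceFailed(String reason) {
        if (remaining.getAndSet(0) > 0) {
            downstream.add("onError(" + reason + ")");
        }
    }

    static List<String> failFastScenario() {
        MergeGateSketch gate = new MergeGateSketch(3);
        gate.sourceCompleted();        // remaining: 3 -> 2
        gate.sourceFailed("boom");     // remaining: 2 -> 0, error goes out
        gate.sourceFailed("late");     // remaining stays 0, ignored
        gate.sourceCompleted();        // remaining: 0 -> -1, ignored
        return gate.downstream;
    }

    static List<String> allCompleteScenario() {
        MergeGateSketch gate = new MergeGateSketch(3);
        gate.sourceCompleted();
        gate.sourceCompleted();
        gate.sourceCompleted();        // remaining: 1 -> 0, completion goes out
        return gate.downstream;
    }

    public static void main(String[] args) {
        System.out.println(failFastScenario());
        System.out.println(allCompleteScenario());
    }
}
```

After an error, later completions push the counter below zero, so the == 0 check can never fire again and the principal sees exactly one terminal event.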
But what if we don't know the number of students and we don't really want to stop the evaluation in case of an error? This complicates the error management and the tracking of completed students slightly:

    public static Completable mergeDelayError(
            Iterable<? extends Completable> students) {
        return Completable.create(principal -> {
            AtomicInteger wip = new AtomicInteger(1);              // (1)

            CompositeSubscription all =
                    new CompositeSubscription();

            Queue<Throwable> errors =
                    new ConcurrentLinkedQueue<>();                 // (2)

            CompletableSubscriber evaluator = new CompletableSubscriber() {
                @Override
                public void onSubscribe(Subscription s) {
                    all.add(s);
                }

                @Override
                public void onCompleted() {
                    if (wip.decrementAndGet() == 0) {              // (3)
                        if (errors.isEmpty()) {
                            principal.onCompleted();
                        } else {
                            principal.onError(
                                new CompositeException(errors));
                        }
                    }
                }

                @Override
                public void onError(Throwable e) {
                    errors.offer(e);                               // (4)
                    onCompleted();
                }
            };

            principal.onSubscribe(all);

            for (Completable student : students) {
                if (all.isUnsubscribed()) {                        // (5)
                    return;
                }
                wip.getAndIncrement();                             // (6)
                student.subscribe(evaluator);
            }

            evaluator.onCompleted();                               // (7)
        });
    }

Again, the structure looks similar, but the expected behavior requires different algorithms:
- We start with an AtomicInteger work-in-progress counter set to 1. The reason for this is that we don't know how many students we are going to get from the Iterable, but we know we have finished once the wip counter reaches zero, that is, after all students and the CompletableOnSubscribe body itself have finished.
- We will collect the exceptions into a concurrent queue.
- The terminal condition is determined in the evaluator's onCompleted method: if the wip counter reaches zero, we check if there were errors queued up along the way and if so, we emit it as a CompositeException. Otherwise, a regular onCompleted event is emitted to the principal.
- Since we don't stop on error, we have to perform the same wip decrement as in onCompleted, but before that, the error has to be queued up. (Note that misbehaving Completable sources can disrupt this and trigger early completion if they send multiple terminal events.)
- In the loop where the student Completables are subscribed to we can only check if the principal is no longer interested in the evaluation; the value of the wip counter doesn't help here because it is going to be at least 1 while the loop is running.
- For each new Completable student, first we increment the wip count and then subscribe the evaluator to it. This makes sure the wip counter is at least 1 so the terminal condition isn't met while the loop is running.
- Finally, we call the evaluator's onCompleted method to signal that no more Completable students will appear. This now allows the wip counter to reach zero and terminate the whole process.
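Why start the counter at 1 instead of 0? The following sketch simulates the subscription loop with sources that complete synchronously while being subscribed to, which is the worst case for this pattern. WipCounterSketch and subscribedWhenTerminated are hypothetical names; the Runnable plays the role of the evaluator's onCompleted.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class WipCounterSketch {

    /**
     * Returns how many sources had been subscribed to when the counter
     * first reached zero, or -1 if it never did.
     */
    static int subscribedWhenTerminated(int initialWip, int sources) {
        AtomicInteger wip = new AtomicInteger(initialWip);
        int[] subscribed = { 0 };
        int[] atTermination = { -1 };

        // stands in for the evaluator's onCompleted
        Runnable onSourceDone = () -> {
            if (wip.decrementAndGet() == 0 && atTermination[0] < 0) {
                atTermination[0] = subscribed[0];
            }
        };

        for (int i = 0; i < sources; i++) {
            wip.getAndIncrement();     // step (6): count the new source first
            subscribed[0]++;
            onSourceDone.run();        // the source completes during subscribe()
        }
        onSourceDone.run();            // step (7): no more sources will arrive
        return atTermination[0];
    }

    public static void main(String[] args) {
        System.out.println(subscribedWhenTerminated(1, 3));   // prints 3
        System.out.println(subscribedWhenTerminated(0, 3));   // prints 1
    }
}
```

With the initial value of 1, the terminal condition is only met after all three sources were subscribed to; starting from 0, the very first synchronous completion already brings the counter to zero and the downstream would be completed prematurely.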
Such compactness and reuse is rare (or even impossible) with the regular RxJava 1.x Observable operators.
Transformative operators
I don't think there are too many ways one can transform a Completable "sequence". Most Observable operators no longer make sense and thus are omitted from the Completable API. Regardless, let's see a few examples.
With the first operator, we'd like to suppress an exception and since there are no values involved, the best we can do is to signal an onCompleted ourselves:

    CompletableOperator onErrorComplete = cs -> {
        return new CompletableSubscriber() {
            @Override
            public void onSubscribe(Subscription s) {
                cs.onSubscribe(s);
            }

            @Override
            public void onCompleted() {
                cs.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                cs.onCompleted();
            }
        };
    };

    source.lift(onErrorComplete).subscribe();

Alternatively, we'd like to resume with another Completable in case of an error:

    public static Completable onErrorResumeNext(
            Completable first,
            Func1<Throwable, ? extends Completable> otherFactory) {
        return first.lift(cs -> new CompletableSubscriber() {
            final SerialSubscription serial =
                    new SerialSubscription();                      // (1)
            boolean once;

            @Override
            public void onSubscribe(Subscription s) {
                serial.set(s);                                     // (2)
                if (!once) {
                    // the downstream receives the container only once,
                    // when the first source subscribes
                    cs.onSubscribe(serial);
                }
            }

            @Override
            public void onCompleted() {
                cs.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                if (!once) {                                       // (3)
                    once = true;
                    otherFactory.call(e).subscribe(this);          // (4)
                } else {
                    cs.onError(e);                                 // (5)
                }
            }
        });
    }

In this example, we lift the CompletableOperator instance into the supplied Completable. In the operator body, we return a CompletableSubscriber with the following internal behavior:
- Since we may need to switch sources, we have to swap the incoming Subscription from the first to the other Completable. It is possible to use MultipleAssignmentSubscription for this case as well. This is analogous to the ProducerArbiter approach common with Observable operators, although much simpler in nature. In addition, we will reuse the this instance on the new Completable but we don't want to keep resubscribing to it if it fails as well.
- We set the incoming Subscription on the SerialSubscription, evicting the previous subscription.
- In case the first signals an error, we make sure the case that switches to the other Completable runs once,
- because we are going to reuse the current CompletableSubscriber instance for it as well, saving on allocation. The code in (2) makes sure the unsubscription chain is still maintained. This example omits the try-catch around the factory call for brevity; if the factory crashes, you can create a CompositeException from both the original error and the fresh crash and call cs.onError with it.
- If this is the other Completable source that fails, we simply signal the same error downstream.
If we think about it, implementing a regular continuation (i.e., andThen, endWith, concatWith) in case of an onCompleted practically uses the same approach. Instead of switching in onError, the switch happens in onCompleted:

    // ...

    @Override
    public void onCompleted() {
        if (!once) {                                               // (3)
            once = true;
            // there is no Throwable here; this assumes `other` is the
            // Completable to continue with
            other.subscribe(this);                                 // (4)
        } else {
            cs.onCompleted();                                      // (5)
        }
    }

    @Override
    public void onError(Throwable e) {
        cs.onError(e);
    }

Lastly, we'd like an operator that switches to another Completable on a timeout condition:

    public static Completable timeout(
            Completable first,
            long timeout, TimeUnit unit, Scheduler scheduler,
            Completable other) {
        return first.lift(cs -> new CompletableSubscriber() {
            final CompositeSubscription csub =
                    new CompositeSubscription();                   // (1)
            final AtomicBoolean once = new AtomicBoolean();        // (2)

            @Override
            public void onSubscribe(Subscription s) {
                csub.add(s);                                       // (3)

                Scheduler.Worker w = scheduler.createWorker();
                csub.add(w);

                cs.onSubscribe(csub);

                w.schedule(this::onTimeout, timeout, unit);        // (4)
            }

            @Override
            public void onCompleted() {
                if (once.compareAndSet(false, true)) {
                    csub.unsubscribe();                            // (5)
                    cs.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                if (once.compareAndSet(false, true)) {
                    csub.unsubscribe();
                    cs.onError(e);
                }
            }

            void onTimeout() {
                if (once.compareAndSet(false, true)) {             // (6)
                    csub.clear();

                    other.subscribe(new CompletableSubscriber() {
                        @Override
                        public void onSubscribe(Subscription s) {
                            csub.add(s);
                        }

                        @Override
                        public void onCompleted() {
                            cs.onCompleted();
                        }

                        @Override
                        public void onError(Throwable e) {
                            cs.onError(e);
                        }
                    });
                }
            }
        });
    }

This operator is a bit more involved:
- We are going to track both the first and other Completable source's Subscription as well as the Worker of the Scheduler that triggers the timeout condition.
- Since the terminal event of the first Completable races with timeout event, we need to determine a winner (as with amb()) which locks out the other event.
- When the Subscription arrives from the first source, we add it to the composite, create the Worker and then forward the whole composite to the downstream CompletableSubscriber.
- Finally, we schedule the execution of the onTimeout method. This setup avoids the race between onTimeout and onSubscribe, i.e., if the timeout were scheduled at assembly time, it might fire before the Subscription arrives, at which point one needs extra logic to make things right (not detailed here). Most of the time, this style of onSubscribe implementation saves a lot of headache with Reactive-Streams compliant operators in 2.0.
- If the terminal event of the first Completable happens first, we atomically and conditionally set the flag to true (this will prevent onTimeout from executing its inner logic). Since the composite manages the Worker resource, we have to unsubscribe it, followed by the emission of the original event downstream.
- If the onTimeout happens first and wins the race to set the flag, we clear the composite and subscribe to the other Completable with a fresh CompletableSubscriber. We call clear here because we still need the CompositeSubscription to store the Subscription of the other Completable and an unsubscribed composite is of no use. The additional benefit of clear is that it will cancel the first Completable as well as the Worker running the timeout.
Hot Completable?
As a final thought, I'm not sure if a hot Completable (or a published Completable) has any use cases out there, but let's see how one can implement one if necessary. Since Completable doesn't have any value, there is only one meaningful CompletableSubject implementation possible: there is nothing to replay other than the terminal value.
First, let's see the skeleton of the CompletableSubject:

    public final class CompletableSubject extends Completable
    implements CompletableSubscriber {

        public static CompletableSubject create() {
            State state = new State();
            return new CompletableSubject(state);
        }

        static final class State
        implements CompletableOnSubscribe, CompletableSubscriber {

            // TODO state fields

            boolean add(CompletableSubscription t) {
                // TODO implement
            }

            void remove(CompletableSubscription t) {
                // TODO implement
            }

            @Override
            public void call(CompletableSubscriber t) {
                // TODO implement
            }

            @Override
            public void onSubscribe(Subscription d) {
                // TODO implement
            }

            @Override
            public void onCompleted() {
                // TODO implement
            }

            @Override
            public void onError(Throwable e) {
                // TODO implement
            }
        }

        static final class CompletableSubscription
        extends AtomicBoolean implements Subscription {
            /** */
            private static final long serialVersionUID =
                    -3940816402954220866L;

            final CompletableSubscriber actual;
            final State state;

            public CompletableSubscription(
                    CompletableSubscriber actual, State state) {
                this.actual = actual;
                this.state = state;
            }

            @Override
            public boolean isUnsubscribed() {
                return get();
            }

            @Override
            public void unsubscribe() {
                if (compareAndSet(false, true)) {
                    state.remove(this);
                }
            }
        }

        final State state;

        private CompletableSubject(State state) {
            super(state);
            this.state = state;
        }

        @Override
        public void onSubscribe(Subscription d) {
            state.onSubscribe(d);
        }

        @Override
        public void onCompleted() {
            state.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            state.onError(e);
        }
    }

The structure here starts out as with any other Subject before. We have to create the CompletableSubject instance with a factory and have the CompletableSubscriber methods delegate to the shared State instance. The CompletableSubscription will be used for tracking each CompletableSubscriber and help manage the unsubscription based on their identity.
The State class will hold onto the terminal indicator and the optional Throwable instance along with the array of known child CompletableSubscribers:

    Throwable error;

    volatile CompletableSubscription[] subscribers = EMPTY;

    static final CompletableSubscription[] EMPTY =
            new CompletableSubscription[0];
    static final CompletableSubscription[] TERMINATED =
            new CompletableSubscription[0];

    boolean add(CompletableSubscription t) {
        if (subscribers == TERMINATED) {
            return false;
        }
        synchronized (this) {
            CompletableSubscription[] a = subscribers;
            if (a == TERMINATED) {
                return false;
            }

            CompletableSubscription[] b =
                    new CompletableSubscription[a.length + 1];
            System.arraycopy(a, 0, b, 0, a.length);
            b[a.length] = t;
            subscribers = b;
            return true;
        }
    }

    void remove(CompletableSubscription t) {
        CompletableSubscription[] a = subscribers;
        if (a == EMPTY || a == TERMINATED) {
            return;
        }

        synchronized (this) {
            a = subscribers;
            if (a == EMPTY || a == TERMINATED) {
                return;
            }

            // find the subscription by identity
            int j = -1;
            for (int i = 0; i < a.length; i++) {
                if (a[i] == t) {
                    j = i;
                    break;
                }
            }

            if (j < 0) {
                return;
            }
            if (a.length == 1) {
                subscribers = EMPTY;
                return;
            }
            CompletableSubscription[] b =
                    new CompletableSubscription[a.length - 1];
            System.arraycopy(a, 0, b, 0, j);
            System.arraycopy(a, j + 1, b, j, a.length - j - 1);
            subscribers = b;
        }
    }

The add and remove methods have the usual and familiar implementations that allow tracking the subscribed child CompletableSubscribers.
Next, let's handle the incoming subscribers:

    @Override
    public void call(CompletableSubscriber t) {
        CompletableSubscription cs =
                new CompletableSubscription(t, this);
        t.onSubscribe(cs);

        if (add(cs)) {
            if (cs.isUnsubscribed()) {
                remove(cs);
            }
        } else {
            Throwable e = error;
            if (e != null) {
                t.onError(e);
            } else {
                t.onCompleted();
            }
        }
    }

We create a CompletableSubscription for each CompletableSubscriber that captures both the subscriber and the current state instance: if the child subscriber calls unsubscribe on it, it can remove itself from the tracking array of the state. Note that there is a race condition between a successful add and a cancellation coming from downstream which may leave the CompletableSubscriber attached. Therefore, we have to check if the child has unsubscribed during the run of add and if so, we call remove() again to be sure. If add returns false, that means the CompletableSubject has reached its terminal state and reading the error field tells how the child subscriber should be notified.
Handling the onXXX notifications isn't that complicated either:

    @Override
    public void onSubscribe(Subscription d) {
        if (subscribers == TERMINATED) {
            d.unsubscribe();
        }
    }

    @Override
    public void onCompleted() {
        CompletableSubscription[] a;
        synchronized (this) {
            a = subscribers;
            subscribers = TERMINATED;
        }

        for (CompletableSubscription cs : a) {
            cs.actual.onCompleted();
        }
    }

    @Override
    public void onError(Throwable e) {
        CompletableSubscription[] a;
        synchronized (this) {
            a = subscribers;
            error = e;
            subscribers = TERMINATED;
        }

        for (CompletableSubscription cs : a) {
            cs.actual.onError(e);
        }
    }

Reacting to onSubscribe is up for debate; here I unsubscribe the incoming subscription if the CompletableSubject has reached its terminal state. Otherwise, we can't do much since the CompletableSubject could be subscribed to many Completable sources, any of which can bring it to its terminal state. You can ignore the parameter of this method or keep track of all the Subscriptions in a composite.
The onError and onCompleted methods look quite alike. In both cases we atomically swap in the terminated array and loop through the previous array while emitting the appropriate terminal event. Note that we set the error field in onError before we store the TERMINATED value in subscribers, which will give it proper visibility in the call() method above.
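To see the resulting behavior from the outside, here is a deliberately simplified, dependency-free model of such a subject. It uses a synchronized list instead of the copy-on-write array, and MiniCompletableSubjectSketch, Listener and recorder are hypothetical names for this illustration only, so please treat it as a sketch of the semantics rather than a replacement for the code above.

```java
import java.util.ArrayList;
import java.util.List;

final class MiniCompletableSubjectSketch {

    /** Hypothetical stand-in for CompletableSubscriber (without onSubscribe). */
    interface Listener {
        void onCompleted();
        void onError(Throwable e);
    }

    private List<Listener> listeners = new ArrayList<>();  // null once terminated
    private Throwable error;

    /** Registers the listener; the returned Runnable unregisters it again. */
    Runnable subscribe(Listener l) {
        Throwable e;
        synchronized (this) {
            if (listeners != null) {
                listeners.add(l);
                return () -> remove(l);
            }
            e = error;
        }
        // already terminated: replay the terminal event to the latecomer
        if (e != null) { l.onError(e); } else { l.onCompleted(); }
        return () -> { };
    }

    private synchronized void remove(Listener l) {
        if (listeners != null) {
            listeners.remove(l);
        }
    }

    void onCompleted() { terminate(null); }

    /** Sketch limitation: a null error would be treated as a completion. */
    void onError(Throwable e) { terminate(e); }

    private void terminate(Throwable e) {
        List<Listener> snapshot;
        synchronized (this) {
            if (listeners == null) {
                return;                       // ignore repeated terminal events
            }
            snapshot = listeners;
            listeners = null;
            error = e;
        }
        for (Listener l : snapshot) {
            if (e != null) { l.onError(e); } else { l.onCompleted(); }
        }
    }

    static Listener recorder(String name, List<String> events) {
        return new Listener() {
            @Override public void onCompleted() { events.add(name + ":completed"); }
            @Override public void onError(Throwable e) { events.add(name + ":error"); }
        };
    }

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        MiniCompletableSubjectSketch subject = new MiniCompletableSubjectSketch();
        subject.subscribe(recorder("early", events));
        Runnable cancel = subject.subscribe(recorder("cancelled", events));
        cancel.run();                                  // leaves before the end
        subject.onCompleted();
        subject.subscribe(recorder("late", events));   // gets the replay
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The early listener is notified when the subject terminates, the cancelled one hears nothing, and the late one receives the stored terminal event right away, which is the only kind of replay a Completable can offer.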
Conclusion
In this post, I've detailed ways of implementing Completable operators that either act as sources of terminal events or transform them in some way, and even threw in a Subject-like implementation of it.
Completable is more like a type-tool to indicate a sequence won't have any value but only side-effects, and the reduced API surface may be more convenient than working with the full-blown Observable API.
Implementing Completable operators is easier than implementing the backpressure-supporting Observable operators, but one has to look out for proper unsubscription chaining, avoiding races between the onXXX methods and utilizing the AtomicXXX classes for the (efficient) state management.
Now that we have even more experience in writing Subjects, the next blog post will conclude the series about ConnectableObservables.