2015. december 14., hétfő

The new Completable API (part 2 - final)

Introduction


In this final part, I'm going to show how one can implement operators (source and transformative alike). Since the Completable API features no values but only the terminal onError and onCompleted events, there are a way less meaningful operators possible unlike the main Observable class; therefore, most of the examples will feature an existing Completable operator.

Empty

Our first operator will be the empty() operator that emits onCompleted once subscribed to. We implement it through the CompletableOnSubscribe functional interface:


Completable empty = Completable.create(completableSubscriber -> {
    BooleanSubscription cancel = new BooleanSubscription();    // (1)
    
    completableSubscriber.onSubscribe(cancel);                 // (2)
    
    if (!cancel.isUnsubscribed()) {
        cancel.unsubscribe();                                  // (3)
        completableSubscriber.onCompleted();                   // (4)
    }
});


The relief here is that there is no need for type parameters. The Completable API follows the concepts of the Reactive-Streams design where the way of letting the child subscribers cancel their upstream is enabled by calling an onSubscribe method with a Subscription instance.

For the case of empty, it means we have to create some instance of the Subscription interface: a BooleanSubscription that let's us examine if the child has unsubscribed or not (1). Before we can emit any terminal event we have to call onSubscribe and send the child subscriber this BooleanSubscription instance (2). This is a mandatory step and if omitted, we can expect NullPointerException from the various Completable operators at worst or non-functioning unsubscription when converted to Observable at best.

Similar to the Reactive-Streams spec, when a terminal event is emitted, the aforementioned Subscription has to be considered unsubscribed. We achieve this by manually unsubscribe our BooleanSubscription (3) and calling the onCompleted() method.

This may look a bit complicated for such a simple operator. The rules, however, allow for a much simpler version:


Completable empty = Completable.create(completableSubscriber -> {
    completableSubscriber.onSubscribe(Subscriptions.unsubscribed());
    completableSubscriber.onCompleted();
});


Here, we send an already unsubscribed Subscription and call onCompleted immediately after. The reason for this is twofold: unsubscription should be considered best effort meaning that a) one must be prepared events may slip trough; b) there is a really small window between the onSubscribe and onCompleted calls so the isUnsubscribed may return false for most async usages and c) the Subscription should be considered unsubscribed anyway just before onCompleted is called.


Empty delayed

Let's assume we want to emit the onCompleted event after some delay. We are going to need a Scheduler.Worker instance for it but we also have to ensure the delayed task can be cancelled if necessary.


public static Completable emptyDelayed(
        long delay, TimeUnit unit, Scheduler scheduler) {
    return Completable.create(cs -> {
        Scheduler.Worker w = scheduler.createWorker();
        
        cs.onSubscribe(w);

        w.schedule(() -> {
            try {
                cs.onCompleted();
            } finally {
                w.unsubscribe();
            }
        }, delay, unit);
    });
}


Luckily, Scheduler.Worker is a Subscription and thus we can directly send it to the child CompletableSubscriber before scheduling the call to onCompleted. In the scheduled task, we unsubscribe the worker after calling onCompleted at which point it is clear the worker is not unsubscribed. The Reactive-Streams specification states that org.reactivestreams.Subscription should be considered cancelled at that point and it works there because there is no way to check if a Subscription is cancelled or not. However, RxJava lets you check it but it doesn't really make sense to check if your upstream considers you unsubscribed or not. The second reason for the show order is that unsubscribing a Worker may cause unwanted interruptions down the line of onCompleted.

If we really want to make sure the Subscription the child receives is actually unsubscribed, we have to add a level of indirection via a MultipleAssignmentSubscription:


Scheduler.Worker w = scheduler.createWorker();

MultipleAssignmentSubscription mas = 
    new MultipleAssignmentSubscription();

mas.set(w);

cs.onSubscribe(mas);

w.schedule(() -> {
    mas.set(Subscriptions.unsubscribed());
    mas.unsubscribe();
 
    try {
        cs.onCompleted();
    } finally {
        w.unsubscribe();
    }
}, delay, unit);


Instead of sending the Worker to the child, we wrap it inside the MultipleAssignmentSubscription and just before completing the child, we replace its content with an already unsubscribed Subscription and unsubscribe the whole container. The reason for this replacement, and the use of MultipleAssignmentSubscription is to avoid unsubscribing the worker too early; a SerialSubscription or a CompositeSubscription would not allow this.

Finally, one must be careful with the scheduling of the onCompleted call, especially with RxJava 2.0 Schedulers. 2.0 Schedulers allow direct scheduling, that is, you can schedule a task without the need to create a Worker and unsubscribe it after use. This reduces the overhead for most one-shot scheduled operators but has the (acceptable) property of not ensuring any ordering between scheduled tasks of the same Scheduler. Therefore, if one is inclined to implement the delayed empty with it, it may look like this:


cs.onSubscribe(
     scheduler.scheduleDirect(cs::onCompleted, delay, unit));


There is, however, a race condition here: it is possible the scheduled action, and the onCompleted within it gets executed before the scheduleDirect returns and thus the child subscriber will not have a Subscription set. A worse scenario is that both onXXX methods may run at the same time which violates the sequential protocol of Completable. The solution, again, is to have some indirection:


MultipleAssignmentDisposable mad = 
    new MultipleAssignmentDisposable();

cs.onSubscribe(mad);

mad.set(
    scheduler.scheduleDirect(cs::onCompleted, delay, unit));


Remark: due to the naming conflict, 2.0 named its resource-handling interface Disposable instead of Subscription. I'll post an entire series about RxJava 2.0 soon.


This example should foreshadow one property of the Completable API: resource management is the responsibility of the operator itself and there is no convenient add(Subscription) available anymore (such as with rx.Subscriber). This small inconvenience requires you to use subscription containers whenever there is scheduling or multiple sources involved. The benefit of this setup is that if an operator doesn't need resource management, no such structure is created (such as in rx.Subscriber) saving on allocation cost, footprint and gives better performance overall.


First completed

When I was in primary school, sometimes, the teacher would issue a challenge where the first one to complete it would win some small prize. This is a nice analogy to the amb() operator: the first source to complete wins. The first one to fail also wins - unlike in real life were failure is not an option.

Regardless, what does it take to write such an operator for Completable? Clearly, unlike Observable.amb, we can't really capture the information which Completable source was the one that terminated first, therefore, there is no need for index trickery and the other things, a simple AtomicBoolean is sufficient.


public static Completable amb(Completable... students) {
    return Completable.create(principal -> {
        AtomicBoolean done = new AtomicBoolean();            // (1)

        CompositeSubscription all = 
             new CompositeSubscription();                    // (2)

        CompletableSubscriber teacher = 
                new CompletableSubscriber() {
            @Override
            public void onSubscribe(Subscription s) {
                all.add(s);                                  // (3)
            }
            
            @Override
            public void onCompleted() {
                if (done.compareAndSet(false, true)) {       // (4)
                    all.unsubscribe();
                    principal.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                if (done.compareAndSet(false, true)) {       // (5)
                    all.unsubscribe();
                    principal.onError(e);
                }
            }
        };

        principal.onSubscribe(all);                          // (6)

        for (Completable student : students) {
            if (done.get() || all.isUnsubscribed()) {        // (7)
                return;
            }
            student.subscribe(teacher);                      // (8)
        }
    });
}


In the primary school example, there is only one teacher that listens to all students for the completion indicator. This is an interesting optimization in the Completable (and in Reactive-Streams) world and it does technically work: the teacher CompletableSubscriber is essentially stateless: it forwards its onXXX calls to other classes and doesn't have to remember who called its methods. I'm emphasizing technically because at this point in time, the Reactive-Streams specification forbids subscribing the same Subscriber instance to multiple Publishers, which in my opinion is over restrictive: library writers, who know what they are doing should be allowed to do this. Unsurprisingly, the compliant resolution is to move the creation of the teacher CompletableSubscriber into (8) without any changes to its internals; clearly, that should indicate it can be shared among the students. Nonetheless, let's see what each notable point does:

  1. We create the shared done indicator which is set to true once one of the student notifies the teacher about its completion (or failure).
  2. In case the head principal doesn't like the challenge, he/she can cancel the entire challenge.
  3. The teacher will register the Subscriptions given by the student Completables.
  4. and makes sure the first who signals the terminal event will also notify the principal about it (unfortunately, he/she won't know who was the first actually). At this point, there is no reason for the others to continue and will be unsubscribed from the challenge.
  5. In addition, if it turns out the challenge melts the brain of one of the students, for safety reasons, the challenge has to be cancelled and the head principal notified about the error.
  6. We allow the principal to tell each student to stop working on the challenge.
  7. For each student Completable, we "hand out" the challenge material and subscribe the teacher to the each students terminal event. It is possible that while the challenge gets completed/failed/cancelled while we are still subscribing to students at which point there is no reason to continue the process.

When all completed

Remaining at the school example, other times the students are evaluated and such evaluation procedure happens until all students have completed (or failed) their evaluation. If an accident happens, we may or may not stop the evaluation process and send the injured students to the ambulance in one batch.

I hope this operator setup sounds familiar, if not, here is the answer: merge and mergeDelayError respectively, depending on the "assembled" school policy.

Let's see the simplest case where we know exactly the number of students and any failure should stop the evaluation process:


public static Completable merge(Completable... students) {
    return Completable.create(principal -> {
        AtomicInteger remaining = 
            new AtomicInteger(students.length);              // (1)

        CompositeSubscription all = 
            new CompositeSubscription();

        CompletableSubscriber evaluator = 
                 new CompletableSubscriber() {
             @Override
             public void onSubscribe(Subscription s) {
                 all.add(s);
             }
             
             @Override
             public void onCompleted() {
                 if (remaining.decrementAndGet() == 0) {     // (2)
                     all.unsubscribe();
                     principal.onCompleted();
                 }
             }

             @Override
             public void onError(Throwable e) {
                 if (remaining.getAndSet(0) > 0) {           // (3)
                     all.unsubscribe();
                     principal.onError(e);
                 }
             }
        };

        principal.onSubscribe(all);
        
        for (Completable student : students) {
            if (all.isUnsubscribed() 
                || remaining.get() <= 0) {                   // (4)
                return;
            }
            student.subscribe(evaluator);
        }
    });


The implementation looks quite similar to the amb() case but has a few differences:


  1. We need to count (down) atomically the number of students who completed the evaluation successfully. 
  2. Once it reaches zero, the principal is notified about the completion of the entire evaluation.
  3. If one of the students signals an error, we set the remaining count to zero atomically and if it was previously non-zero, we cancel everybody and signal the error to the principal. Note that this can happen at most once because if there are multiple concurrent onError calls, only one of them will successfully replace a positive remaining value with a zero value. Any subsequent regular completion will just further decrement the remaining value.
  4. If there was an error or cancellation, the loop that subscribes the evaluator to the students should be stopped to not waste more time and resources.


But what if we don't know the number of students and we don't really want to stop the evaluation in case of an error. This complicates the error management and tracking of completed students slightly:


public static Completable mergeDelayError(
        Iterable<? extends Completable> students) {
    return Completable.create(principal -> {
        AtomicInteger wip = new AtomicInteger(1);         // (1)

        CompositeSubscription all = 
            new CompositeSubscription();

        Queue<Throwable> errors = 
            new ConcurrentLinkedQueue<>();                // (2)

        CompletableSubscriber evaluator = 
                new CompletableSubscriber() {
            @Override
            public void onSubscribe(Subscription s) {
                all.add(s);
            }

            @Override
            public void onCompleted() {
                if (wip.decrementAndGet() == 0) {         // (3)
                    if (errors.isEmpty()) {
                        principal.onCompleted();
                    } else {
                        principal.onError(
                            new CompositeException(errors));
                    }
                }
            }

            @Override
            public void onError(Throwable e) {
                errors.offer(e);                          // (4)
                onCompleted();
            }
        };

        principal.onSubscribe(all);

        for (Completable student : students) {
            if (all.isUnsubscribed()) {                   // (5)
                return;
            }
            wip.getAndIncrement();                        // (6)
            student.subscribe(evaluator);
        }

        evaluator.onCompleted();                          // (7)
    });
}

Again, the structure looks similar, but the expected behavior requires different algorithms:


  1. We start with an AtomicInteger work-in-progress counter and from number 1. The reason for this is that we don't know how many students are going to get from the Iterable, but we know we have finished once the wip counter reaches zero after all students and the CompletableOnSubscribe finished.
  2. We will collect the exceptions into a concurrent queue.
  3. The terminal condition is determined in the evaluator's onCompleted method: if the wip counter reaches zero, we check if there were errors queued up along the way and if so, we emit it as a CompositeException. Otherwise, a regular onCompleted event is emitted to the principal.
  4. Since we don't stop on error, we have to perform the same wip decrement as in onCompleted, but before that, the error has to be queued up. (Note that misbehaving Completable sources can disrupt this and trigger early completion if they send multiple terminal events.)
  5. In the loop where the student Completables are subscribed to we can only check if the principal is no longer interested in the evaluation; the value of the wip counter doesn't help here because it is going to be at least 1 while the loop is running.
  6. For each new Completable student, first we increment the wip count and then subscribe the evaluator to it. This makes sure the wip counter is at least 1 so the terminal condition isn't met while the loop is running.
  7. Finally, we call the evaluator's onCompleted method to signal no more Completables students will appear. This now allows the wip counter to reach zero and terminate the whole process.
Such compactness and reuse is rare (or even impossible) with the regular RxJava 1.x Observable operators.


Transformative operators

I don't think there are too many ways one can transform a Completable "sequence". Most Observable operators no longer make sense and thus are omitted from the Completable API. Regardless, let's see a few examples.

With the first operator, we'd like to suppress an exception and since there are no values involved, the best we can do is to signal an onCompleted ourselves:


CompletableOperator onErrorComplete = cs -> {
    return new CompletableSubscriber() {
        @Override
        public void onSubscribe(Subscription s) {
            cs.onSubscribe(s);
        }
 
        @Override
        public void onCompleted() {
            cs.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            cs.onCompleted();
        }
    };
};

source.lift(onErrorComplete).subscribe();


Alternatively, we'd like to resume with another Completable in case of an error:


public static Completable onErrorResumeNext(
        Completable first,
        Func1<Throwable, ? extends Completable> otherFactory) {

    return first.lift(cs -> new CompletableSubscriber() {
        final SerialSubscription serial = 
            new SerialSubscription();                          // (1)
        boolean once;

        @Override
        public void onSubscribe(Subscription s) {
            serial.set(s);                                     // (2)
        }

        @Override
        public void onCompleted() {
            cs.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            if (!once) {                                       // (3)
                once = true;
                otherFactory.call(e).subscribe(this);          // (4)
            } else {
                cs.onError(e);                                 // (5)
            }
        }
    });
}

In this example, we lift the CompletableOperator instance into the supplied Completable. In the operator body, we return a CompletableSubscriber with the following internal behavior:

  1. Since we may need to switch sources, we have to swap the incoming Subscription from the first to the other Completable. It is possible to use MultipleAssignmentSubscription for this case as well. This is analogous to the ProducerArbiter approach common with Observable operators, although much simpler in nature. In addition, we will reuse the this instance on the new Completable but we don't want to keep resubscribing to it if it fails as well.
  2. We set the incoming Subscription on the SerialSubscription, evicting the previous subscription.
  3. In case the first signals an error, we make sure the case that switches to the other Completable runs once,
  4. because we are going to reuse the current CompletableSubscriber instance for it as well, saving on allocation. The code in (2) makes sure the unsubscription chain is still maintained. This example omits the try-catch around the factory call for brevity; in that case, you can create a CompositeException for both the original error and the fresh crash and call cs.onError with it.
  5. If this is the other Completable source that fails, we simply signal the same error downstream.

If we think about it, implementing a regular continuation (i.e., andThen, endWith, concatWith) in case of an onCompleted practically uses the same approach. Instead of switching in onError, the switch happens in onCompleted:


        // ...
        @Override
        public void onCompleted() {
            if (!once) {                                        // (3)
                once = true;
                otherFactory.call(e).subscribe(this);           // (4)
            } else {
                cs.onCompleted();                               // (5)
            }
        }

        @Override
        public void onError(Throwable e) {
            cs.onError(e);
        }


Lastly, we'd like an operator that switches to another Completable on a timeout condition:


public static Completable timeout(
        Completable first,
        long timeout, TimeUnit unit, Scheduler scheduler
        Completable other) {
     return first.lift(cs -> new CompletableSubscriber() {

         final CompositeSubscription csub = 
             new CompositeSubscription();                    // (1)

         final AtomicBoolean once = new AtomicBoolean();     // (2)


         @Override
         public void onSubscribe(Subscription s) {
             csub.add(s);                                    // (3)

             Scheduler.Worker w = scheduler.createWorker();
             csub.add(w);

             cs.onSubscribe(csub);

             w.schedule(this::onTimeout, timeout, unit);     // (4)
         }

         @Override
         public void onCompleted() {
             if (once.compareAndSet(false, true)) {
                 csub.unsubscribe();                         // (5)
                 cs.onCompleted();
             }
         }

         @Override
         public void onError(Throwable e) {
             if (once.compareAndSet(false, true)) {
                 csub.unsubscribe();
                 cs.onError(e);
             }
         }

         void onTimeout() {
              if (once.compareAndSet(false, true)) {         // (6)
                  csub.clear();
                  
                  other.subscribe(new CompletableSubscriber() {
                      @Override
                      public void onSubscribe(Subscription s) {
                          csub.add(s);
                      }

                      @Override
                      public void onCompleted() {
                          cs.onCompleted();
                      }

                      @Override
                      public void onError(Throwable e) {
                          cs.onError(e);
                      }
                  });
              }
         }
     });
}


This operator is a bit more involved:

  1. We are going to track both the first and other Completable source's Subscription as well as the Worker of the Scheduler that triggers the timeout condition.
  2. Since the terminal event of the first Completable races with timeout event, we need to determine a winner (as with amb()) which locks out the other event.
  3. When the Subscription arrives from the first source, we add it to the composite, create the Worker and then forward the whole composite to the downstream CompletableSubscriber.
  4.  Finally, we schedule the execution of the onTimeout method. This setup avoids the race between the onTimeout and onSubscribe, i.e., if the timeout were scheduled in assembly time, it may happen before the Subscription arrives at which point one needs extra logic to make thinks right (not detailed here). Most of the time, this style of onSubscribe implementation saves a lot of headache with Reactive-Streams compliant operators in 2.0.
  5. If the terminal event of the first Completable happens first, we atomically and conditionally set the flag to true (this will prevent the onTimeout to execute its inner logic). Since the composite manages the Worker resource, we have to unsubscribe it, followed by the emission of the original event to downstreams.
  6. If the onTimeout happens first and wins the race to set the flag, we clear the composite and subscribe to the other Completable with a fresh CompletableSubscriber. We call clear here because we still need the CompositeSubscription to store the Subscription of the other Completable and an unsubscribed composite is of no use. The additional benefit of clear is that it will cancel the first Completable as well as the Worker running the timeout.


Hot Completable?

As a final thought, I'm not sure if a hot Completable (or a published Completable) has any use cases out there, but let's see how one can implement one if necessary. Since Completable doesn't have any value, there is only one meaningful CompletableSubject implementation possible: there is nothing to replay other than the terminal value.

First, let's see the skeleton of the CompletableSubject:


public final class CompletableSubject 
extends Completable implements CompletableSubscriber {
    
    public static CompletableSubject create() {
        State state = new State();
        return new CompletableSubject(state);
    }

    static final class State 
    implements CompletableOnSubscribe, CompletableSubscriber {

        // TODO state fields

        boolean add(CompletableSubscriber t) {
            // TODO implement
        }

        void remove(CompletableSubscriber t) {
            // TODO implement
        }

        @Override
        public void call(CompletableSubscriber t) {
            // TODO implement
        }
        
        @Override
        public void onSubscribe(Subscription d) {
            // TODO implement
        }
        
        @Override
        public void onCompleted() {
            // TODO implement
            
        }
        
        @Override
        public void onError(Throwable e) {
            // TODO implement
            
        }
    }

    static final class CompletableSubscription 
    extends AtomicBoolean implements Subscription {
        /** */
        private static final long serialVersionUID = 
            -3940816402954220866L;
        
        final CompletableSubscriber actual;
        final State state;
        
        public CompletableSubscription(
                CompletableSubscriber actual, State state) {
            this.actual = actual;
            this.state = state;
        }
        
        @Override
        public boolean isUnsubscribed() {
            return get();
        }
        
        @Override
        public void unsubscribe() {
            if (compareAndSet(false, true)) {
                state.remove(this);
            }
        }
    }
    
    final State state;
    
    private CompletableSubject(State state) {
        super(state);
        this.state = state;
    }
    
    @Override
    public void onSubscribe(Subscription d) {
        state.onSubscribe(d);
    }
    
    @Override
    public void onCompleted() {
        state.onCompleted();
    }
    
    @Override
    public void onError(Throwable e) {
        state.onError(e);
    }
}


The structure here starts out as with any other Subject before. We have to create the CompletableSubject instance with a factory and have the CompletableSubscriber methods delegate to the shared State instance. The CompletableSubscription will be used for tracking each CompletableSubscriber and help manage the unsubscription based on their identity.

The State class will hold onto the terminal indicator and the optional Throwable instance along with the array of known child CompletableSubscribers:

    
    Throwable error;
    
    volatile CompletableSubscription[] subscribers = EMPTY;
    
    static final CompletableSubscription[] EMPTY = 
            new CompletableSubscription[0];
    static final CompletableSubscription[] TERMINATED =
            new CompletableSubscription[0];
    
    boolean add(CompletableSubscription t) {
        if (subscribers == TERMINATED) {
            return false;
        }
        synchronized (this) {
            CompletableSubscription[] a = subscribers;
            if (a == TERMINATED) {
                return false;
            }
            
            CompletableSubscription[] b = 
                new CompletableSubscription[a.length + 1];
            System.arraycopy(a, 0, b, 0, a.length);
            b[a.length] = t;
            subscribers = b;
            return true;
        }
    }

    void remove(CompletableSubscription t) {
        CompletableSubscription[] a = subscribers;
        if (a == EMPTY || a == TERMINATED) {
            return;
        }
        
        synchronized (this) {
            a = subscribers;
            if (a == EMPTY || a == TERMINATED) {
                return;
            }
            
            int j = -1;
            for (int i = 0; i < a.length; i++) {
                if (a[i] == t) {
                    j = i;
                    break;
                }
            }
            
            if (j < 0) {
                return;
            }
            if (a.length == 1) {
                subscribers = EMPTY;
                return;
            }
            CompletableSubscription[] b = 
                new CompletableSubscription[a.length - 1];
            System.arraycopy(a, 0, b, 0, j);
            System.arraycopy(a, j + 1, b, j, a.length - j - 1);
            subscribers = b;
        }
    }


The add and remove methods have the usual and familiar implementations that allows tracking the subscribed child CompletableSubscribers.

Next, let's handle the incoming subscribers:


    @Override
    public void call(CompletableSubscriber t) {
        CompletableSubscription cs = 
            new CompletableSubscription(t, this);
        t.onSubscribe(cs);
        
        if (add(cs)) {
            if (cs.isUnsubscribed()) {
                remove(cs);
            }
        } else {
            Throwable e = error;
            if (e != null) {
                t.onError(e);
            } else {
                t.onCompleted();
            }
        }
    }


We create a CompletableSubscription for each CompletableSubscriber that captures bot the subscriber and the current state instance: if the child subscriber calls unsubscribe on it, it can remove itself from the tracking array of the state. Note that there is a race condition between a successful add and a cancellation coming from downstream which may leave the CompletableSubscriber attached. Therefore, we have to check if the child has unsubscribed during the run of add and if so, we call remove() again to be sure. If the add returns false, that means the CompletableSubject has reached its terminal state and reading the error field can tell how the child subscriber should be notified.

Handling the onXXX notifications isn't that complicated either:


    @Override
    public void onSubscribe(Subscription d) {
        if (subscribers == TERMINATED) {
            d.unsubscribe();
        }
    }
    
    @Override
    public void onCompleted() {
        CompletableSubscription[] a;
        synchronized (this) {
            a = subscribers;
            subscribers = TERMINATED;
        }
        
        for (CompletableSubscription cs : a) {
            cs.actual.onCompleted();
        }
    }
    
    @Override
    public void onError(Throwable e) {
        CompletableSubscription[] a;
        synchronized (this) {
            a = subscribers;
            error = e;
            subscribers = TERMINATED;
        }
        
        for (CompletableSubscription cs : a) {
            cs.actual.onError(e);
        }
    }


Reacting to onSubscribe is up for debate; here I unsubscribe the incoming subscription if the CompletableSubject reached its terminal state. Otherwise, we can't do much since the CompletableSubject could be subscribed to many Completable sources of which any can bring it to its terminal state. You can ignore the parameter this method or keep track all of the Subscriptions in a composite.

The onError and onCompleted methods look quite alike. In both cases we atomically swap in the terminated array and loop through the previous array while emitting the appropriate terminal event. Note that we set the error field in onError before we store the TERMINATED value in subscribers, which will give it proper visibility in the call() method above.


Conclusion

In this post, I've detailed ways of implementing Completable operators that either act as sources of terminal events or transform them in some way and even thrown in a Subject-like implementation of it.

Completable is more like a type-tool to indicate a sequence won't have any value but only side-effects and the reduced API surface may be more convenient than working with the full-blowin Observable API.

Implementing Completable operators is easier than implementing the backpressure-supporting Observable operators, but one has to look out for proper unsubscription chaining, avoiding races between the onXXX methods and utilizing the AtomicXXX classes for the (efficient) state management.

Now that we have even more experience in writing Subjects, the next blog post will conclude the series about ConnectableObservables.

2015. december 11., péntek

The new Completable API (part 1)

Introduction


If you are following the day-to-day RxJava GitHub activity, you might have noticed a PR about a new and mysterious rx.Completable class. This PR has been merged into the 1.x branch (in @Experimental fashion) and will most likely be part of RxJava 1.1.1.

In this two part series, I'm first going to introduce the usage of this class and its relation to the existing Observable and Single classes, then I'll explain the internals and development practices of Completable operators.

Note that as the @Experimental tag indicates, method names and their availability may change at any time before (or after) 1.1.1 is released.

What is this Completable class?

We can think of a Completable object as a stripped version of Observable where only the terminal events, onError and onCompleted are ever emitted; they may look like an Observable.empty() typified in a concrete class but unlike empty(), Completable is an active class. Completable mandates side effects when subscribed to and it is its main purpose indeed. Completable contains some deferred computation with side effects and only notifies about the success or failure of such computation.

Similar to Single, the Completable behavior can be emulated with Observable<?> to some extend, but many API designers think codifying the valuelessness in a separate type is more expressive than messing with wildcards (and usually type-variance problems) in an Observable chain.

Completable doesn't stream a single value, therefore, it doesn't need backpressure, simplifying the internal structure from one perspective, however, optimizing these internals requires more lock-free atomics knowledge in some respect.


Hello World!

Let's see how one can build a (side-effecting) Hello World Completable:

Completable.fromAction(() -> System.out.println("Hello World!"))
.subscribe();

Quite straightforward. We have a set of fromXXX method which can take many sources: Action, Callable, Single and even Observable (stripping any values generated by the latter 3 of course). On the receiving end, we have the usual subscribe capabilities: empty, lambdas for the terminal events, a rx.Subscriber and a rx.Completable.CompletableSubscriber, the main intended receiver for Completables.

Reactive-Empty-Streams?

The definition of the CompletableSubscriber looks quite similar to a Reactive-Streams Subscriber and was chosen over the rx.Subscriber for performance reasons:


public interface CompletableSubscriber {

    void onCompleted();

    void onError(Throwable e);

    void onSubscribe(Subscription d);
}

It features the usual onCompleted() and onError() but instead of extending Subscription and having an add() method like rx.Subscriber, it receives the unsubscription enabling Subscription via the onSubscribe call as in the Reactive-Streams API. This setup has the following benefits:

  • Each CompletableSubscriber implementor can decide if it wants to expose the unsubscription capability to external users unlike rx.Subscriber where anybody can unsubscribe it.
  • In rx.Subscriber, a mandatory (and maybe shared) SubscriptionList container is created to support resource association with any Subscriber instance. However, many Observable operators don't use (or require) resources themselves and have the unnecessary allocation and instant size overhead.
The terminal event semantics is also the same as in Reactive-Streams. When onError or onCompleted is called, the formerly received Subscription should be considered already unsubscribed.

Thus, the protocol looks like as follows:

onSubscribe (onError | onCompleted)?

It contains a mandatory onSubscribe call with a non-null argument followed by, optionally, either an onError with a non-null Throwable or an onCompleted. As within Reactive-Streams, the methods can't throw any checked exceptions or unchecked exceptions other than NullPointerException. This doesn't mean methods shouldn't fail; it means methods should fail in the downstream direction. There are many cases, however, that one can't really put the received exception anywhere (i.e., post onComplete exceptions); the last resort is to sink it into the RxJavaPlugins.getInstance().getErrorHandler().handleError(e).

Create, Lift and Transform

The Completable class has three additional standard helper interfaces, now becoming common with all RxJava base classes:

The first defines a way to specify the deferred computation and send the terminal notifications out to a CompletableSubscriber:

public interface CompletableOnSubscribe
    extends Action1<CompletableSubscriber> { }

CompletableOnSubscribe complete = cs -> {
    cs.onSubscribe(Subscriptions.unsubscribed());
    cs.onCompleted();
}

It is practically a named alias of an Action1 parametrized by the CompletableSubscriber. Creating an instance via a lambda expression is also straightforward (but one has to remember to call onSubscribe before calling the other onXXX methods).

The second interface allows lifting into a Completable sequence by specifying a CompletableSubscriber level transformation.


public interface CompletableOperator 
    extends Func1<CompletableSubscriber, CompletableSubscriber> { }

CompletableOperator swap = child -> new CompletableSubscriber() {
    @Override
    public void onSubscribe(Subscription s) {
        child.onSubscribe(s);
    }
    @Override
    public void onCompleted() {
        child.onError(new RuntimeException());
    }
    @Override
    public void onError(Throwable e) {
        child.onCompleted();
    }
};

Again, the CompletableOperator is an alias for a Func1 instance that let's you wrap, replace and enrich the downstream's CompletableSubscriber. The example implementation shows how one can turn one terminal event into the other via an operator.

The final helper interface allows preparing entire chains of operators to be included in an existing chain:


public interface CompletableTransformer
    extends Func1<Completable, Completable> { }

CompletableTransformer schedule = c -> 
    c.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());


With this alias, one can pre-compose common operations and present it to a stream through the usual compose() method. The example shows the canonical example of making sure the async computation starts on the IO scheduler and the completion is observed on the main thread.


Entering into the Completable world

As with Observables, Completable offers static factory methods that let's you start a stream from various sources:

  • create(CompletableOnSubscribe): let's you write your custom deferred computation that receives a CompletableSubscriber as its observer. The CompletableOnSubscribe is invoked for all CompletableSubscriber separately.
  • complete(): returns a constant instance of a Completable which calls onCompleted without doing anything else.
  • defer(Func0<Completable>): calls the function for each incoming CompletableSubscriber which should create the actual Completable instance said subscriber will subscribe to.
  • error(Throwable): it will emit the given constant Throwable to the incoming CompletableSubscribers.
  • error(Func0<Throwable>): for each incoming CompletableSubscriber, the Func0 is called individually and the returned Throwable is emitted through onError.
  • fromAction(Action0): let's you execute an action for each CompletableSubscriber which call onCompleted (or onError if the action throws an unchecked exception).
  • fromCallable(Callable): unfortunately, Java doesn't have a standard interface for an action which returns void and can throw a checked exception (not even in 8). The closest thing is the Callable interface. This let's you write an action that doesn't require you to wrap the computation into a try-catch but mandates the return of some arbitrary value (ignored). Returning null is acceptable here.
  • fromFuture(Future): let's you attach to a future and wait for its completion, literally. This blocks the subscriber's thread so you will have to use subscribeOn().
  • fromObservable(Observable): let's you skip all values of the source and just react to its terminal events. The Observable is observed in an unbounded backpressure mode and the unsubscription (naturally) composes through.
  • fromSingle(Single): let's you turn the onSuccess call into onCompleted call coming from the Single.
  • never(): does nothing other than setting an empty Subscription via onSubscribe.
  • timer(long, TimeUnit): completes after the specified time elapsed.

In addition, both the Observable and Single classes feature a toCompletable() method for convenience.

The naming of the fromXXX methods are deliberately specific: the Java 8 compiler likes to get into ambiguity problems due to the similarly appearing functional-interfaces.


Leaving the Completable world

One has to, eventually, leave the Completable world and observe the terminal event in some fashion. The Completable offers some familiar methods to make this happen: subscribe(...).

We can group the subscribe() overloads into two sets. The first set returns a Subscription that allows external cancellation and the second relies on the provided class to allow/manage unsubscriptions.

The first group consists of the lambda-form subscriptions:

  • subscribe(): runs the Completable and relay any onError call to the RxJavaPlugins.
  • subscribe(Action0): runs the Completable and calls the given Action0 on successful completion. The onError calls are still relayed to RxJavaPlugins.
  • subscribe(Action1, Action0): runs the Completable and calls Action1 if it ended with an onError or calls Action0 if it ended with a normal onCompleted.

Since the lambda callbacks don't have access to the underlying Subscription sent through onSubscribe, these methods return a Subscription themselves to allow external unsubscription to happen. Without it, there wouldn't be any way of cancelling such subscriptions.


The second group of subscribe methods take the multi-method Subscriber instances:

  • subscribe(CompletableSubscriber): runs the Completable and calls the appropriate onXXX methods on the supplied CompletableSubscriber instance.
  • subscribe(Subscriber<T>): runs the Completable and calls the appropriate onXXX methods on the supplied rx.Subscriber instance.


Sometimes, one wants to wait for the completion on the current thread. Observable has a set of methods accessible through toBlocking() for this purpose. Since there are not many ways one can await the result of a Completable, the blocking methods are part of the Completable class itself:

  • await(): await the termination of the Completable indefinitely and rethrow any exception it received (wrapped into a RuntimeException if necessary).
  • await(long, TimeUnit): same as await() but with bounded wait time which after a TimeoutException is thrown.
  • get(): await the termination of the Completable indefinitely, return null for successful completion or return the Throwable received via onError.
  • get(long, TimeUnit): same as get() but with bounded wait time which after a TimeoutException is thrown.


Completable operators

Finally, let's see what operators are available to work with an Completable. Unsurprisingly, many of them match their counterpart in Observable, however, a lot of them is missing because they don't make sense in a valueless stream. This include the familiar maptake, skip, flatMap, concatMap, switchMap, etc. operators.

The first set of operators is accessible as a static method and usually deal with a set of Completables. Many of them have overloads for varargs and Iterable sequences.

  • amb(Completable...): terminates as soon as any of the source Completables terminates, cancelling the rest.
  • concat(Completable...): runs the Completable one after another until all complete successfully or one fails.
  • merge(Completable...): runs the Completable instances "in parallel" and completes once all of them completed or any of them failed (cancelling the rest).
  • mergeDelayError(Completable...): runs all Completable instances "in parallel" and terminates once all of them terminate; if all went successful, it terminates with onCompleted, otherwise, the failure Throwables are collected and emitted in onError.
  • using(Func0, Func1, Action1): opens, uses and closes a resource for the duration of the Completable returned by Func1.

The second set of operators are the usual (valueless) transformations:

  • ambWith(Completable): completes once either this or the other Completable terminates, cancelling the still running Completable.
  • concatWith(Completable): runs the current and the other Completable in sequence.
  • delay(long, TimeUnit): delays the delivery of the terminal events by a given time amount.
  • endWith(...): continues the execution with another Completable, Single or Observable.
  • lift(CompletableOperator): lifts a custom operator into the sequence that allows manipulationg the incoming downstream's CompletableSubscriber's lifecycle and event delivery in some manner before continuing the subscribing upstream.
  • mergeWith(Completable): completes once both this and the other Completable complete normally
  • observeOn(Scheduler): moves the observation of the terminal events (or just onCompletded) to the specified Scheduler.
  • onErrorComplete(): If this Completable terminates with an onError, the exception is dropped and downstream receives just onCompleted.
  • onErrorComplete(Func1): The supplied predicate will receive the exception and should return true if the exception should be dropped and replaced by a onCompleted event.
  • onErrorResumeNext(Func1): If this Completable fails, the supplied function will receive the exception and it should return another Completable to resume with.
  • repeat(): repeatedly executes this Completable (or a number of times in another overload)
  • repeatWhen(Func1): repeatedly execute this Completable if the Observable returned by the function emits a value or terminate if this Observable emits a terminal event.
  • retry(): retries this Completable if it failed indefinitely (or after checking some condition in other overloads):
  • retryWhen(Func1): retries this Completable if it failed and the Observable returned by the function emits a value in response to the current exception or terminates if this Observable emits a terminal event.
  • startWith(...): begins the execution with the given Completable, Single or Observable and resumes with the current Completable.
  • timeout(long, TimeUnit, Completable): switches to another Completable if this completable doesn't terminate within the specified time window.
  • to(Func1): allows fluent conversion by calling a function with this Completable instance and returning the result.
  • toObservable(): converts this Completable into an empty Observable that terminates if this Completable terminates.
  • toSingle(Func0<T>): converts this Completable into a Single in a way that when the Completable completes normally, the value provided by the Func0 is emitted as onSuccess while an onError just passes through.
  • toSingleDefault(T): converts this Completable into a Single in a way that when the Completable completes normally, the value provided is emitted as onSuccess while an onError just passes through.
  • unsubscribeOn(Scheduler): when the downstream calls unsubscribe on the supplied Subscription via onSubscribe, the action will be executed on the specified scheduler (and will propagate upstream).

The final set of operators support executing callbacks at various lifecycle stages (which can be used for debugging or other similar side-effecting purposes):

  • doAfterTerminate(Action0): executes the action after the terminal event has been sent downstream CompletableSubscriber.
  • doOnComplete(Action0): executes an action just before the completion event is sent downstream.
  • doOnError(Action1): calls the action with the exception in a failed Completable just before the error is sent downstream.
  • doOnTerminate(Action0): executes the action just before any terminal event is sent downstream.
  • doOnSubscribe(Action1): calls the action with the Subscription instance received during the subscription phase.
  • doOnUnsubscribe(Action0): executes the action if the downstream unsubscribed the Subscription connecting the stages.
  • doOnLifecycle(...): combines the previous operators into a single operator and calls the appropriate action.


Currently, there are no equivalent Subject implementations nor publish/replay/cache methods available. Depending on the need for these, they can be added later on. Note however that since Completable deals only with terminal events, all Observable-based Subject implementation have just a single equivalent, Completable-based Subject implementation and there is only one way to implement the publish/replay/cache methods.

It is likely the existing Completable operators can be extended or other existing Observable operators matched. Until then, you can use the

toObservable().operator.toCompletabe()

conversion pattern to reach out to these unavailable operators. In addition, I didn't list all overloads so please consult with the source code of the class (or the Javadoc once it becomes available online).


Conclusion

In this post, I've introduced the new Completable base class and detailed the available methods and operators on it. Its usage pattern greatly resembles the use of Observable or Single with the difference that it doesn't deal with values at all but only with the terminal events and as such, many operators are meaningless for Completable.

In the next part, I'm going to talk about how one can create source and transformative operators for Completable by implementing the CompletableOnSubscribe and CompletableOperator interfaces respectively.