2015. október 21., szerda

ConnectableObservables (part 2)

Introduction


In the previous post, I've shown how one can write a "simple" ConnectableObservable that uses a Subject to dispatch events to subscribers once it has been connected.

The shortcoming of the solution is that there is no request coordination and everything runs in unbounded mode: the developers have to apply onBackpressureXXX strategies per subscriber, however, that leads to either dropping data or buffer bloat.

If the underlying Observable is cold, there should be a way to make sure it emits only as much elements as the child subscribers can process. To achieve this, we need request coordination.


Request coordination

So far, the operators we were implementing had to deal with a single child subscriber and its request at a time. One had to either pass it through, rebatch it or accumulate it, based on the business logic of said operator.

When there are multiple child Subscribers, the problem space suddenly receives a new dimension. What are the new problems?

Every bit counts

First, different child subscribers may request different amounts. Some may request small amounts, some may request larger amounts and others may want to run in unbounded mode (i.e., request(Long.MAX_VALUE)). In addition, the request calls may happen any time and with any amount.

Given such heterogeneous request pattern, what should be the request amount sent to the upstream Observable source?

There are two main options:

  1. request as much that the smallest child Subscriber requested and
  2. request as much as the largest child Subscriber requested.
Option 1) is essentially the lockstep approach. Its benefit is that there is no no need for request re-batching and buffering since once the upstream emits, everybody can receive it immediately. (Rebatching and buffering is an option in case the request amounts are really 1s or 10s at a time.) The drawback is that the whole setup slows down to the slowest child Subscriber, which if "forgets" to request, nobody gets anything.

Option 2) gives more room to individual child Subscribers and allows them to run on their own pace. However, this solution requires unbounded buffering capability (which may be shared or per each Subscriber). This means if there is an unbounded child Subscriber, the operator has to request Long.MAX_VALUE and fill the buffers for everyone. This, depending on the operator, may be of no problem though.

Subscribers may come and go at will

The second problem is that the the number of Subscribers may not be constant: new subscribers arrive, old ones leave. This poses another set of problems:

  1. A child Subscriber may request Long.MAX_VALUE then leave after a few (or no) elements.
  2. A child Subscriber may arrive but not request anything, stopping everyone else.
  3. A child Subscriber may leave at any time and thus its request amount "pressure" has to be released.
  4. All child Subscribers leave before the upstream Observable completes. What should happen in this case?
Unfortunately, problems 1) and 2) require mutually exclusive approaches explained above (lockstep vs. unbounded buffering). Problem 3) requires unsubscription action.

Problem 4) depends on the approach taken in respect to 1) and 2).

Within the lockstep approach, two sub-options arise. Either one has to introduce some bounded buffers that will hold onto the requested amounts, which now has to be re-batched to fit in, and simply await the new Subscribers. Otherwise, one has to slowly "drip" away the source values until a child Subscriber arrives. 

Within the unbounded buffering approach, one can simply keep buffering or again, start dropping values.

Approaches taken in RxJava

RxJava has two operators that return a ConnectableObservable: publish() and replay(). For a long time, these were ignoring backpressure completely and behaved just like the MulticastSupplier in the previous part.

These operators were rewritten to support backpressure (in 1.0.13 and 1.0.14 respectively) and had to take the problems mentioned before into account. The solutions were as follows

Operator publish() does lockstepping with a fixed prefetch buffer: the buffer is only drained (and then replenished) if all known child Subscribers can take a value. If there are no child Subscribers, it "slowly drips away" it source, which means it starts to request 1 by 1 and drops these values.

Operator replay() does unbounded buffering. The reason for this is that both the bounded and unbounded version of replay() has to buffer and replay all values from the upstream anyway. You may think, why buffer everything when the replay is time and/or size bound. The answer is that these operators, similar to Subjects, have to deliver events continuously and without skips; if there is an child Subscriber that arrived at some time, requested 1 then went to "sleep", the next time it requests the bounded replay has to present the next value, no matter how far ahead the other Subscribers went in the meantime.


The effect of disconnection

There is a problem that isn't dealt with in the RxJava operators but has to be mentioned. If one unsubscribes the Subscription returned by the connect() method, the upstream will stop sending further events.

The problem is that this may leave the child Subscribers hanging: they won't receive any further events (beyond those that are already in some buffer of the respective operator). We have similar problems with CompletableFutures in Java 8. One can cancel a Future but what happens to those that were awaiting its result?

The solution in Java 8 is to emit a CancellationException as the result in this case so that the dependent computations can terminate. However, this isn't the case with RxJava (in both 1.x and 2.x branches). The current implementation will just hang the child Subscribers.

This problem may appear outside of a ConnectableObservable as well. For some time, the RxAndroid 0.x library contained an operator that were applied to all sequences and unsubscribed them if the lifecycle required cleanup. The problem was that this left child Subscribers without termination events. I suggested emitting an onError and onCompleted event for this case. There was no resolution of the problem and the operator was removed before 1.0.

On a personal note, I don't remember anyone from the community complaining about this problem and it seems nobody is really affected by this behavior. As with many obscure and corner cases, if I don't mention them, nobody else seems to discover them.

The effect of termination

Upstream Observables may terminate normally, in which case the ConnectableObservable will emit the terminal event to child Subscribers.

At this point, a new Subscriber may subscribe to the terminated ConnectableObservable. What should happen in this case? Does the termination also mean disconnection? Should the child Subscriber get terminated instantly, similar to PublishSubject?

Again the solution requires business decision. RxJava chose the approach that a terminal event sent to a ConnectableObservables is considered a disconnect event and late coming Subscribers won't receive any terminal event but will be remembered until another call to connect() happens.

This has the benefit that the developers can "prepare" child Subscribers before the upstream Observable gets run and thus avoid losing events. The drawback is that one has to remember to call connect() again, otherwise nothing runs and the Subscribers are left hanging.

Family of collectors and emitters

Before we jump into some code, I'd like to sketch out a pattern that is the foundation of almost all operators that deal with either multiple sources or multiple child Subscribers.

I've written dozens of such operators and I've noticed they all use the same set of components and methods:

  1. They all need to track Subscribers, either the child Subscribers or the Subscribers that are subscribed to the source Observables. The tracking structure uses the copy-on-write approach of array-based resource containers.
  2. They all use an emitter loop (synchronized) or drain loop (atomics) which has to be triggered from many places: when an event is emitted from upstream(s), when a new child Subscriber arrives, when a request comes from child Subscribers and sometimes when a child unsubscribes.
  3. The loop has some preprocessing step: figuring out where the Subscribers are at the moment, selecting which source to drain or combining available values from sources in some fashion
  4. Finally, the events are delivered to Subscriber(s) and replenishments are requested from source Observable(s).

Which operator?

Now that we are aware of the problems, let's implement a ConnectableObservable which does request coordination.

I've been thinking what operator to implement. My first thought was to show how to implement the operator pair of an AsyncSubject or BehaviorSubject (similar to how publish() is the pair of PublishSubject), however, the former can be implemented using plain composition plus replay():

public ConnectableObservable<T> async() {
    return takeLast(1).replay();
}

Implementing the pair of BehaviorSubject is a bit more involved. The naive implementation would use composition such as this:

public ConnectableObservable<T> behave() {
    return replay(1);
}

However, this doesn't properly capture the behavior of a terminated BehaviorSubject: child Subscribers get nothing but a terminal event whereas replay will always replay 1 value and 1 terminal event after it completed.

To minimize brain melting, I'm not going to show how to implement a variant of the least complex of the operators: publish().


Publish (or die)

First, let's sketch out all the requirements we want to achieve:


  1. The operator should do a lockstep-based request coordination with prefetching (for efficiency)
  2. The effect of disconnection on the child Subscribers should be parametrizable: no event, signal error or signal completion.
  3. The operator should be considered terminated and new subscribers will wait for the next connect().
  4. The operator will allow errors to cut ahead. (Implementing error-delay is an excercise left to the reader).
  5. The operator will use a power-of-2 prefetch buffer.


With these requirements, we start with the skeleton of the class as usual:


public class PublishConnectableObservable<T> 
extends ConnectableObservable<T> {

    public enum DisconnectStrategy {                           // (1)
        NO_EVENT,
        SEND_ERROR,
        SEND_COMPLETED
    }
    
    public static <T> PublishConnectableObservable<T> 
    createWith(                                               // (2)
            Observable<T> source, 
            DisconnectStrategy strategy) {
        State<T> state = new State<>(strategy, source);
        return new PublishConnectableObservable<>(state);
    }
    
    final State<T> state;
    
    protected PublishConnectableObservable(State<T> state) {  // (3)
        super(state);
        this.state = state;
    }
    
    @Override
    public void connect(
            Action1<? super Subscription> connection) {       // (4)
        state.connect(connection);
    }
}

Nothing extraordinary so far:

  1. We create an enum for the disconnection strategy
  2. We have to use a factory method because the internal state has to be accessible from OnSubscribe and from instance methods of this class.
  3. We construct the object where State doubles as an OnSubscribe to save on allocation.
  4. Finally, we delegate the connection attempt to the state object. This gives us a less verbose source code.
Next comes the state object with some familiar structure (see last post of this series):

static final class State<T> implements OnSubscribe<T> {
    final DisconnectStrategy strategy;
    final Observable<T> source;
    
    final AtomicReference<Connection<T>> connection;      // (1)
      
    public State(DisconnectStrategy strategy, 
            Observable<T> source) {                       // (2)
        this.strategy = strategy;
        this.source = source;
        this.connection = new AtomicReference<>(
            new Connection<>(this)
        );
    }
        
    @Override
    public void call(Subscriber<? super T> s) {           // (3)
        // implement
    }
        
    public void connect(
        Action1<? super Subscription> disconnect) {       // (4)
        // implement
    }
        
    public void replaceConnection(Connection<T> conn) {   // (5)
        Connection<T> next = new Connection<>(this);
        connection.compareAndSet(conn, next);
    }
}

The state object will handle the connection, subscription and reconnection cases:

  1. Because we have to reconnect, we store the current connection in an AtomicReference.
  2. We initialize the source and strategy fields and set up an initial unconnected connection.
  3. The method call() from OnSubscribe will handle the subscribers; I'll show the implementation further down.
  4. The connect method will handle the connection attempts; I'll show the implementation further down.
  5. Finally, once a connection has been terminated on its own or via unsubscribe, we have to replace the old connection with a fresh connection atomically and not overwriting somebody else's fresh connection due to races.
Before going deep into the complicated logic, two more simplistic classes remain. The first is the Subscriber that will be subscribed to the source Observable:


static final class SourceSubscriber<T> 
extends Subscriber<T> {
    final Connection<T> connection;
    public SourceSubscriber(
            Connection<T> connection) {    // (1)
        this.connection = connection;
    }
    @Override
    public void onStart() {
        request(RxRingBuffer.SIZE);        // (2)
    }

    @Override
    public void onNext(T t) {
        connection.onNext(t);              // (3)
    }

    @Override
    public void onError(Throwable e) {
        connection.onError(e);
    }

    @Override
    public void onCompleted() {
        connection.onCompleted();
    }
    
    public void requestMore(long n) {      // (4)
        request(n);
    }
}

The class, again is full of delegations:

  1. We store the connection object and we will delegate events to it.
  2. If this Subscriber is subscribed to the source Observable, we request only a limited number of elements upfront. (Parametrizing this is left to the reader).
  3. Again, for class simplicity, we delegate the events to the connection object, which happens to implement the Observer interface for convenience
  4. We will have to replenish all consumed values but request() is a protected method: it is exposed through the requestMore() method.
Next comes a Publisher and Subscriber instance that will handle the unsubscription and request accounting for the child Subscribers of our operator.

static final class PublishProducer<T> 
implements Producer, Subscription {
    final Subscriber<? super T> actual;
    final AtomicLong requested;
    final AtomicBoolean once;
    volatile Connection<T> connection;             // (1)
    
    public PublishProducer(
            Subscriber<? super T> actual) {
        this.actual = actual;
        this.requested = new AtomicLong();
        this.once = new AtomicBoolean();
    }
    
    @Override
    public void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException();
        }
        if (n > 0) {
            BackpressureUtils
                .getAndAddRequest(requested, n);
            Connection<T> conn = connection;       // (2)
            if (conn != null) {
                conn.drain();
            }
        }
    }
    
    @Override
    public boolean isUnsubscribed() {
        return once.get();
    }
    
    @Override
    public void unsubscribe() {
        if (once.compareAndSet(false, true)) {
            Connection<T> conn = connection;       // (3)
            if (conn != null) {
                conn.remove(this);
                conn.drain();
            }
        }
    }
}

This is a bit more interesting.

  1. We need to know about what connection this class has to deal with for two reasons: 1) it has to notify the connection the underlying Subscriber can receive values, 2) if the subscriber unsubscribes, it may mean the other Subscribers can now receive further values.
  2. Since request() runs asynchronously, the connection might not be available yet. We have to remember to call drain() once this connection becomes available (shown later on).
  3. Since unsubscribe() runs asynchronously as well, it has check for non-null and only remove itself from the array of subscribers (shown later on). Note also the idempotence provided by once.

The final class, in skeleton form is the Connection itself:


@SuppressWarnings({ "unchecked", "rawtypes" })
static final class Connection<T>
 implements Observer<T> {                             // (1)

    final AtomicReference<PublishProducer<T>[]>
        subscribers;
    final State<T> state;
    final AtomicBoolean connected;
    
    final Queue<T> queue;
    final AtomicReference<Throwable> error;
    volatile boolean done;

    volatile boolean disconnected;
    
    final AtomicInteger wip;
    
    final SourceSubscriber parent;
    
    
    static final PublishProducer[] EMPTY = 
        new PublishProducer[0];

    static final PublishProducer[] TERMINATED = 
        new PublishProducer[0];
    
    public Connection(State<T> state) {               // (2)
        this.state = state;
        this.subscribers = new AtomicReference<>(EMPTY);
        this.connected = new AtomicBoolean();
        this.queue = new SpscArrayQueue(
            RxRingBuffer.SIZE);
        this.error = new AtomicReference<>();
        this.wip = new AtomicInteger();
        this.parent = createParent();
    }
    
    SourceSubscriber createParent() {                 // (3)
        // implement
    }
    
    boolean add(PublishProducer<T> producer) {        // (4)
        // implement
    }
    
    void remove(PublishProducer<T> producer) {
        // implement
    }
    
    void onConnect(
         Action1<? super Subscription> disconnect) {  // (5)
        // implement
    }
    
    @Override
    public void onNext(T t) {                         // (6)
        // implement
    }

    @Override
    public void onError(Throwable e) {
        // implement
    }

    @Override
    public void onCompleted() {
        // implement
    }
    
    void drain() {                                    // (7)
        // implement
    }
    
    boolean checkTerminated(boolean d, 
        boolean empty) {
        // implement
    }
}

The method names and fields should look familiar by now:


  1. The class has to manage a set of state variables: the current array of Subscribers, the value queue plus the terminal event holders, the connection and disconnection indicators, the work counter for the queue-drain approach, the Subscriber that is subscribed to the Observable and finally the EMPTY and TERMINATED array indicators.
  2. The constructor initializes the various fields.
  3. The subscriber needs some preparations besides creating a new SourceSubscriber, therefore, I factored it out into a separate method.
  4. The copy-on-write handling of the known subscribers is done via add and remove, similar to how we did this with Subjects and with the array-backed Subscription container.
  5. We will handle the source events with these onXXX methods.
  6. Finally, the drain and termination check methods for the queue-drain approach.


The meltdown

So far, the classes and those methods implemented were nothing special. However, the real complexity starts from here on. I'll show the missing implementations one by one and mention the concurrency considerations with them as well..

I suggest you take a small break, drink some power-up, clear your head at this point.

Done? All right, let'd do this.

State.call

This method is responsible for handling the incoming child Subscribers. The method has to consider that the connection may terminate on its own or get disconnected concurrently:


@Override
public void call(Subscriber<? super T> s) {
    PublishProducer<T> pp 
        = new PublishProducer<>(s);
    
    s.add(pp);
    s.setProducer(pp);                                // (1)

    for (;;) {
        Connection<T> curr = connection.get();
        
        pp.connection = curr;                         // (2)
        if (curr.add(pp)) {                           // (3)
            if (pp.isUnsubscribed()) {                // (4)
                curr.remove(pp);
            } else {
                curr.drain();                         // (5)
            }
            break;
        }
    }
}


  1. First, we create a PublishProducer and set it on the subscriber to react to requests and unsubscription.
  2. Next, we retrieve the current known connection and set it on the PublishProducer so it can call the drain() method if it wishes.
  3. We attempt to add the PublishProducer to the internal tracking array. If this fails, it means the current connection has terminated and we have to try the next connection (once becomes available) by looping a bit.
  4. Even if the add succeeded, the child might have just unsubscribed and thus the remove might not have found it. By calling it here again, we can make it sure the PublishProducer doesn't stay in the array unnecessarily.
  5. Once the add succeeded, we have to call drain since a concurrent call in PublishProducer might have not seen a non-null connection and couldn't notify the connection for more values (or about unsubscription). The call will make sure this PublishProducer is handled as necessary.


State.connect

This method is responsible for triggering a single connection on an unconnected Connection instance and/or return the Subscription that let's an active Connection get unsubscribed.


public void connect(Action1<? super Subscription> disconnect) {
    for (;;) {
        Connection<T> curr = this.connection.get();
        
        if (!curr.connected.get() && 
                curr.connected.compareAndSet(false, true)) {  // (1)
            curr.doConnect(disconnect);
            return;
        }
        if (!curr.parent.isUnsubscribed()) {                  // (2)
            disconnect.call(curr.parent);
            return;
        }
        
        replaceConnection(curr);                              // (3)
    }
}

This method is also racing with a termination/disconnection and as such, it has to take them into account when attempting to establish a fresh connection.


  1. It works by first retrieving the current connection and if the current thread is the first, switch it into a connected state. If successful, the doConnect method is called which will do the necessary subscription work.
  2. Otherwise, check if the current connection is unsubscribed. If not return it to the callback. Note that there is a small window here where the current connection is determined active but may become disconnected/terminated when the method is called. Resolving this issue requires either blocking synchronization between termination and connection or other serialization approach. In practice, however, this is rarely an issue and can be ignored.
  3. Finally, if the current connection is disconnected, let's replace it with a fresh, not-yet connected Connection and try the loop again.

Connection.createParent

The method constructs a SourceSubscriber and sets it up to behave according to the disconnection strategy:

SourceSubscriber createParent() {
    SourceSubscriber parent = new SourceSubscriber<>(this);
    
    parent.add(Subscriptions.create(() -> {
        switch (state.strategy) {
        case SEND_COMPLETED:
            onCompleted();
            break;
        case SEND_ERROR:
            onError(new CancellationException("Disconnected"));
            break;
        default:
            disconnected = true;
            drain();
        }
    }));
    
    return parent;
}

The method will instantiate a SourceSubscriber and add a Subscription to it. This subscription, depending on the disconnection strategy, will either call onCompleted, onError with a CancellationException or set the disconnect flag followed by a call to drain (the onXXX methods call drain()).

We need the disconnected flag because we can't use an isUnsubscribed check: it would always skip the terminal event and appear as if we'd have the NO_EVENT strategy.


Connection.add, Connection.remove

The algorithms for adding and removing resources to an array-based container with copy-on-write semantics should be quite familiar by now. For completeness, here are the methods anyway:


boolean add(PublishProducer<T> producer) {
    for (;;) {
        PublishProducer<T>[] curr = subscribers.get();
        if (curr == TERMINATED) {
            return false;
        }
        
        int n = curr.length;
        
        PublishProducer<T>[] next = new PublishProducer[n + 1];
        System.arraycopy(curr, 0, next, 0, n);
        next[n] = producer;
        if (subscribers.compareAndSet(curr, next)) {
            return true;
        }
    }
}

void remove(PublishProducer<T> producer) {
    for (;;) {
        PublishProducer<T>[] curr = subscribers.get();
        if (curr == TERMINATED || curr == EMPTY) {
            return;
        }
        
        int n = curr.length;
        
        int j = -1;
        for (int i = 0; i < n; i++) {
            if (curr[i] == producer) {
                j = i;
                break;
            }
        }
        
        if (j < 0) {
            break;
        }
        PublishProducer<T>[] next;
        if (n == 1) {
            next = EMPTY;
        } else {
            next = new PublishProducer[n - 1];
            System.arraycopy(curr, 0, next, 0, j);
            System.arraycopy(curr, j + 1, next, j, n - j - 1);
        }
        if (subscribers.compareAndSet(curr, next)) {
            return;
        }
    }
}

Connection.onXXX

The four onXXX methods on the class are quite sort, therefore, I'll show them togheter in this subsection:


void onConnect(
         Action1<? super Subscription> disconnect) {        // (1)
    disconnect.call(this.parent);
      
    state.source.unsafeSubscribe(parent);
}
    
@Override
public void onNext(T t) {                                   // (2)
    if (queue.offer(t)) {
        drain();
    } else {
        onError(new MissingBackpressureException());
        parent.unsubscribe();
    }
}

@Override
public void onError(Throwable e) {
    if (!error.compareAndSet(null, e)) {                    // (3)
        e.printStackTrace();
    } else {
        done = true;
        drain();
    }
}

@Override
public void onCompleted() {                                 // (4)
    done = true;
    drain();
}

Let's see them:


  1. The reason we have to drag the Action1 all the way here instead of calling it State.connect at (2) is that the call must happen before the actual subscription to the underlying Observable to allow synchronous cancellation.
  2. The next method offers the value and calls drain to make sure it is delivered if possible. Note that if the queue is full, we reward it with a MissingBackpressureException and unsubscription; it means the upstream doesn't handle backpressure well or at all.
  3. Since we may receive an error as part of the upstream event as well as a disconnection event, we heed an AtomicReference and set only one of them as the terminal event. In this example, the first one wins, the other gets printed to the console. If the CAS succeded, we set the done flag and call drain to handle things.
  4. It is true onCompleted can also be called from two places, but since it just sets the done flag to true, there is no need for any CAS-ing here. It is also true that due to the disconnection strategy, the onError and onCompleted can race with each other. However, since the difference of handling them is just that error contains null or not, it is't really a problem. Note also that since we used unsafeSubscribe in onConnect, there shouldn't be any call to the SourceSubscriber.unsubscribe coming from upstream and causing trouble if the source terminated normally and the disconnection strategy happen to be SEND_ERROR.

Connection.drain

This is unquestionably the heart of the operator and the most complicated logic due to the effects of concurrently changing values it has to rely on. I'll explain it in piece by piece:

First, it contains a familiar drain loop with wip counter and missed count:

void drain() {
    if (wip.getAndIncrement() != 0) {
        return;
    }
    
    int missed = 1;
    
    for (;;) {

        if (checkTerminated(done, queue.isEmpty())) {
            return;
        }

        // implement rest
       
        missed = wip.addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

Nothing fancy yet. The wip counter doubles as the serialization entry point on a 0 - 1 transition and a missed counter above that.

If inside the loop, the first thing to do is to check for a terminal condition via checkTerminated (explained later). It checks for the terminal events and disconnected state and acts accordingly. This is done before the upcoming request coordination since terminal events are not subject to backpressure management and can be emitted before any child Subscriber requests anything.

The next step is to perform request coordination. Since we set out to do a lockstep coordination, we have to ask all known child subscribers for their current requested amount and figure out the minimum amount everybody can receive. Note that this can be zero.


        //... checkTerminated call

        PublishProducer<T>[] a = subscribers.get();
        
        int n = a.length;
        long minRequested = Long.MAX_VALUE;
        
        for (PublishProducer<T> pp : a) {
            if (!pp.isUnsubscribed()) {
                minRequested = Math.min(minRequested, pp.requested.get());
            }
        }

        // ... missed decrementing

At this point, it is possible n is zero. If there are no subscribers, we set out to "slowly drip away" the available values:


        // ... minRequested calculation

        if (n == 0) {
            if (queue.poll() != null) {
                parent.requestMore(1);
            }
        } else {
            // implement rest           
        }

        // ... missed decrementing

We have to check if the queue is non empty and consume a value with a single poll() then we ask for replenishment. Note that the "slowness" depends on the speed of the upstream Observable. If one decides to do nothing if there are no subscribers, the if statement can be simplified to if (n != 0) { } but should not be removed!

If we know there are any subscribers and we know the minimum requested amount, we can try draining our queue and emit that amount to everybody.


            // if n != 0 branch

            if (checkTerminated(done, queue.isEmpty())) {   // (1)
                return;
            }

            long e = 0L;
            while (minRequested != 0) {

                boolean d = done;
                T v = queue.poll();
                
                if (checkTerminated(d, v == null)) {        // (2)
                    return;
                }
                
                if (v == null) {
                    break;
                }

                // final detail to implement
                
                minRequested--;                             // (3)
                e++;
            }
            
            if (e != 0L) {                                  // (4)
                parent.requestMore(e);
            }
        
        // end of n != branch

This should also look familiar. We check the  terminal conditions again (1) (optional if you want to be eager). Next, we loop until the minRequested is zero or the queue becomes empty. Inside the loop we do the usual termination checks (2) and emission accounting (3). After the loop, if there were emissions, we ask for replenishment from the SourceSubscriber instance (4).

Lastly, the final piece of the drain method is the publication of each value to all subscribers:


                // ... v == null check

                for (PublishProducer<T> pp : a) {
                    pp.actual.onNext(v);
                    if (pp.requested.get() != Long.MAX_VALUE) {
                        pp.requested.decrementAndGet();
                    }
                }

                // ... minRequested--

For each of the PublishProducer (i.e., child Subscriber), we emit the value and decrement the requested amount if not Long.MAX_VALUE (i.e., unbounded child Subscriber).

Wasn't that painful, was it?


Connection.checkTerminated

The checkTerminated method has more things to do since it has to deliver the terminal events to all Subscribers while making sure new Subscribers don't succeed within the add method.


boolean checkTerminated(boolean done, boolean empty) {    // (1)
    if (disconnected) {                                   // (2)
        subscribers.set(TERMINATED);
        queue.clear();
        return true;
    }
    if (done) {
        Throwable e = error.get();
        if (e != null) {
            state.replaceConnection(this);                // (3)
            queue.clear();

            PublishProducer<T>[] a = 
                subscribers.getAndSet(TERMINATED);        // (4)
            
            for (PublishProducer<T> pp : a) {             // (5)
                if (!pp.isUnsubscribed()) {
                    pp.actual.onError(e);
                }
            }
            
            
            return true;
        } else
        if (empty) {
            state.replaceConnection(this);                // (6)

            PublishProducer<T>[] a = 
                subscribers.getAndSet(TERMINATED);
            
            for (PublishProducer<T> pp : a) {
                if (!pp.isUnsubscribed()) {
                    pp.actual.onCompleted();
                }
            }
            
            return true;
        }
    }
    return false;
}

It works as follows:


  1. The method takes only a done and an empty indicator but not any individual Subscriber or the array of known subscribers.
  2. Since the disconnected flag is set only if the disconnection strategy was NO_EVENT, we can't do much but just set in the TERMINATED indicator array. Anybody unlucky enough still subscribed won't get any further events.
  3. If the done flag is true and there is an error we first replace the current connection with a fresh one (within the state) so newcommers won't try to subscribe to a terminated connection. 
  4. After clearing the queue for any normal values, we swap in the TERMINATED indicator array so ...
  5. ... anybody who got in can now receive its terminal event and the drain loop will quit.
  6. The same logic applies in the case when the upstream has completed normally and the queue has become empty.

Testing it out

Finally, we reached the end of one of the most complicated operators in history of RxJava. Now let's reward us via a small unit test to see if the backpressure and the disconnection stategy really works:


Observable<Integer> source = Observable.range(1, 10);

TestSubscriber<Integer> ts = TestSubscriber.create(5);

PublishConnectableObservable<Integer> o = createWith(
    source, DisconnectStrategy.SEND_ERROR);

o.subscribe(ts);

Subscription s = o.connect();

s.unsubscribe();

System.out.println(ts.getOnNextEvents());
ts.assertValues(1, 2, 3, 4, 5);
ts.assertNotCompleted();
ts.assertError(CancellationException.class);

It should print [1, 2, 3, 4, 5] to the console and quit without any AssertionErrors. Neat, isn't it?

Conclusion

In this lenghtly and brain-stretching blog post, I've explained the requirements and problems around ConnectableObservables that want to do request coordination between its child Subscribers and its upstream Observable. I then showed an implementation of a publish() like ConnectableObservable which features disconnection strategy to avoid hanging its child Subscribers.

This is, however, not the most complicated operator in RxJava. It isn't replay(), even though the bounded version is a bit more complicated than the PublishConnectableObservable (but only due to the boundary management). It is not the most commonly used operator either and in fact, that is simpler due to fewer state-clashing. No, the most complicated operator to day has so intertwined request coordination that even I'm not sure it is possible to write a buffer-bounded version of it.

But enough of mysterious foreshadowing! In the next part, I'm going to detail what it takes to implement a replay()-like ConnectableObservable.

3 megjegyzés:

  1. Your articles look profound and a bit scary. Thanks for the knoweledge!

    VálaszTörlés
  2. In the `checkTerminated` implementation, I think the line of code before (6) should check the `empty` parameter, rather than `queue.isEmpty()`, right?

    VálaszTörlés