2015. október 27., kedd

ConnectableObservables (part 3)

Introduction

In the previous post, we saw how to build a ConnectableObservable which publishes events to its child Subscribers when all of them requested some amount, making them go in a lockstep.

In this blog post, I'm going to detail how one can build a replay-like ConnectableObservable: i.e., ReplayConnectableObservable. The internal structure is very similar to the PublishConnectableObservable but the request coordination is going to be more complicated.

Replay bounded or unbounded

When one wants to create a replay-like operator (or Subject), the decision has to be made whether or not do bounded or unbounded replays. Unbounded replay means that from the time of the connect(), every value is essentially cached/buffered and every subscriber will receive values from the very beginning.

Bounded replay means that the cache will start losing data as time and values go by so a late subscriber will "skip" these early values and only get the newer ones.

However, the data structures supporting these modes are quite different. The unbounded buffer can be any list-like data structure, such as j.u.List or a hybrid linked-array list (to avoid copying when the list grows). The bounded buffer is going to be a linked-list like structure, but j.u.LinkedList can't work here; we need access to the individual nodes.

The reason is twofold: 1) we need a way to tell the "current start" of the buffer as time goes and 2) we have to deal with child Subscribers who lag behind with requests and can't let them miss in between values.

The right data structure is a singly-linked list where nodes hold the actual value. Then we have to keep reference to the head and tail of the list. The head indicates where the replay will start for newcommers and the tail indicates where to append new nodes containing values from the main source.

This structure has two implications: 1) due to the singly linked nature, if the head of the list is no longer referenced by the head or by any child Subscriber, it can be "automatically" garbage collected and 2) if we pin the head pointer and never move it, we get an unbounded replay buffer (although with more overhead due to pointer chasing).

For unbounded buffers, both head and tail are integers, head is zero and tail is the number of available values.

In addition, each subscriber (or its wrapper structure) has to track where it is at replaying: either via an index into the list or a node into the linked list.

Since we'd like to support both modes, which only differ in the buffer management, let's declare a basic interface that captures buffer operations.


interface ReplayBuffer<T> {
    void onNext(T value);
    void onError(Throwable e);
    void onCompleted();
    void replay(ReplayProducer<T> child);
}

The interface is straightforward, it takes the various events and allows replaying to a specific child subscriber (described later).

Unbounded replay buffer

Now let's see the implementation for the unbounded replay buffer:


static final class UnboundedReplayBuffer<T> implements ReplayBuffer<T> {
    final List<Object> values = new ArrayList<>();
    volatile int size;
    final NotificationLite<T> nl = NotificationLite.instance();
    @Override
    public void onNext(T value) {
        values.add(nl.next(value));
        size++;
    }
    @Override
    public void onError(Throwable e) {
        values.add(nl.error(e));
        size++;
    }
    @Override
    public void onCompleted() {
        values.add(nl.completed());
        size++;
    }
    @Override
    public void replay(ReplayProducer<T> child) {
        if (child.wip.getAndIncrement() != 0) {
            return;
        }
        
        int missed = 1;
        
        for (;;) {
            
            // implement
            
            missed = child.wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}

We simply convert each event into a notification and add it to the list. Incrementing the volatile size fields acts as a release (no need for atomic increment because the callers of the onXXX methods are serialized), therefore, observing its value means the values list can be iterated up to that point (all resize related operations have been committed). The replay method, so far, is the well known queue-drain pattern: a single thread will enter and do whatever it can to emit values. Let's see the drain part in this method:


// for (;;)

long r = child.requested.get();
boolean unbounded = r == Long.MAX_VALUE;
long e = 0;
int index = child.index;                    // (1)

while (r != 0L && index != size) {          // (2)
    if (child.isUnsubscribed()) {
        return;
    }
    
    Object v = values.get(index);           // (3)
    
    if (nl.accept(child.child, v)) {        // (4)
        return;
    }
    
    index++;
    r--;
    e--;                                    // (5)
}

if (e != 0L) {
    child.index = index;                    // (6)
    if (!unbounded) {
        child.requested.addAndGet(e);
    }
}
// missed = ...

This should also look familiar, let's see the reasoning behind certain lines:


  1. We retrieve the current child requested amount and the current child index. We remember if the request amount was Long.MAX_VALUE and have a counter for emitted values.
  2. We have to try emitting if the child can receive it and we haven't reached the end of the available values.
  3. If both requests and values are available, we get the next event by index.
  4. The NotificationLite.accept will convert the notification object into the proper onXXX call on the child Subscriber and return true if said event is a terminal event.
  5. We increment the index, decrement the remaining requested amount and decrement the emission amount. This latter may look strange but it saves us a negation when we update the child requested amount in (6).
  6. Finally, if there was any emission, we save the new index and if the child request wasn't unbounded, subtract the emitted count from the child request amount.

Bounded replay buffer

Managing a bounded replay buffer is more involved. I'm going to show a size-bound version but you should be able derive your own custom bounding logic based on it. First, we need a Node type that will hold the actual value and the link to the next Node:


static final class Node {
    final Object value;
    final long id;
    volatile Node next;
    public Node(Object value, long id) {
        this.value = value;
        this.id = id;
    }
}

The node holds the actual value, a pointer to the next node and an id field. This field will help with the request coordination later on.

Now let's see the implementation of the BoundedReplayBuffer:


static final class BoundedReplayBuffer<T>
implements ReplayBuffer<T> {
    
    final NotificationLite<T> nl = 
            NotificationLite.instance();
    
    volatile Node head;                            // (1)
    Node tail;
    int size;                                      // (2)
    final int maxSize;
    long id;
    
    
    public BoundedReplayBuffer(int maxSize) {      // (3)
        this.maxSize = maxSize;
        tail = new Node(null, 0);
        head = tail;
    }
    
    void add(Object value) {                       // (4)
        Node n = new Node(value, ++id);
        Node t = tail;
        tail = n;
        t.next = n;
    }
    
    @Override
    public void onNext(T value) {
        add(nl.next(value));
        if (size == maxSize) {                     // (5)
            Node h = head;
            head = h.next;
        } else {
            size++;
        }
    }
    
    @Override
    public void onError(Throwable e) {             // (6)
        add(nl.error(e));
    }
    
    @Override
    public void onCompleted() {
        add(nl.completed());
    }
    
    @Override
    public void replay(ReplayProducer<T> child) {  // (7)
        if (child.wip.getAndIncrement() != 0) {
            return;
        }
        
        int missed = 1;

        for (;;) {
            
            // implement
            
            missed = child.wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}

This kind of buffer has to consider more state:


  1. We have to keep a pointer to the head and tail of the linked node structure. The head has to be volatile because we are going to read it when a child Subscriber subscribes to it; I call this pinning. The tail is only modified from the thread of the main source (already serialized) and is never accessed by the child Subscribers so no need for volatile there.
  2. Since we want to limit the number of values to be replayed, we have to know the current size (without walking the linked list all the time) and the maximum allowed count. In addition, we will tag each node with an unique running identifier that will come into play during request coordination.
  3. In the constructor, we create our first empty node and assign it to both head and tail. This may seem a bit odd but has its reasons: it allows appending to the end of an empty buffer, otherwise an empty buffer would have null pointers and we'd get a discontinuity. There are two small drawbacks: a) this means the start value at any given time is head.next.value, behind an indirection and b) as we will move the head pointer ahead in (5), it will retain one extra value. In other terms, a replay(5) will keep 6 objects alive. This is true for RxJava's replay() and ReplaySubject too. If one really wants to avoid retaining this extra value, you have to apply reference counting to node which itself adds overhead for every value, both when added to the buffer and when replayed. 
  4. We will add new nodes of notifications to the linked list via add. The operation is straightforward: create a new node with a new unique identifier, make it the tail and set the next field of the old tail to this new node. The order is important here because next is volatile and acts as a release operation to all changes made before.
  5. Whenever a normal value arrives, we add it to the list and see if we are already at the capacity limit. If not, we can increment the size counter freely. Otherwise, there is no need to change the size anymore as the plus 1 from the add and minus 1 from the remove operation cancels out. This remove operation is basically moving the head forward by one node: given the current head, make the new head the next pointer of the old head. Since the linked structure is guaranteed to have at least one node (due to add()), the new head won't be null and the continuity is preserved.
  6. Since the terminal events are (usually) not part of the size bound, we can simply just add their node and not care about trimming the list.
  7. Again, the outer drain loop has the well known pattern.

Now let's see the inner parts of the drain loop of (7):


// for (;;) {
long r = child.requested.get();
boolean unbounded = r == Long.MAX_VALUE;
long e = 0;
Node index = child.node;

if (index == null) {                       // (1)
    index = head;
    child.node = index;
    
    child.addTotalRequested(index.id);     // (2)
}

while (r != 0L && index.next != null) {    // (3)
    if (child.isUnsubscribed()) {
        return;
    }
    
    Object v = index.value;
    
    if (nl.accept(child.child, v)) {
        return;
    }
    
    index = index.next;                    // (4)
    r--;
    e--;
}

if (e != 0L) {
    child.node = index;
    if (unbounded) {
        child.requested.addAndGet(e);
    }
}
// missed = ...

At this point, it shouldn't come to surprise the implementation uses the same pattern as with the UnboundedReplayBuffer, but there are a few differences:


  1. Since the nodes are object references, their default is null so the first time the replay is called, we have to capture (pin) the current head of the buffer (and store it in case the requested amount is still zero).
  2. The addTotalRequested will get this first node's unique identifier. The reason will be explained in the request coordination section below.
  3. To see if we reached the end of the available values, we have to check the next field of index.
  4. If the index.next was not null, we have a value for emission and can move the current index ahead by one node.

Request coordination

So far, there shouldn't be anything overly complicated with the classes (apart from a few unexplained methods and the structure of ReplayProducer).

As stated in the previous blog post, generally there are two ways to coordinate requests: lock-stepping and max-requesting. Lock-stepping was quite suitable for the PublishConnectableObservable.

Let's think about lock-stepping in terms of the replay operation we want to implement. If we want to do unbounded buffering, requesting the minimum amount of all child subscribers doesn't really matter as we will retain all values regardless when they are requested; everyone will gets its amount replayed regardless of the others: if there is some Subscriber that can take it all, why not get the values for it?

I we want to do bounded buffering, child Subscribers may come and go at different times, which means the current identifier inside BoundedReplaySubject is different for each one and each Subscriber will essentially request values relative to this identifier. Here, there is no clear definition of minimum request because the request of 5 in an earlier Subscriber and a request of 2 in later Subscriber that arrives after the 2nd source value can't be meaningfully compared.

Based on this reasoning, what we will do is implementing the request coordination to request the maximum amount that any child requests at any time and let the queue-drain deal with the emission.

However, we still have the problem of non-comparable request amounts due to potential time differences. This is where the unique identifier and another structure comes into play: keeping track the total requested (along with the relative requested). Whenever a child Subscriber requests, we will add this request amount to that particular Subscriber's totalRequested amount (ReplayPublisher.totalRequested) and see if this amount is bigger than the total requested amount we are sending to the main source. If bigger, we request only the difference from upstream.

The unique identifier helps with latecomers in our total-requested scheme. Without it, a latecomer's total requested amount would be too low and not trigger upstream requested in certain situations. For example, let's assume we have a child Subscriber on a range(1, 10).replay(1) that requests 2 elements and gets it. Then a new subscriber comes in and requests 2 as well. Clearly, it should receive 2 values (2, 3), but since its total requested amount is just 2, the replay operator won't request the extra value. The solution is the indexing of values and when the current Node is first captured, use the index as the total requested amount for the child as if the child was there from the beginning but ignored values up to that point.

Note: this property of was just recently discovered as as such, RxJava didn't work correctly. The PR #3454 fixes this for the 1.x series and I'll post a PR for 2.x later.

To make this more clear, let's see the implementation of the ReplayProducer.

static final class ReplayProducer<T> 
implements Producer, Subscription {
    int index;
    Node node;                                    // (1)
    final Subscriber<? super T> child;
    final AtomicLong requested;
    final AtomicInteger wip;
    final AtomicLong totalRequested;
    final AtomicBoolean once;                     // (2)

    Connection<T> connection;

    public ReplayProducer(
            Subscriber<? super T> child) {
        this.child = child;
        this.requested = new AtomicLong();
        this.totalRequested = new AtomicLong();
        this.wip = new AtomicInteger();
        this.once = new AtomicBoolean();
    }

    @Override
    public void request(long n) {
        if (n > 0) {
            BackpressureUtils
            .getAndAddRequest(requested, n);
            BackpressureUtils
            .getAndAddRequest(totalRequested, n); // (3)

            connection.manageRequests();          // (4)
        }
    }

    @Override
    public boolean isUnsubscribed() {
        return once.get();
    }

    @Override
    public void unsubscribe() {
        if (once.compareAndSet(false, true)) {
            connection.remove(this);             // (5)
        }
    }

    void addTotalRequested(long n) {             // (6)
        if (n > 0) {
            BackpressureUtils
            .getAndAddRequest(totalRequested, n);
        }
    }
}


Its purpose is to be set on a child Subscriber and mediate the request and unsubscription requests for it:

  1. We want to use the same class for both the bounded and unbounded buffer mode so we have to store the current index/node in fields.
  2. We have the usual set of fields: the child Subscriber, the wip counter for the queue-drain serialization, the current requested amount and an AtomicBoolean field indicating an unsubscribed state. In addition we will track the total requested amount and will coordinate requesting from upstream with the help of it.
  3. Whenever the child requests, we update both the relative requested amount and the total requested amount with the common BackpressureUtils helper that will cap the amounts at Long.MAX_VALUE if necessary.
  4. Once set, we have to trigger a request management to determine if the upstream needs to be requested or not.
  5. When the child unsubscribes, we need to remove this ReplayProducer from the array of tracked ReplayProducers.
  6. Finally, the bounded buffer's replay requires to update the total requested amount before emission so the request coordination works with latecomers as well.

Before looking at the manageRequests() call, I have to show the skeleton of the Connection class (the equivalent class from PublishConnectableObservable):


@SuppressWarnings({ "unchecked", "rawtypes" })
static final class Connection<T> implements Observer<T> {

    final AtomicReference<ReplayProducer<T>[]> subscribers;
    final State<T> state;
    final AtomicBoolean connected;
    final AtomicInteger wip;

    final SourceSubscriber parent;

    final ReplayBuffer<T> buffer;                        // (1)

    static final ReplayProducer[] EMPTY = 
        new ReplayProducer[0];

    static final ReplayProducer[] TERMINATED = 
        new ReplayProducer[0];
    
    long maxChildRequested;                              // (2)
    long maxUpstreamRequested;

    public Connection(State<T> state, int maxSize) {
        this.state = state;
        this.wip = new AtomicInteger();
        this.subscribers = new AtomicReference<>(EMPTY);
        this.connected = new AtomicBoolean();
        this.parent = createParent();
        
        ReplayBuffer b;                                 // (3)
        if (maxSize == Integer.MAX_VALUE) {
            b = new UnboundedReplayBuffer<>();
        } else {
            b = new BoundedReplayBuffer<>(maxSize);
        }
        this.buffer = b;
    }

    SourceSubscriber createParent() {                   // (4)
        SourceSubscriber parent = 
            new SourceSubscriber<>(this);

        parent.add(Subscriptions.create(() -> {
            switch (state.strategy) {
            case SEND_COMPLETED:
                onCompleted();
                break;
            case SEND_ERROR:
                onError(new CancellationException(
                    "Disconnected"));
                break;
            default:
                parent.unsubscribe();
                subscribers.getAndSet(TERMINATED);
            }
        }));

        return parent; 
    }

    boolean add(ReplayProducer<T> producer) {
        // omitted
    }

    void remove(ReplayProducer<T> producer) {
        // omitted 
    }

    void onConnect(
    Action1<? super Subscription> disconnect) {
        // omitted
    }

    @Override
    public void onNext(T t) {                          // (5)
        ReplayBuffer<T> buffer = this.buffer;
        buffer.onNext(t);
        ReplayProducer<T>[] a = subscribers
            .get();
        for (ReplayProducer<T> rp : a) {
            buffer.replay(rp);
        }
    }

    @Override
    public void onError(Throwable e) {
        ReplayBuffer<T> buffer = this.buffer;
        buffer.onError(e);
        ReplayProducer<T>[] a = subscribers
            .getAndSet(TERMINATED);
        for (ReplayProducer<T> rp : a) {
            buffer.replay(rp);
        }
    }

    @Override
    public void onCompleted() {
        ReplayBuffer<T> buffer = this.buffer;
        buffer.onCompleted();
        ReplayProducer<T>[] a = subscribers
            .getAndSet(TERMINATED);
        for (ReplayProducer<T> rp : a) {
            buffer.replay(rp);
        }
    }

    void manageRequests() {                           // (6)
        if (wip.getAndIncrement() != 0) {
            return;
        }
        
        int missed = 1;
        
        for (;;) {

            // implement            
            
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}

The class looks quite the same as PublishConnectableObservable.Connect, therefore, I've omitted the methods that are exactly the same. Let's see the rest:


  1. Instead of a bounded queue, we now have the common ReplayBuffer interface.
  2. We have to keep track the maximum values of both child requests and requests issued to upstream. The latter is necessary because we can't know when the upstream's Producer arrives and we have to accumulate the coordinated request amount until it arrives.
  3. I treat Integer.MAX_VALUE as the indicator for the unbounded replay mode.
  4. The createParent is slightly changed. Instead of the disconnected flag, we now unsubscribe directly from upstream. The implementations add, remove and onConnect are the same as in the last post.
  5. The onXXX methods have the same pattern: call the appropriate method on the buffer instance and then call replay for all known ReplayProducer instances. Note that the terminal events also swap in the TERMINATED array atomically, indicating that subsequent Subscribers have to go to the next Connection object.
  6. Last but not least, we have to manage requests from all child Subscribers which may call the method concurrently and thus we have to do some serialization. Since we are going to calculate the maximum to request, the non-blocking serialization approach works here quite well. This method is called when the upstream producer finally arrives and when any child subscriber requests something.

Now let's dive into the request coordination logic.


// for (;;) {

ReplayProducer<T>[] a = subscribers.get();

if (a == TERMINATED) {
    return;
}

long ri = maxChildRequested;
long maxTotalRequests = ri;                 // (1)

for (ReplayProducer<T> rp : a) {
    maxTotalRequests = Math.max(
        maxTotalRequests, 
        rp.totalRequested.get());
}

long ur = maxUpstreamRequested;
Producer p = parent.producer;

long diff = maxTotalRequests - ri;          // (2)
if (diff != 0) {
    maxChildRequested = maxTotalRequests;
    if (p != null) {                        // (3)
        if (ur != 0L) {
            maxUpstreamRequested = 0L;
            p.request(ur + diff);           // (4)
        } else {
            p.request(diff);
        }
    } else {
        long u = ur + diff;
        if (u < 0) {
            u = Long.MAX_VALUE;
        }
        maxUpstreamRequested = u;           // (5)
    }
} else
if (ur != 0L && p != null) {                // (6)
    maxUpstreamRequested = 0L;
    p.request(ur);
}

// missed = ...

Let's see how it works:


  1. After retrieving the current array of Subscribers and checking for the disconnected/terminated state, we compute the maximum of the total requested amount of each subscriber (and the previously known maximum).
  2. We calculate the difference from the last known maximum. If the difference is non zero, we remember the new maximum in maxChildRequested.
  3. At this point, the upstream Producer may be still missing. 
  4. If the producer is already there, we take any missed amount and the current difference and request it.
  5. Otherwise, without a producer, all we can do is to accumulate all the missed differences.
  6. If the maximum didn't change, we still might have to request all missed amounts if the Producer is there. As with (4), we have to "forget" all the missed values thus the next time the requests have to be coordinated, the upstream will only receive the non-zero difference then on.

In other terms, we collect how far each child subscriber wants to go and request from the upstream based on it.

As you may have noticed, this request coordination and the call to it can become quite expensive if there are lots of child Subscribers requesting left and right. In fact, we'd only have to deal with a limited set of requesters at a time and not with everyone. To solve the performance impact, we have to introduce a well known pattern: an emitter-loop or queue-drain that plays with the same serialization logic but the method now receives a parameter indicating who wants to update the coordinated request amount. This way, when a child requests some value and not others, only one child is evaluated instead of all.

There is, however, one thing to prepare for: the arrival of the upstream Producer in which case we still have to check all children. For this, we need to extend the Connection object with some extra fields:

List<ReplayProducer<T>> coordinationQueue;
boolean coordinateAll;
boolean emitting;
boolean missed;

You might have guessed what approach this will take: emitter loop. We can drop the wip counter and replace it with emitting/missed.


void manageRequests(ReplayProducer<T> inner) {
    synchronized (this) {                               // (1)
        if (emitting) {
            if (inner != null) {
                List<ReplayProducer<T>> q = 
                    coordinationQueue;
                if (q == null) {
                    q = new ArrayList<>();
                    coordinationQueue = q;
                }
                q.add(inner);
            } else {
                coordinateAll = true;
            }
            missed = true;
            return;
        }
        emitting = true;
    }
    
    long ri = maxChildRequested;
    long maxTotalRequested;
    
    if (inner != null) {                                // (2)
        maxTotalRequested = Math.max(
            ri, inner.totalRequested.get());
    } else {
        maxTotalRequested = ri;

        @SuppressWarnings("unchecked")
        ReplayProducer<T>[] a = producers.get();
        for (ReplayProducer<T> rp : a) {
            maxTotalRequested = Math.max(
                maxTotalRequested, rp.totalRequested.get());
        }
        
    }
    makeRequest(maxTotalRequested, ri);
    
    for (;;) {
        if (isUnsubscribed()) {
            return;
        }
        
        List<ReplayProducer<T>> q;
        boolean all;
        synchronized (this) {                           // (3)
            if (!missed) {
                emitting = false;
                return;
            }
            missed = false;
            q = coordinationQueue;
            coordinationQueue = null;
            all = coordinateAll;
            coordinateAll = false;
        }
        
        ri = maxChildRequested;                         // (4)
        maxTotalRequested = ri;

        if (q != null) {
            for (ReplayProducer<T> rp : q) {
                maxTotalRequested = Math.max(
                maxTotalRequested, rp.totalRequested.get());
            }
        } 
        
        if (all) {
            @SuppressWarnings("unchecked")
            ReplayProducer<T>[] a = producers.get();
            for (ReplayProducer<T> rp : a) {
                maxTotalRequested = Math.max(
                maxTotalRequested, rp.totalRequested.get());
            }
        }
        
        makeRequest(maxTotalRequested, ri);
    }
}

It works as follows:


  1. First, we try to enter the emission loop. If it fails and the parameter to the method was null, we set the coordinateAll flag which will trigger a full sweep. Otherwise, we queue up the ReplayProducer and quit.
  2. If the current thread managed to get into the emission state, we either determine the maximum requested by using the single ReplayProducer the method was called with or do a full sweep if it was actually null.
  3. Next comes the loop part of the emitter-loop approach. We check if we missed some calls and get all the queued up ReplayProducers as well as the indicator for a full sweep.
  4. Given all previous inputs we sweep the queued up ReplayProducers for the maximum value and if necessary, all the other known ReplayProducers as well. Note that they both may have to run since the queue may have ReplayProducers not known at the time this method runs and vice versa.
Finally, the upstream requesting can be factored out into a common method:

void makeRequest(long maxTotalRequests, long previousTotalRequests) {
    long ur = maxUpstreamRequested;
    Producer p = producer;

    long diff = maxTotalRequests - previousTotalRequests;
    if (diff != 0) {
        maxChildRequested = maxTotalRequests;
        if (p != null) {
            if (ur != 0L) {
                maxUpstreamRequested = 0L;
                p.request(ur + diff);
            } else {
                p.request(diff);
            }
        } else {
            long u = ur + diff;
            if (u < 0) {
                u = Long.MAX_VALUE;
            }
            maxUpstreamRequested = u;
        }
    } else
    if (ur != 0L && p != null) {
        maxUpstreamRequested = 0L;
        // fire the accumulated requests
        p.request(ur);
    }
}

which is practically the same as with the original sweep-all manageRequests() method is.

ReplayConnectableObservable

All what's remaining in this post to show the remaining SourceSubscriber class, the ReplayConnectableObservable itself.

Since we need the producer from upstream, we use the SourceSubscriber to store it for us and get it once ready. Note that we can't use Subscriber.request() here for two reasons: a) the call to request() don't accumulate until a Producer arrives and b) we can't know if there has a Producer arrived or not.


static final class SourceSubscriber<T> 
extends Subscriber<T> {
    final Connection<T> connection;
    
    volatile Producer producer;
    
    public SourceSubscriber(Connection<T> connection) {
        this.connection = connection;
    }

    @Override
    public void onNext(T t) {
        connection.onNext(t);
    }

    @Override
    public void onError(Throwable e) {
        connection.onError(e);
    }

    @Override
    public void onCompleted() {
        connection.onCompleted();
    }

    @Override
    public void setProducer(Producer p) {
        producer = p;
        connection.manageRequests();
    }
}

Nothing outstanding: we delegate everything to the Connection instance. Note the connection.manageRquests() call which will trigger the request coordination to actually request the amount held in the maxUpstreamRequested field (i.e., the missed requests). If we have the more performant version, the call is manageRequests(null) instead.

The State class is also changed a bit due to the indication of bounded buffering and due to the need to start replaying to a new Subscriber once it successfully subscribed to the current connection.


static final class State<T> implements OnSubscribe<T> {
    final DisconnectStrategy strategy;
    final Observable<T> source;
    final int maxSize;                                     // (1)

    final AtomicReference<Connection<T>> connection;

    public State(DisconnectStrategy strategy, 
    Observable<T> source, int maxSize) {
        this.strategy = strategy;
        this.source = source;
        this.maxSize = maxSize;
        this.connection = new AtomicReference<>(
            new Connection<>(this, maxSize));
    }

    @Override
    public void call(Subscriber<? super T> s) {
        // implement
        ReplayProducer<T> pp = new ReplayProducer<>(s);

        for (;;) {
            Connection<T> curr = this.connection.get();

            pp.connection = curr;

            if (curr.add(pp)) {
                if (pp.isUnsubscribed()) {
                    curr.remove(pp);
                } else {
                    curr.buffer.replay(pp);               // (2)

                    s.add(pp);
                    s.setProducer(pp);
                }
                
                break;
            }
        }
    }

    public void connect(
    Action1<? super Subscription> disconnect) {
        // same as before
    }

    public void replaceConnection(Connection<T> conn) {   // (3)
        Connection<T> next = 
            new Connection<>(this, maxSize);
        connection.compareAndSet(conn, next);
    }
}

There are some changes:

  1. We have to store the maxSize parameter because a reconnection has to recreate the appropriate ReplayBuffer instance as well.
  2. Once we create an ReplayProducer, first we try to add it to the current connection. If successful, then we do a barebone replay call. Since the ReplayProducer has requested value of zero, this won't replay any value to the child Subscriber. What it does is that it captures the current head of the buffer's linked list (if the buffer is bounded), pins it and makes sure this ReplayProducer starts with the correct total requested amount. Only after this setup is the ReplayProducer added to the child as an unsubscription and request target.
  3. Note that the Connection now requires a maxSize parameter.

Note that this order in (2) does work only because I've shown an implementation of the replay that replays terminal events only when requested which is not a necessary requirement or expectation for terminal events, although should not cause any real world problems as most Subscribers just keep requesting.

Finally, we still need factory methods to create instances of ReplayConnectableObservable:


public static <T> ReplayConnectableObservable<T> createUnbounded(
        Observable<T> source, 
        DisconnectStrategy strategy) {
    return createBounded(source, strategy, Integer.MAX_VALUE);
}

public static <T> ReplayConnectableObservable<T> createBounded(
        Observable<T> source, 
        DisconnectStrategy strategy, int maxSize) {
    State<T> state = new State<>(strategy, source, maxSize);
    return new ReplayConnectableObservable<>(state);
}


Conclusion

In this blog post, I've detailed the inner workings of a replay-like ConnectableObservable that can do both bounded and unbounded replays. The complexity is one level up relative to the PublishConnectableObservable from the last part; if you understood that then this shouldn't come as a too large leap. The added complexity comes from the management of the buffer and the coordination of requests with the max strategy.

In the next part, I'm going to talk a bit about how to turn such ConnectableObservables into Subjects that now will perform request coordination which may become mandatory for RxJava 2.0 Subjects and Reactive-Streams Processors, depending on how a certain discussion will be resolved.

Nincsenek megjegyzések:

Megjegyzés küldése