Wednesday, October 28, 2015

Comparison of Reactive-Streams implementations (part 1)

Introduction

The Reactive-Streams initiative is becoming increasingly well known in concurrency/parallelism circles, and there are now several implementations of the specification, most notably Akka-Streams, Project Reactor and RxJava 2.0.

In this blog post, I'm going to look at how one can use each library to build up a couple of simple flows of values and, while I'm at it, benchmark them with JMH. For comparison and sanity checking, I'll also include the results of RxJava 1.0.14, plain Java and j.u.stream.Stream.

In this part, I'm going to compare the synchronous behavior of the 4 libraries through the following tasks:

  1. Observe a range of integers from 1 to (1, 1000, 1,000,000) directly.
  2. Apply flatMap to the range of integers (1) and transform each value into a single value sequence.
  3. Apply flatMap to the range of integers (1) and transform each value into a range of two elements.

The runtime environment:
  • Gradle 2.8
  • JMH 1.11.1
    • Threads: 1
    • Forks: 1
    • Mode: Throughput
    • Unit: ops/s
    • Warmup: 5, 1s each
    • Iterations: 5, 2s each
  • i7 4790 @ 3.5GHz stock settings CPU
  • 16GB DDR3 @ 1600MHz stock RAM
  • Windows 7 x64
  • Java 8 update 66 x64

RxJava

Let's start with the implementation of the tasks in RxJava. First, one has to include the library within the build.gradle file. For RxJava 1.x:

compile 'io.reactivex:rxjava:1.0.14'

For RxJava 2.x:

repositories {
    mavenCentral()

    maven { url 'https://oss.jfrog.org/libs-snapshot' }
}

compile 'io.reactivex:rxjava:2.0.0-DP0-SNAPSHOT'

Unfortunately, one can't really have multiple versions of the same artifact ID in one build, so we either swap the compile reference or switch to my RxJava 2.x backport, which is published under a different name and with different package naming:

compile 'com.github.akarnokd:rxjava2-backport:2.0.0-RC1'

Once the libs are set up, let's see the flows:


@Params({"1", "1000", "1000000"})
int times;
//...

Observable<Integer> range = Observable.range(1, times);

Observable<Integer> rangeFlatMapJust = range
    .flatMap(Observable::just);

Observable<Integer> rangeFlatMapRange = range
    .flatMap(v -> Observable.range(v, 2));

The code looks the same for both versions, only the imports have to be changed. Nothing complicated.

Observation of the streams will generally be performed via the LatchedObserver instance which extends/implements Observer and will be reused for the other libraries as well:


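import java.util.concurrent.CountDownLatch;
import org.openjdk.jmh.infra.Blackhole;

public class LatchedObserver<T> extends Observer<T> {
    public CountDownLatch latch = new CountDownLatch(1);
    private final Blackhole bh;
    // the constructor name has to match the class name
    public LatchedObserver(Blackhole bh) {
        this.bh = bh;
    }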
    @Override
    public void onComplete() {
        latch.countDown();
    }
    @Override
    public void onError(Throwable e) {
        latch.countDown();
    }
    @Override
    public void onNext(T t) {
        bh.consume(t);
    }
}

Since these flows are synchronous, we won't utilize the latch itself but simply subscribe to the flows:


@Benchmark
public void range(Blackhole bh) {
    range.subscribe(new LatchedObserver<Integer>(bh));
}

Let's run it for both 1.x and 2.x and see the benchmark results:


This is a screenshot of my JMH comparison tool; it can display colored comparison of throughput values: green is better than the baseline, red is worse. Lighter color means at least +/- 3%, stronger color means +/- 10% difference.

Here and in all subsequent images, a larger number is better. You may want to multiply times by the measured value to get the number of events transmitted per second. Here, Range with times = 1000000 means that ~253 million numbers were emitted per second.

It appears RxJava 2.x achieves considerably better numbers, except in the two RangeFlatMapJust cases. What's going on? Let me explain.

The improvements come from the fact that RxJava 2.x has generally less subscribe() overhead than 1.x. In 1.x when one creates a Subscriber, it will be wrapped into a SafeSubscribe instance and when the Producer is set on it, there is a small arbitration happening inside setProducer(). As far as I can tell, the JIT in 1.x will do its best to remove the allocation and the synchronization, but the arbitration won't be removed which means more instructions for the CPU to execute. In contrast, in 2.x there is no wrapping and no arbitration at all.

The lower performance of the RangeFlatMapJust comes from a single operator: just(). In 1.x, the operator just() immediately emits its value without bothering with Producers and requests which means it doesn't support or respect backpressure. In 2.x, however, just() has to consider backpressure requests which involves a mandatory atomic CAS (~10 ns or 35 cycles uncontended (!)). This is the cost of correctness.

Edit: (wrong explanation before)

The lower performance comes from the serialization approaches the two versions use: 1.x uses the synchronized-based emitter-loop and 2.x uses the atomics-based queue-drain approach. The former is elided by the JIT whereas the latter can't be and there is always a ~17 ns overhead per value. I'm planning a performance overhaul for 2.x anyways so this won't remain the case for too long.
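To make the difference between the two serialization approaches more concrete, here is a minimal sketch of both. The class names (SerializationSketch, EmitterLoop, QueueDrain) are hypothetical and the code is a simplification under my own assumptions, not the actual flatMap code of either RxJava version. The emitter-loop elects the emitting thread through a synchronized block, whereas the queue-drain pays two atomic operations on its fast path even when only one thread is involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical names; a simplified sketch, not RxJava's actual operator code.
final class SerializationSketch {

    /** Emitter-loop: a synchronized flag elects the emitting thread. */
    static final class EmitterLoop<T> {
        final Consumer<T> downstream;
        boolean emitting;   // guarded by this
        Queue<T> missed;    // guarded by this

        EmitterLoop(Consumer<T> downstream) {
            this.downstream = downstream;
        }

        void onNext(T value) {
            synchronized (this) {
                if (emitting) {
                    if (missed == null) {
                        missed = new ArrayDeque<>();
                    }
                    missed.offer(value);   // someone else is emitting: hand over
                    return;
                }
                emitting = true;
            }
            downstream.accept(value);      // fast path, outside the lock
            for (;;) {
                Queue<T> q;
                synchronized (this) {
                    q = missed;
                    if (q == null) {
                        emitting = false;
                        return;
                    }
                    missed = null;
                }
                for (T t : q) {
                    downstream.accept(t);
                }
            }
        }
    }

    /** Queue-drain: an atomic work-in-progress counter elects the drainer. */
    static final class QueueDrain<T> {
        final Consumer<T> downstream;
        final Queue<T> queue = new ConcurrentLinkedQueue<>();
        final AtomicInteger wip = new AtomicInteger();

        QueueDrain(Consumer<T> downstream) {
            this.downstream = downstream;
        }

        void onNext(T value) {
            if (wip.compareAndSet(0, 1)) {          // atomic operation #1
                downstream.accept(value);           // fast path skips the queue
                if (wip.decrementAndGet() == 0) {   // atomic operation #2
                    return;
                }
            } else {
                queue.offer(value);
                if (wip.getAndIncrement() != 0) {
                    return;                         // the current drainer will pick it up
                }
            }
            int missed = 1;
            for (;;) {
                T t;
                while ((t = queue.poll()) != null) {
                    downstream.accept(t);
                }
                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    return;
                }
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        EmitterLoop<Integer> el = new EmitterLoop<>(out::add);
        QueueDrain<Integer> qd = new QueueDrain<>(out::add);
        for (int i = 1; i <= 3; i++) {
            el.onNext(i);
            qd.onNext(i);
        }
        System.out.println(out);
    }
}
```

In a single-threaded run, the lock in EmitterLoop is never contended, which is the situation where the JIT can remove it; the two atomic operations in QueueDrain stay no matter what.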

In conclusion, I think RxJava does a good job both in terms of usability and performance. Why am I mentioning usability? Read on.


Project Reactor

Project Reactor is another library that supports the Reactive-Streams specification and provides a similar fluent API as RxJava.

I've briefly benchmarked one of its earlier versions (2.0.5-RELEASE) and posted a picture of it, but here I'm going to use the latest snapshot. For this, we need to adjust our build.gradle file.

repositories {
    mavenCentral()

    maven { url 'http://repo.spring.io/libs-snapshot' }
}

compile 'io.projectreactor:reactor-stream:2.1.0.BUILD-SNAPSHOT'

This should make sure I'm using a version with the most performance enhancements possible.

The source code for the flows looks quite similar:


Stream<Integer> range = Streams.range(1, times);

Stream<Integer> rangeFlatMapJust = range.flatMap(Streams::just);

Stream<Integer> rangeFlatMapRange = range
    .flatMap(v -> Streams.range(v, 2));

A small note on the Streams.range() here. It appears the API has changed between 2.0.5 and the snapshot. In 2.0.5, the operator's parameters were start+end (both inclusive); this has now changed to start+count, which matches RxJava's range().

The same LatchedObserver can be used here so let's see the run results:



Here, reactor2 stands for the 2.1.0 snapshot and reactor1 is 2.0.5 release. Clearly, Reactor has improved its performance by reducing the overhead in the operators (by a factor of ~10).

There is, however, a curious result with RangeFlatMapJust, similar to RxJava: both RxJava 1.x and Reactor 2.1.0 outperform RxJava 2.x, and by roughly the same amount! What's happening there?

I know that flatMap in RxJava 1.x is faster in single-threaded use because it uses the emitter-loop approach (which utilizes synchronized), which can be nicely elided by the JIT compiler, so the overhead is removed. In 2.x, the code currently uses queue-drain with 2 unavoidable atomic operations per value on the fast path.

So let's find out what Reactor does. Its flatMap is implemented in the FlatMapOperator class and what do I see? It's almost the same as RxJava 2.x flatMap! Even the bugs are the same!

Just kidding about the bugs. There are a few differences so let's check the same fast-path and why it can do 4-8 million values more.

The doNext() looks functionally identical: if the source is a Supplier, it gets the held value directly without subscription then tries to emit it via tryEmit().

Potential bug: If this path crashes and goes into reportError(), the execution falls through and the Publisher gets subscribed to.

Potential bug: In RxJava 2.0, we always wrap user-supplied functions into try-catches so an exception from them is handled in-place. In Reactor's implementation, this is missing from doNext (but may be present somewhere else up in the call chain).

The tryEmit() is almost the same as well with a crucial difference: it batches up requests instead of requesting one-by-one. Interesting!


if (maxConcurrency != Integer.MAX_VALUE && !cancelled
       && ++lastRequest == limit) {
    lastRequest = 0;
    subscription.request(limit);
}

The same re-batching happens with the inner subscribers in both implementations (although this doesn't come into play in the given flow example). Nice work Project Reactor!
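The batching idea in that snippet can be isolated into a tiny helper. This is only a sketch under my own assumptions: RequestBatcherSketch and its members are hypothetical names, the LongConsumer stands in for Subscription.request(n), and the maxConcurrency and cancelled checks from the original condition are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical helper illustrating request re-batching; not Reactor's actual class.
final class RequestBatcherSketch {
    final int limit;
    final LongConsumer upstreamRequest;   // stands in for Subscription.request(n)
    int consumed;                         // values consumed since the last upstream request

    RequestBatcherSketch(int limit, LongConsumer upstreamRequest) {
        this.limit = limit;
        this.upstreamRequest = upstreamRequest;
    }

    /** Call once per value handed downstream. */
    void onValueConsumed() {
        if (++consumed == limit) {
            consumed = 0;
            upstreamRequest.accept(limit);   // one request(limit) instead of limit x request(1)
        }
    }

    public static void main(String[] args) {
        List<Long> requests = new ArrayList<>();
        RequestBatcherSketch b = new RequestBatcherSketch(4, requests::add);
        for (int i = 0; i < 10; i++) {
            b.onValueConsumed();
        }
        System.out.println(requests);   // prints [4, 4]
    }
}
```

Ten consumed values with a limit of 4 result in only two upstream calls; requesting one-by-one would have needed ten.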

In the RangeFlatMapRange case, which doesn't exercise this fast path, Reactor is slower although it uses the same flatMap logic. The answer is a few lines above in the results: Reactor's range produces 100 million values less per second.

Following the references along, there are a bunch of wrappers and generalizations, but those only apply once per Subscriber so they can't be the cause for the times = 1000000 case.

The reason appears to be that range() is implemented like RxJava 2.x's generator (i.e., SyncOnSubscribe). The ForEachBiConsumer looks tidy enough but I can spot a few potential deficiencies:


  • Atomic read and increment is involved which forces the JIT'd code to re-read the instance fields from cache instead of keeping it in a register. The requestConsumer could be read into a local variable before the loop.
  • Use == or != as much as possible because the other kind of comparisons appear to be slower on x86.
  • The atomic decrement is an expensive operation (~10ns) but can be delayed quite a bit: once the current known requested amount runs out, one should try to read the requested amount first to see if there were more requests issued in the meantime. If so, keep emitting, otherwise subtract all that has been emitted from the request count.

RxJava's range doesn't do the latter at the moment; HotSpot's register allocator seems to be hectic at times: too many local variables and performance drops because of register spill (on x64!). Implementing this latter optimization involves more local variables and thus the risk of making things worse.
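For illustration, the delayed decrement could look something like the sketch below. RangeEmitterSketch is a hypothetical class written for this post, not the code of RxJava or Reactor; completion and cancellation signals are left out, and the caller is assumed to pass a start and count that don't overflow an int.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;

// Hypothetical class; a sketch of the delayed-decrement idea only.
final class RangeEmitterSketch {
    final AtomicLong requested = new AtomicLong();
    final IntConsumer downstream;
    final int end;   // exclusive
    int index;       // next value to emit; only touched by the emitting thread

    RangeEmitterSketch(int start, int count, IntConsumer downstream) {
        this.index = start;
        this.end = start + count;
        this.downstream = downstream;
    }

    void request(long n) {
        if (n <= 0L) {
            return;
        }
        for (;;) {
            long r = requested.get();
            long u = r + n;
            if (u < 0L) {
                u = Long.MAX_VALUE;          // cap on overflow
            }
            if (requested.compareAndSet(r, u)) {
                if (r == 0L) {
                    emit(u);                 // 0 -> n transition: this caller emits
                }
                return;
            }
        }
    }

    private void emit(long r) {
        long e = 0L;                         // emitted since the last atomic update
        int i = index;
        final int f = end;                   // fields copied into locals before the loop
        final IntConsumer a = downstream;
        for (;;) {
            while (e != r && i != f) {       // only == and != comparisons
                a.accept(i++);
                e++;
            }
            if (i == f) {
                index = i;
                return;                      // range exhausted
            }
            r = requested.get();             // cheap read first: any new requests?
            if (r == e) {
                index = i;
                r = requested.addAndGet(-e); // a single atomic update per batch
                if (r == 0L) {
                    return;
                }
                e = 0L;
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        RangeEmitterSketch range = new RangeEmitterSketch(1, 5, out::add);
        range.request(2);
        range.request(10);
        System.out.println(out);   // prints [1, 2, 3, 4, 5]
    }
}
```

Note how many locals (e, i, f, a, r) the loop already carries; this is exactly where the register spill risk mentioned above comes from.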

In conclusion, Project Reactor gets better and better with each release, especially when it adopts RxJava 2.x structures and algorithms ;)

Akka-Streams

I believe Akka-Streams was the most advertised library from the list. With a company behind it and a port from Scala, what could go wrong?

So let's include it in the build.gradle:

compile 'com.typesafe.akka:akka-stream-experimental_2.11:1.0'

So far so good, but where do I start? Looking around the web, I came across a ton of examples, in Scala. Unfortunately, I don't know Scala well enough, so it was difficult for me to figure out what to use. Plus, it doesn't help that with Eclipse, the source code of the library is hard to navigate because it's in Scala (and I don't want to install the plugin). Okay, we won't look at the source code.

It turns out Akka-Streams doesn't have a range operator; therefore, I have to prepopulate a List with the values and use it as a source:

List<Integer> values = rx2Range
    .toList().toBlocking().first();

Source.from(values).???

A good thing RxJava is around. Akka-Stream uses the Source object as factory method for creating sources. However, Source does not implement Publisher at all!

One does not simply observe a Source.

After digging a bit, I found an example which shows one has to use runWith that takes a Sink.publisher() parameter. Let's apply them:


Publisher<Integer> range = Source
    .from(values).runWith(Sink.publisher());

Doesn't work; the example was out of date and one needs a Materializer in runWith. Looking at the hierarchy, ActorMaterializer does implement it so let's get one.


ActorMaterializer materializer = ActorMaterializer
    .create(???);

Publisher<Integer> range = Source.from(values)
    .runWith(Sink.publisher(), materializer);

Hmm, it requires an ActorRefFactory. But hey, I remember the examples creating an ActorSystem, so let's do that.


ActorSystem actorSystem = ActorSystem.create("sys");

ActorMaterializer materializer = ActorMaterializer
    .create(actorSystem);

Publisher<Integer> range = Source.from(values)
    .runWith(Sink.publisher(), materializer);

Finally, no more dependencies. Let's run it!

Doesn't work, crashes with missing configuration for akka.stream. Huh? After spending some time figuring out things, it appears Akka defaults to a reference.conf file in the classpath's root. But both jars of the library have this reference.conf!

As it turns out, when the Gradle-JMH plugin packages up the benchmark jar, it puts both reference.conf files into the jar and both of them end up in there under the same name; Akka then picks up the wrong one.

The solution: pull the one from the streams jar out and put it under a different name into the Gradle sources/resources.

Sidenote: this is still not enough because, by default, Gradle ignores non-Java files, especially if they are not under src/main/java. I had to add the following code to build.gradle to make it work:

processResources {
  from ('src/main/java') {
    include '**/*.conf'
  }
}

With all these set up, let's finish the preparation:


Config cfg = ConfigFactory.parseResources(
     ReactiveStreamsImpls.class, "/akka-streams.conf");
ActorSystem actorSystem = ActorSystem.create("sys", cfg);

ActorMaterializer materializer = ActorMaterializer
    .create(actorSystem);
        
List<Integer> values = rx2Range
    .toList().toBlocking().first();
        
Publisher<Integer> range = Source.from(values)
            .runWith(Sink.publisher(), materializer);


Compiles? Yes! Benchmark jar contains everything? Yes! The setup runs? Yes! Benchmark method works? No?!

After one iteration, it throws an error because the range Publisher can't be subscribed to more than once. I've asked for solutions on StackOverflow to no avail; whatever I've got back either didn't compile or didn't run. At this point, I just gave up on it and used a trick to make it work multiple times: defer(). I have to defer the creation of the whole Publisher so I get something fresh every time:


Publisher<Integer> range = s -> Source.from(values)
            .runWith(Sink.publisher(), materializer).subscribe(s);
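The defer trick is not specific to Akka-Streams. Here is a library-free sketch of it; SimplePublisher is a hypothetical stand-in I made up for org.reactivestreams.Publisher (simplified to take a plain Consumer), and oneShot() only imitates a source that refuses a second subscriber.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical types; a library-free sketch of the defer trick.
final class DeferSketch {

    /** Simplified stand-in for a Publisher: pushes all values to the consumer. */
    interface SimplePublisher<T> {
        void subscribe(Consumer<? super T> consumer);
    }

    /** A source that allows only a single subscription. */
    static <T> SimplePublisher<T> oneShot(List<T> values) {
        AtomicBoolean once = new AtomicBoolean();
        return consumer -> {
            if (!once.compareAndSet(false, true)) {
                throw new IllegalStateException("already subscribed");
            }
            values.forEach(consumer);
        };
    }

    /** Creates a fresh source for every subscriber. */
    static <T> SimplePublisher<T> defer(Supplier<? extends SimplePublisher<T>> factory) {
        return consumer -> factory.get().subscribe(consumer);
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3);
        List<Integer> out = new ArrayList<>();
        SimplePublisher<Integer> range = defer(() -> oneShot(values));
        range.subscribe(out::add);
        range.subscribe(out::add);   // fine: a new one-shot source is built each time
        System.out.println(out);     // prints [1, 2, 3, 1, 2, 3]
    }
}
```

The price is that the whole source is rebuilt on every subscription, which is also what every benchmark iteration pays here.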


In addition, as I suspected, there is no way to run Akka-Streams synchronously, therefore, any benchmark with the other synchronous guys can't be directly compared. Plus, I have to use the CountDownLatch to await the termination:


@Benchmark
public void akRange(Blackhole bh) throws InterruptedException {
    LatchedObserver<Integer> lo = new LatchedObserver<>(bh);
    akRange.subscribe(lo);
    
    if (times == 1) {
        while (lo.latch.getCount() != 0);
    } else {
        lo.latch.await();
    }
}

Note: I have to use a spin-loop over the latch for times == 1 because Windows' timer resolution and thread wakeup can take quite a few milliseconds at times, and without spinning, the benchmark produces 35% lower throughput.

Almost ready, we still need the RangeFlatMapJust and RangeFlatMapRange equivalents. Unfortunately, Akka-Streams doesn't have flatMap but has a flatten method on Source. No problem (by now):


Publisher<Integer> rangeFlatMapJust = s -> 
                Source.from(values)
                .map(v -> Source.single(v))
                .flatten(FlattenStrategy.merge())
                .runWith(Sink.publisher(), materializer)
                .subscribe(s)
                ;

Nope. Doesn't work because there is no FlattenStrategy.merge(), despite all the examples. But there is a FlattenStrategy.concat(). It will have to do.

Nope, still doesn't compile because of type inference problems. Have to introduce a local variable:

FlattenStrategy<Source<Integer, BoxedUnit>> flatten = 
    FlattenStrategy.concat();

Works in Eclipse, javac fails with ambiguity error. As it turns out, javadsl.FlattenStrategy extends scaladsl.FlattenStrategy which both have the same concat() factory method but different number of type arguments. This isn't the first time javac can't disambiguate but Eclipse can!

We don't give up and use reflection to get the proper method called:


Method m = akka.stream.javadsl.FlattenStrategy
    .class.getMethod("concat");

@SuppressWarnings({ "rawtypes", "unchecked" })
FlattenStrategy<Source<Integer, BoxedUnit>, Integer> flatten = 
    (FlattenStrategy)m.invoke(null);

Publisher<Integer> rangeFlatMapJust = s -> 
                Source.from(values)
                .map(v -> Source.single(v))
                .flatten(flatten)
                .runWith(Sink.publisher(), materializer)
                .subscribe(s)
                ;

Finally, Akka-Streams works. Let's see the benchmark results:



Remember, since Akka can't run synchronously and we had to do a bunch of workarounds, we should expect numbers will be lower by a factor of 5-10.

I don't know what's going on here. Some numbers are 100x lower. Akka certainly doesn't throw an Exception somewhere because we'd see 5M ops/s in those cases, regardless of times.

In conclusion, I'm disappointed with Akka-Streams; it takes quite a hassle to get a simple sequence running and apparently requires more thought to get reasonable performance.

Plain Java and j.u.stream.Stream

Just for reference, let's see how the same tasks look and work with plain Java for loops and j.u.stream.Stream.

For plain Java, the benchmarks look simple:


@Benchmark
public void javaRange(Blackhole bh) {
    int n = times;
    for (int i = 0; i < n; i++) {
        bh.consume(i);
    }
}

@Benchmark
public void javaRangeFlatMapJust(Blackhole bh) {
    int n = times;
    for (int i = 0; i < n; i++) {
        for (int j = i; j < i + 1; j++) {
            bh.consume(j);
        }
    }
}

@Benchmark
public void javaRangeFlatMapRange(Blackhole bh) {
    int n = times;
    for (int i = 0; i < n; i++) {
        for (int j = i; j < i + 2; j++) {
            bh.consume(j);
        }
    }
}

The Stream implementation is a bit more complicated because a j.u.stream.Stream is not reusable and has to be recreated every time one wants to consume it:


@Benchmark
public void streamRange(Blackhole bh) {
    values.stream().forEach(bh::consume);
}

@Benchmark
public void streamRangeFlatMapJust(Blackhole bh) {
    values.stream()
    .flatMap(v -> Collections.singletonList(v).stream())
    .forEach(bh::consume);
}

@Benchmark
public void streamRangeFlatMapRange(Blackhole bh) {
    values.stream()
    .flatMap(v -> Arrays.asList(v, v + 1).stream())
    .forEach(bh::consume);
}
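If the "not reusable" part is unfamiliar, the following small sketch shows it in isolation (StreamReuseSketch and its methods are hypothetical names): consuming the same Stream instance a second time fails with an IllegalStateException, while asking a factory for a fresh Stream each time works.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Hypothetical demo class for the single-use nature of java.util.stream.Stream.
final class StreamReuseSketch {

    /** Returns true if consuming the same Stream instance twice is rejected. */
    static boolean secondUseFails(List<Integer> values) {
        Stream<Integer> s = values.stream();
        s.forEach(v -> { });
        try {
            s.forEach(v -> { });
            return false;
        } catch (IllegalStateException ex) {
            return true;
        }
    }

    /** Consumes a fresh Stream obtained from the factory on every call. */
    static int sum(Supplier<Stream<Integer>> factory) {
        return factory.get().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3);
        System.out.println(secondUseFails(values));        // prints true
        Supplier<Stream<Integer>> factory = values::stream;
        System.out.println(sum(factory) + sum(factory));   // prints 12
    }
}
```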

Finally, just for fun, let's do a parallel version of the stream benchmarks:


@Benchmark
public void pstreamRange(Blackhole bh) {
    values.parallelStream().forEach(bh::consume);
}

@Benchmark
public void pstreamRangeFlatMapJust(Blackhole bh) {
    values.parallelStream()
    .flatMap(v -> Collections.singletonList(v).stream())
    .forEach(bh::consume);
}

@Benchmark
public void pstreamRangeFlatMapRange(Blackhole bh) {
    values.parallelStream()
    .flatMap(v -> Arrays.asList(v, v + 1).stream())
    .forEach(bh::consume);
}

Great! Let's see the results:



Impressive, except for some parallel cases where the forEach synchronizes all parallel operations back to a single thread I presume, negating all benefits.

In conclusion, if you have a synchronous task, try plain Java first.

Conclusion


In this blog post, I've compared the three Reactive-Streams libraries for usability and performance in the case of a synchronous flow. Both RxJava and Reactor did quite well, relative to Java, but Akka-Streams was quite complicated to set up and didn't perform adequately "out of the box".



However, there might be some remedy for Akka-Streams in the next part where I compare the libraries in asynchronous mode.


Tuesday, October 27, 2015

ConnectableObservables (part 3)

Introduction

In the previous post, we saw how to build a ConnectableObservable which publishes events to its child Subscribers when all of them requested some amount, making them go in a lockstep.

In this blog post, I'm going to detail how one can build a replay-like ConnectableObservable: i.e., ReplayConnectableObservable. The internal structure is very similar to the PublishConnectableObservable but the request coordination is going to be more complicated.

Replay bounded or unbounded

When one wants to create a replay-like operator (or Subject), one has to decide whether to do bounded or unbounded replay. Unbounded replay means that from the time of the connect(), every value is essentially cached/buffered and every subscriber will receive values from the very beginning.

Bounded replay means that the cache will start losing data as time and values go by so a late subscriber will "skip" these early values and only get the newer ones.

However, the data structures supporting these modes are quite different. The unbounded buffer can be any list-like data structure, such as j.u.List or a hybrid linked-array list (to avoid copying when the list grows). The bounded buffer is going to be a linked-list like structure, but j.u.LinkedList can't work here; we need access to the individual nodes.

The reason is twofold: 1) we need a way to tell the "current start" of the buffer as time goes by and 2) we have to deal with child Subscribers who lag behind with requests, and we can't let them miss in-between values.

The right data structure is a singly-linked list where nodes hold the actual value. Then we have to keep a reference to the head and tail of the list. The head indicates where the replay will start for newcomers and the tail indicates where to append new nodes containing values from the main source.

This structure has two implications: 1) due to the singly linked nature, if the head of the list is no longer referenced by the head or by any child Subscriber, it can be "automatically" garbage collected and 2) if we pin the head pointer and never move it, we get an unbounded replay buffer (although with more overhead due to pointer chasing).

For unbounded buffers, both head and tail are integers, head is zero and tail is the number of available values.

In addition, each subscriber (or its wrapper structure) has to track where it is at replaying: either via an index into the list or a node into the linked list.

Since we'd like to support both modes, which only differ in the buffer management, let's declare a basic interface that captures buffer operations.


interface ReplayBuffer<T> {
    void onNext(T value);
    void onError(Throwable e);
    void onCompleted();
    void replay(ReplayProducer<T> child);
}

The interface is straightforward: it takes the various events and allows replaying to a specific child subscriber (described later).

Unbounded replay buffer

Now let's see the implementation for the unbounded replay buffer:


static final class UnboundedReplayBuffer<T> implements ReplayBuffer<T> {
    final List<Object> values = new ArrayList<>();
    volatile int size;
    final NotificationLite<T> nl = NotificationLite.instance();
    @Override
    public void onNext(T value) {
        values.add(nl.next(value));
        size++;
    }
    @Override
    public void onError(Throwable e) {
        values.add(nl.error(e));
        size++;
    }
    @Override
    public void onCompleted() {
        values.add(nl.completed());
        size++;
    }
    @Override
    public void replay(ReplayProducer<T> child) {
        if (child.wip.getAndIncrement() != 0) {
            return;
        }
        
        int missed = 1;
        
        for (;;) {
            
            // implement
            
            missed = child.wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}

We simply convert each event into a notification and add it to the list. Incrementing the volatile size field acts as a release (no need for an atomic increment because the callers of the onXXX methods are serialized); therefore, observing its value means the values list can be iterated up to that point (all resize-related operations have been committed). The replay method, so far, is the well-known queue-drain pattern: a single thread will enter and do whatever it can to emit values. Let's see the drain part in this method:


// for (;;)

long r = child.requested.get();
boolean unbounded = r == Long.MAX_VALUE;
long e = 0;
int index = child.index;                    // (1)

while (r != 0L && index != size) {          // (2)
    if (child.isUnsubscribed()) {
        return;
    }
    
    Object v = values.get(index);           // (3)
    
    if (nl.accept(child.child, v)) {        // (4)
        return;
    }
    
    index++;
    r--;
    e--;                                    // (5)
}

if (e != 0L) {
    child.index = index;                    // (6)
    if (!unbounded) {
        child.requested.addAndGet(e);
    }
}
// missed = ...

This should also look familiar, let's see the reasoning behind certain lines:


  1. We retrieve the current child requested amount and the current child index. We remember if the request amount was Long.MAX_VALUE and have a counter for emitted values.
  2. We have to try emitting if the child can receive it and we haven't reached the end of the available values.
  3. If both requests and values are available, we get the next event by index.
  4. The NotificationLite.accept will convert the notification object into the proper onXXX call on the child Subscriber and return true if said event is a terminal event.
  5. We increment the index, decrement the remaining requested amount and decrement the emission amount. This latter may look strange but it saves us a negation when we update the child requested amount in (6).
  6. Finally, if there was any emission, we save the new index and if the child request wasn't unbounded, subtract the emitted count from the child request amount.
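
To try these steps out without the rest of the operator, here is a stripped-down, runnable sketch. It is an illustration under simplifying assumptions: UnboundedReplaySketch and its Child class are hypothetical stand-ins for the buffer and for ReplayProducer, plain values are stored instead of NotificationLite objects, and unsubscription and terminal events are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical, simplified sketch: values only, no terminal events or unsubscription.
final class UnboundedReplaySketch<T> {

    /** Stand-in for the per-subscriber state. */
    static final class Child<T> {
        final AtomicInteger wip = new AtomicInteger();
        final AtomicLong requested = new AtomicLong();
        final Consumer<T> downstream;
        int index;   // replay position; only touched inside the drain loop

        Child(Consumer<T> downstream) {
            this.downstream = downstream;
        }
    }

    final List<T> values = new ArrayList<>();
    volatile int size;

    void onNext(T value) {
        values.add(value);
        size++;   // volatile write publishes the list change (callers are serialized)
    }

    void replay(Child<T> child) {
        if (child.wip.getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        for (;;) {
            long r = child.requested.get();
            boolean unbounded = r == Long.MAX_VALUE;
            long e = 0;
            int index = child.index;

            while (r != 0L && index != size) {
                child.downstream.accept(values.get(index));
                index++;
                r--;
                e--;   // counted negatively so it can be added directly below
            }

            if (e != 0L) {
                child.index = index;
                if (!unbounded) {
                    child.requested.addAndGet(e);
                }
            }

            missed = child.wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        UnboundedReplaySketch<Integer> buffer = new UnboundedReplaySketch<>();
        Child<Integer> child = new Child<>(out::add);
        buffer.onNext(1);
        buffer.onNext(2);
        buffer.onNext(3);
        child.requested.addAndGet(2);
        buffer.replay(child);
        System.out.println(out);   // prints [1, 2]
    }
}
```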

Bounded replay buffer

Managing a bounded replay buffer is more involved. I'm going to show a size-bound version but you should be able to derive your own custom bounding logic based on it. First, we need a Node type that will hold the actual value and the link to the next Node:


static final class Node {
    final Object value;
    final long id;
    volatile Node next;
    public Node(Object value, long id) {
        this.value = value;
        this.id = id;
    }
}

The node holds the actual value, a pointer to the next node and an id field. This field will help with the request coordination later on.

Now let's see the implementation of the BoundedReplayBuffer:


static final class BoundedReplayBuffer<T>
implements ReplayBuffer<T> {
    
    final NotificationLite<T> nl = 
            NotificationLite.instance();
    
    volatile Node head;                            // (1)
    Node tail;
    int size;                                      // (2)
    final int maxSize;
    long id;
    
    
    public BoundedReplayBuffer(int maxSize) {      // (3)
        this.maxSize = maxSize;
        tail = new Node(null, 0);
        head = tail;
    }
    
    void add(Object value) {                       // (4)
        Node n = new Node(value, ++id);
        Node t = tail;
        tail = n;
        t.next = n;
    }
    
    @Override
    public void onNext(T value) {
        add(nl.next(value));
        if (size == maxSize) {                     // (5)
            Node h = head;
            head = h.next;
        } else {
            size++;
        }
    }
    
    @Override
    public void onError(Throwable e) {             // (6)
        add(nl.error(e));
    }
    
    @Override
    public void onCompleted() {
        add(nl.completed());
    }
    
    @Override
    public void replay(ReplayProducer<T> child) {  // (7)
        if (child.wip.getAndIncrement() != 0) {
            return;
        }
        
        int missed = 1;

        for (;;) {
            
            // implement
            
            missed = child.wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}

This kind of buffer has to consider more state:


  1. We have to keep a pointer to the head and tail of the linked node structure. The head has to be volatile because we are going to read it when a child Subscriber subscribes to it; I call this pinning. The tail is only modified from the thread of the main source (already serialized) and is never accessed by the child Subscribers so no need for volatile there.
  2. Since we want to limit the number of values to be replayed, we have to know the current size (without walking the linked list all the time) and the maximum allowed count. In addition, we will tag each node with a unique running identifier that will come into play during request coordination.
  3. In the constructor, we create our first empty node and assign it to both head and tail. This may seem a bit odd but has its reasons: it allows appending to the end of an empty buffer, otherwise an empty buffer would have null pointers and we'd get a discontinuity. There are two small drawbacks: a) this means the start value at any given time is head.next.value, behind an indirection and b) as we will move the head pointer ahead in (5), it will retain one extra value. In other terms, a replay(5) will keep 6 objects alive. This is true for RxJava's replay() and ReplaySubject too. If one really wants to avoid retaining this extra value, one has to apply reference counting to the nodes, which itself adds overhead for every value, both when added to the buffer and when replayed.
  4. We will add new nodes of notifications to the linked list via add. The operation is straightforward: create a new node with a new unique identifier, make it the tail and set the next field of the old tail to this new node. The order is important here because next is volatile and acts as a release operation to all changes made before.
  5. Whenever a normal value arrives, we add it to the list and see if we are already at the capacity limit. If not, we can increment the size counter freely. Otherwise, there is no need to change the size anymore as the plus 1 from the add and minus 1 from the remove operation cancels out. This remove operation is basically moving the head forward by one node: given the current head, make the new head the next pointer of the old head. Since the linked structure is guaranteed to have at least one node (due to add()), the new head won't be null and the continuity is preserved.
  6. Since the terminal events are (usually) not part of the size bound, we can simply just add their node and not care about trimming the list.
  7. Again, the outer drain loop has the well known pattern.

Now let's see the inner parts of the drain loop of (7):


// for (;;) {
long r = child.requested.get();
boolean unbounded = r == Long.MAX_VALUE;
long e = 0;
Node index = child.node;

if (index == null) {                       // (1)
    index = head;
    child.node = index;
    
    child.addTotalRequested(index.id);     // (2)
}

while (r != 0L && index.next != null) {    // (3)
    if (child.isUnsubscribed()) {
        return;
    }
    
    Node next = index.next;
    Object v = next.value;                 // the value lives in the node after index
    
    if (nl.accept(child.child, v)) {
        return;
    }
    
    index = next;                          // (4)
    r--;
    e--;
}

if (e != 0L) {
    child.node = index;
    if (!unbounded) {                      // only a bounded request amount needs decrementing
        child.requested.addAndGet(e);
    }
}
// missed = ...

At this point, it shouldn't come as a surprise that the implementation uses the same pattern as the UnboundedReplayBuffer, but there are a few differences:


  1. Since the nodes are object references, their default is null so the first time the replay is called, we have to capture (pin) the current head of the buffer (and store it in case the requested amount is still zero).
  2. The addTotalRequested will get this first node's unique identifier. The reason will be explained in the request coordination section below.
  3. To see if we reached the end of the available values, we have to check the next field of index.
  4. If the index.next was not null, we have a value for emission and can move the current index ahead by one node.
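
The pinning and node-walking described above can be tried out in isolation. The following is a minimal, single-threaded sketch and not the RxJava implementation: the names BoundedNodeBufferSketch and Consumer are hypothetical, and the sketch assumes a dummy head node whose own value is never emitted, which may differ from the buffer shown earlier.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded sketch of a size-bound linked buffer
// with a per-consumer pinned node (no atomics, no terminal events).
final class BoundedNodeBufferSketch {
    static final class Node {
        final Object value;
        final long id;      // number of values added up to and including this node
        Node next;
        Node(Object value, long id) { this.value = value; this.id = id; }
    }

    static final class Consumer {
        Node node;                 // null until the first replay pins the head
        long requested;
        long totalRequested;
        final List<Object> received = new ArrayList<>();
    }

    final int limit;
    Node head = new Node(null, 0); // assumption: dummy node, its value is never emitted
    Node tail = head;
    int size;

    BoundedNodeBufferSketch(int limit) { this.limit = limit; }

    void add(Object value) {
        Node n = new Node(value, tail.id + 1);
        tail.next = n;
        tail = n;
        if (size == limit) {
            head = head.next;      // drop the oldest value; +1 and -1 cancel out
        } else {
            size++;
        }
    }

    void replay(Consumer c) {
        Node index = c.node;
        if (index == null) {
            index = head;                  // pin the current head
            c.totalRequested += index.id;  // account for values this consumer skipped
        }
        while (c.requested != 0L && index.next != null) {
            index = index.next;            // move ahead by one node
            c.received.add(index.value);
            c.requested--;
        }
        c.node = index;                    // remember where to resume
    }
}
```

With a limit of 2 and the values 1, 2, 3 added, a consumer requesting 2 pins a head with identifier 1 and receives 2 and 3; a later value is picked up from the remembered node on the next replay call.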

Request coordination

So far, there shouldn't be anything overly complicated with the classes (apart from a few unexplained methods and the structure of ReplayProducer).

As stated in the previous blog post, generally there are two ways to coordinate requests: lock-stepping and max-requesting. Lock-stepping was quite suitable for the PublishConnectableObservable.

Let's think about lock-stepping in terms of the replay operation we want to implement. If we want to do unbounded buffering, requesting the minimum amount of all child subscribers doesn't really matter as we will retain all values regardless of when they are requested; everyone will get its amount replayed regardless of the others: if there is some Subscriber that can take it all, why not get the values for it?

If we want to do bounded buffering, child Subscribers may come and go at different times, which means the current identifier inside BoundedReplayBuffer is different for each one and each Subscriber will essentially request values relative to this identifier. Here, there is no clear definition of minimum request because a request of 5 from an earlier Subscriber and a request of 2 from a later Subscriber that arrives after the 2nd source value can't be meaningfully compared.

Based on this reasoning, we will implement the request coordination so that it requests the maximum amount that any child requests at any time and lets the queue-drain deal with the emission.

However, we still have the problem of non-comparable request amounts due to potential time differences. This is where the unique identifier and another structure come into play: keeping track of the total requested amount (along with the relative requested amount). Whenever a child Subscriber requests, we will add this request amount to that particular Subscriber's totalRequested amount (ReplayProducer.totalRequested) and see if this amount is bigger than the total requested amount we have sent to the main source so far. If it is bigger, we request only the difference from upstream.

The unique identifier helps with latecomers in our total-requested scheme. Without it, a latecomer's total requested amount would be too low and would not trigger upstream requests in certain situations. For example, let's assume we have a child Subscriber on a range(1, 10).replay(1) that requests 2 elements and gets them. Then a new subscriber comes in and requests 2 as well. Clearly, it should receive 2 values (2, 3), but since its total requested amount is just 2, the replay operator won't request the extra value. The solution is the indexing of values: when the current Node is first captured, use its index as the total requested amount for the child, as if the child was there from the beginning but ignored values up to that point.
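
The arithmetic of the example can be written down as a tiny sketch. The class and method names are hypothetical, and it assumes that the pinned node's identifier equals the number of values that precede the first replayable value (1 in the example, since value 1 has already left the replay(1) buffer and value 2 is still replayable).

```java
// Hypothetical helper functions that mirror the latecomer example;
// they are not part of RxJava.
final class LatecomerTotalSketch {
    // Total requested of a child: the values it skipped (the pinned node's
    // identifier) plus the amount it actually asked for.
    static long latecomerTotal(long pinnedNodeId, long requested) {
        return pinnedNodeId + requested;
    }

    // Extra amount to request from upstream when a child's total exceeds
    // the maximum total seen so far; zero if it does not.
    static long extraUpstream(long maxChildRequested, long childTotal) {
        return Math.max(0L, childTotal - maxChildRequested);
    }

    public static void main(String[] args) {
        long maxSoFar = 2; // the first child requested 2 and received 1, 2

        // Without the identifier the latecomer's total is 2: nothing extra is requested.
        System.out.println(extraUpstream(maxSoFar, latecomerTotal(0, 2)));

        // With the identifier (1) the total becomes 3: one more value is requested.
        System.out.println(extraUpstream(maxSoFar, latecomerTotal(1, 2)));
    }
}
```

In the first case the latecomer would replay value 2 and then stall; in the second case the extra upstream request lets value 3 arrive.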

Note: this property was only recently discovered and, as such, RxJava didn't work correctly. The PR #3454 fixes this for the 1.x series and I'll post a PR for 2.x later.

To make this clearer, let's see the implementation of the ReplayProducer.

static final class ReplayProducer<T> 
implements Producer, Subscription {
    int index;
    Node node;                                    // (1)
    final Subscriber<? super T> child;
    final AtomicLong requested;
    final AtomicInteger wip;
    final AtomicLong totalRequested;
    final AtomicBoolean once;                     // (2)

    Connection<T> connection;

    public ReplayProducer(
            Subscriber<? super T> child) {
        this.child = child;
        this.requested = new AtomicLong();
        this.totalRequested = new AtomicLong();
        this.wip = new AtomicInteger();
        this.once = new AtomicBoolean();
    }

    @Override
    public void request(long n) {
        if (n > 0) {
            BackpressureUtils
            .getAndAddRequest(requested, n);
            BackpressureUtils
            .getAndAddRequest(totalRequested, n); // (3)

            connection.manageRequests();          // (4)
        }
    }

    @Override
    public boolean isUnsubscribed() {
        return once.get();
    }

    @Override
    public void unsubscribe() {
        if (once.compareAndSet(false, true)) {
            connection.remove(this);             // (5)
        }
    }

    void addTotalRequested(long n) {             // (6)
        if (n > 0) {
            BackpressureUtils
            .getAndAddRequest(totalRequested, n);
        }
    }
}


Its purpose is to be set on a child Subscriber and mediate the request and unsubscription requests for it:

  1. We want to use the same class for both the bounded and unbounded buffer mode so we have to store the current index/node in fields.
  2. We have the usual set of fields: the child Subscriber, the wip counter for the queue-drain serialization, the current requested amount and an AtomicBoolean field indicating an unsubscribed state. In addition we will track the total requested amount and will coordinate requesting from upstream with the help of it.
  3. Whenever the child requests, we update both the relative requested amount and the total requested amount with the common BackpressureUtils helper that will cap the amounts at Long.MAX_VALUE if necessary.
  4. Once set, we have to trigger a request management to determine if the upstream needs to be requested or not.
  5. When the child unsubscribes, we need to remove this ReplayProducer from the array of tracked ReplayProducers.
  6. Finally, the bounded buffer's replay has to update the total requested amount before emission so the request coordination works with latecomers as well.
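
The capping behavior mentioned in (3) can be illustrated with a standalone helper. This is a sketch of the idea only, assuming a positive n; the name getAndAddCapped is hypothetical and the code is not copied from BackpressureUtils.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in that shows how a request counter can be
// incremented atomically while being capped at Long.MAX_VALUE.
final class CappedAddSketch {
    // Adds n (assumed positive) to the counter and returns the previous value.
    static long getAndAddCapped(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0L) {
                next = Long.MAX_VALUE;   // overflow: treat as unbounded
            }
            if (requested.compareAndSet(current, next)) {
                return current;
            }
            // another thread changed the counter, retry with the fresh value
        }
    }
}
```

Once the counter reaches Long.MAX_VALUE, further additions leave it there, which is what lets the drain loop treat that value as the unbounded mode.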

Before looking at the manageRequests() call, I have to show the skeleton of the Connection class (the equivalent class from PublishConnectableObservable):


@SuppressWarnings({ "unchecked", "rawtypes" })
static final class Connection<T> implements Observer<T> {

    final AtomicReference<ReplayProducer<T>[]> subscribers;
    final State<T> state;
    final AtomicBoolean connected;
    final AtomicInteger wip;

    final SourceSubscriber parent;

    final ReplayBuffer<T> buffer;                        // (1)

    static final ReplayProducer[] EMPTY = 
        new ReplayProducer[0];

    static final ReplayProducer[] TERMINATED = 
        new ReplayProducer[0];
    
    long maxChildRequested;                              // (2)
    long maxUpstreamRequested;

    public Connection(State<T> state, int maxSize) {
        this.state = state;
        this.wip = new AtomicInteger();
        this.subscribers = new AtomicReference<>(EMPTY);
        this.connected = new AtomicBoolean();
        this.parent = createParent();
        
        ReplayBuffer b;                                 // (3)
        if (maxSize == Integer.MAX_VALUE) {
            b = new UnboundedReplayBuffer<>();
        } else {
            b = new BoundedReplayBuffer<>(maxSize);
        }
        this.buffer = b;
    }

    SourceSubscriber createParent() {                   // (4)
        SourceSubscriber parent = 
            new SourceSubscriber<>(this);

        parent.add(Subscriptions.create(() -> {
            switch (state.strategy) {
            case SEND_COMPLETED:
                onCompleted();
                break;
            case SEND_ERROR:
                onError(new CancellationException(
                    "Disconnected"));
                break;
            default:
                parent.unsubscribe();
                subscribers.getAndSet(TERMINATED);
            }
        }));

        return parent; 
    }

    boolean add(ReplayProducer<T> producer) {
        // omitted
    }

    void remove(ReplayProducer<T> producer) {
        // omitted 
    }

    void onConnect(
    Action1<? super Subscription> disconnect) {
        // omitted
    }

    @Override
    public void onNext(T t) {                          // (5)
        ReplayBuffer<T> buffer = this.buffer;
        buffer.onNext(t);
        ReplayProducer<T>[] a = subscribers
            .get();
        for (ReplayProducer<T> rp : a) {
            buffer.replay(rp);
        }
    }

    @Override
    public void onError(Throwable e) {
        ReplayBuffer<T> buffer = this.buffer;
        buffer.onError(e);
        ReplayProducer<T>[] a = subscribers
            .getAndSet(TERMINATED);
        for (ReplayProducer<T> rp : a) {
            buffer.replay(rp);
        }
    }

    @Override
    public void onCompleted() {
        ReplayBuffer<T> buffer = this.buffer;
        buffer.onCompleted();
        ReplayProducer<T>[] a = subscribers
            .getAndSet(TERMINATED);
        for (ReplayProducer<T> rp : a) {
            buffer.replay(rp);
        }
    }

    void manageRequests() {                           // (6)
        if (wip.getAndIncrement() != 0) {
            return;
        }
        
        int missed = 1;
        
        for (;;) {

            // implement            
            
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}

The class looks much the same as PublishConnectableObservable.Connection; therefore, I've omitted the methods that are exactly the same. Let's see the rest:


  1. Instead of a bounded queue, we now have the common ReplayBuffer interface.
  2. We have to keep track of the maximum values of both child requests and requests issued to upstream. The latter is necessary because we can't know when the upstream's Producer arrives and we have to accumulate the coordinated request amount until it arrives.
  3. I treat Integer.MAX_VALUE as the indicator for the unbounded replay mode.
  4. The createParent is slightly changed. Instead of the disconnected flag, we now unsubscribe directly from upstream. The implementations of add, remove and onConnect are the same as in the last post.
  5. The onXXX methods have the same pattern: call the appropriate method on the buffer instance and then call replay for all known ReplayProducer instances. Note that the terminal events also swap in the TERMINATED array atomically, indicating that subsequent Subscribers have to go to the next Connection object.
  6. Last but not least, we have to manage requests from all child Subscribers which may call the method concurrently and thus we have to do some serialization. Since we are going to calculate the maximum to request, the non-blocking serialization approach works here quite well. This method is called when the upstream producer finally arrives and when any child subscriber requests something.

Now let's dive into the request coordination logic.


// for (;;) {

ReplayProducer<T>[] a = subscribers.get();

if (a == TERMINATED) {
    return;
}

long ri = maxChildRequested;
long maxTotalRequests = ri;                 // (1)

for (ReplayProducer<T> rp : a) {
    maxTotalRequests = Math.max(
        maxTotalRequests, 
        rp.totalRequested.get());
}

long ur = maxUpstreamRequested;
Producer p = parent.producer;

long diff = maxTotalRequests - ri;          // (2)
if (diff != 0) {
    maxChildRequested = maxTotalRequests;
    if (p != null) {                        // (3)
        if (ur != 0L) {
            maxUpstreamRequested = 0L;
            p.request(ur + diff);           // (4)
        } else {
            p.request(diff);
        }
    } else {
        long u = ur + diff;
        if (u < 0) {
            u = Long.MAX_VALUE;
        }
        maxUpstreamRequested = u;           // (5)
    }
} else
if (ur != 0L && p != null) {                // (6)
    maxUpstreamRequested = 0L;
    p.request(ur);
}

// missed = ...

Let's see how it works:


  1. After retrieving the current array of Subscribers and checking for the disconnected/terminated state, we compute the maximum of the total requested amount of each subscriber (and the previously known maximum).
  2. We calculate the difference from the last known maximum. If the difference is non zero, we remember the new maximum in maxChildRequested.
  3. At this point, the upstream Producer may be still missing. 
  4. If the producer is already there, we take any missed amount and the current difference and request it.
  5. Otherwise, without a producer, all we can do is to accumulate all the missed differences.
  6. If the maximum didn't change, we still might have to request all missed amounts if the Producer is there. As with (4), we have to "forget" all the missed values thus the next time the requests have to be coordinated, the upstream will only receive the non-zero difference then on.
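
To watch the numbers move, here is a single-threaded sketch of the same steps that records what the upstream would see instead of calling a real Producer. The class name, the producerPresent flag and the upstreamRequests list are assumptions made for the sake of the trace, and the sketch also caps the sum sent to upstream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace helper for the max-request coordination steps above.
final class MaxRequestCoordinatorSketch {
    long maxChildRequested;
    long maxUpstreamRequested;     // amount missed while no producer was present
    boolean producerPresent;       // stands in for "parent.producer != null"
    final List<Long> upstreamRequests = new ArrayList<>();

    void coordinate(long... childTotals) {
        long ri = maxChildRequested;
        long max = ri;
        for (long t : childTotals) {
            max = Math.max(max, t);
        }
        long ur = maxUpstreamRequested;
        long diff = max - ri;
        if (diff != 0L) {
            maxChildRequested = max;
            if (producerPresent) {
                maxUpstreamRequested = 0L;
                upstreamRequests.add(addCap(ur, diff));   // missed amount plus difference
            } else {
                maxUpstreamRequested = addCap(ur, diff);  // accumulate until a producer arrives
            }
        } else if (ur != 0L && producerPresent) {
            maxUpstreamRequested = 0L;
            upstreamRequests.add(ur);                     // fire the accumulated amount
        }
    }

    static long addCap(long a, long b) {
        long u = a + b;
        return u < 0L ? Long.MAX_VALUE : u;
    }
}
```

For instance, coordinating the totals (2, 5) and then (7, 5) without a producer accumulates 7; once the producer is present, the next call requests those 7, and a later total of 10 requests only the difference of 3.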

In other terms, we collect how far each child subscriber wants to go and request from the upstream based on it.

As you may have noticed, this request coordination and the call to it can become quite expensive if there are lots of child Subscribers requesting left and right. In fact, we'd only have to deal with a limited set of requesters at a time and not with everyone. To solve the performance impact, we have to introduce a well known pattern: an emitter-loop or queue-drain that plays with the same serialization logic but the method now receives a parameter indicating who wants to update the coordinated request amount. This way, when a child requests some value and not others, only one child is evaluated instead of all.

There is, however, one thing to prepare for: the arrival of the upstream Producer in which case we still have to check all children. For this, we need to extend the Connection object with some extra fields:

List<ReplayProducer<T>> coordinationQueue;
boolean coordinateAll;
boolean emitting;
boolean missed;

You might have guessed what approach this will take: emitter loop. We can drop the wip counter and replace it with emitting/missed.


void manageRequests(ReplayProducer<T> inner) {
    synchronized (this) {                               // (1)
        if (emitting) {
            if (inner != null) {
                List<ReplayProducer<T>> q = 
                    coordinationQueue;
                if (q == null) {
                    q = new ArrayList<>();
                    coordinationQueue = q;
                }
                q.add(inner);
            } else {
                coordinateAll = true;
            }
            missed = true;
            return;
        }
        emitting = true;
    }
    
    long ri = maxChildRequested;
    long maxTotalRequested;
    
    if (inner != null) {                                // (2)
        maxTotalRequested = Math.max(
            ri, inner.totalRequested.get());
    } else {
        maxTotalRequested = ri;

        @SuppressWarnings("unchecked")
        ReplayProducer<T>[] a = subscribers.get();
        for (ReplayProducer<T> rp : a) {
            maxTotalRequested = Math.max(
                maxTotalRequested, rp.totalRequested.get());
        }
        
    }
    makeRequest(maxTotalRequested, ri);
    
    for (;;) {
        if (parent.isUnsubscribed()) {
            return;
        }
        
        List<ReplayProducer<T>> q;
        boolean all;
        synchronized (this) {                           // (3)
            if (!missed) {
                emitting = false;
                return;
            }
            missed = false;
            q = coordinationQueue;
            coordinationQueue = null;
            all = coordinateAll;
            coordinateAll = false;
        }
        
        ri = maxChildRequested;                         // (4)
        maxTotalRequested = ri;

        if (q != null) {
            for (ReplayProducer<T> rp : q) {
                maxTotalRequested = Math.max(
                maxTotalRequested, rp.totalRequested.get());
            }
        } 
        
        if (all) {
            @SuppressWarnings("unchecked")
            ReplayProducer<T>[] a = subscribers.get();
            for (ReplayProducer<T> rp : a) {
                maxTotalRequested = Math.max(
                maxTotalRequested, rp.totalRequested.get());
            }
        }
        
        makeRequest(maxTotalRequested, ri);
    }
}

It works as follows:


  1. First, we try to enter the emission loop. If it fails and the parameter to the method was null, we set the coordinateAll flag which will trigger a full sweep. Otherwise, we queue up the ReplayProducer and quit.
  2. If the current thread managed to get into the emission state, we either determine the maximum requested by using the single ReplayProducer the method was called with or do a full sweep if it was actually null.
  3. Next comes the loop part of the emitter-loop approach. We check if we missed some calls and get all the queued up ReplayProducers as well as the indicator for a full sweep.
  4. Given all previous inputs we sweep the queued up ReplayProducers for the maximum value and if necessary, all the other known ReplayProducers as well. Note that they both may have to run since the queue may have ReplayProducers not known at the time this method runs and vice versa.
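
Stripped of the request logic, the emitter-loop pattern used here can be sketched as follows. The class, its String work items and the handler callback are hypothetical and exist only to make the queueing visible, including the case of a reentrant call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, minimal emitter loop: one caller does the work, the others
// queue up their item and leave; the worker loops until nothing was missed.
final class EmitterLoopSketch {
    boolean emitting;
    boolean missed;
    List<String> queue;
    final List<String> processed = new ArrayList<>();
    Consumer<String> handler = s -> { };   // extra work per item, may call submit()

    void submit(String item) {
        synchronized (this) {
            if (emitting) {                // somebody else is in the loop
                if (queue == null) {
                    queue = new ArrayList<>();
                }
                queue.add(item);
                missed = true;
                return;
            }
            emitting = true;
        }

        process(item);                     // fast path for the caller's own item

        for (;;) {
            List<String> q;
            synchronized (this) {
                if (!missed) {
                    emitting = false;      // nothing arrived in the meantime
                    return;
                }
                missed = false;
                q = queue;
                queue = null;
            }
            if (q != null) {
                for (String s : q) {
                    process(s);
                }
            }
        }
    }

    void process(String item) {
        processed.add(item);
        handler.accept(item);
    }
}
```

If the handler submits "b" while "a" is being processed, the nested call only queues "b" and returns; the outer loop then picks it up, so the items are handled one after the other and never concurrently.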

Finally, the upstream requesting can be factored out into a common method:

void makeRequest(long maxTotalRequests, long previousTotalRequests) {
    long ur = maxUpstreamRequested;
    Producer p = parent.producer;

    long diff = maxTotalRequests - previousTotalRequests;
    if (diff != 0) {
        maxChildRequested = maxTotalRequests;
        if (p != null) {
            if (ur != 0L) {
                maxUpstreamRequested = 0L;
                p.request(ur + diff);
            } else {
                p.request(diff);
            }
        } else {
            long u = ur + diff;
            if (u < 0) {
                u = Long.MAX_VALUE;
            }
            maxUpstreamRequested = u;
        }
    } else
    if (ur != 0L && p != null) {
        maxUpstreamRequested = 0L;
        // fire the accumulated requests
        p.request(ur);
    }
}

which is practically the same as the body of the original sweep-all manageRequests() method.

ReplayConnectableObservable

All that remains to show in this post is the SourceSubscriber class and the ReplayConnectableObservable itself.

Since we need the producer from upstream, we use the SourceSubscriber to store it for us and get it once ready. Note that we can't use Subscriber.request() here for two reasons: a) calls to request() don't accumulate until a Producer arrives and b) we can't know whether a Producer has arrived or not.


static final class SourceSubscriber<T> 
extends Subscriber<T> {
    final Connection<T> connection;
    
    volatile Producer producer;
    
    public SourceSubscriber(Connection<T> connection) {
        this.connection = connection;
    }

    @Override
    public void onNext(T t) {
        connection.onNext(t);
    }

    @Override
    public void onError(Throwable e) {
        connection.onError(e);
    }

    @Override
    public void onCompleted() {
        connection.onCompleted();
    }

    @Override
    public void setProducer(Producer p) {
        producer = p;
        connection.manageRequests();
    }
}

Nothing outstanding: we delegate everything to the Connection instance. Note the connection.manageRequests() call which will trigger the request coordination to actually request the amount held in the maxUpstreamRequested field (i.e., the missed requests). If we have the more performant version, the call is manageRequests(null) instead.

The State class is also changed a bit due to the indication of bounded buffering and due to the need to start replaying to a new Subscriber once it successfully subscribed to the current connection.


static final class State<T> implements OnSubscribe<T> {
    final DisconnectStrategy strategy;
    final Observable<T> source;
    final int maxSize;                                     // (1)

    final AtomicReference<Connection<T>> connection;

    public State(DisconnectStrategy strategy, 
    Observable<T> source, int maxSize) {
        this.strategy = strategy;
        this.source = source;
        this.maxSize = maxSize;
        this.connection = new AtomicReference<>(
            new Connection<>(this, maxSize));
    }

    @Override
    public void call(Subscriber<? super T> s) {
        // implement
        ReplayProducer<T> pp = new ReplayProducer<>(s);

        for (;;) {
            Connection<T> curr = this.connection.get();

            pp.connection = curr;

            if (curr.add(pp)) {
                if (pp.isUnsubscribed()) {
                    curr.remove(pp);
                } else {
                    curr.buffer.replay(pp);               // (2)

                    s.add(pp);
                    s.setProducer(pp);
                }
                
                break;
            }
        }
    }

    public void connect(
    Action1<? super Subscription> disconnect) {
        // same as before
    }

    public void replaceConnection(Connection<T> conn) {   // (3)
        Connection<T> next = 
            new Connection<>(this, maxSize);
        connection.compareAndSet(conn, next);
    }
}

There are some changes:

  1. We have to store the maxSize parameter because a reconnection has to recreate the appropriate ReplayBuffer instance as well.
  2. Once we create a ReplayProducer, first we try to add it to the current connection. If successful, then we do a bare-bones replay call. Since the ReplayProducer has a requested value of zero, this won't replay any value to the child Subscriber. What it does is that it captures the current head of the buffer's linked list (if the buffer is bounded), pins it and makes sure this ReplayProducer starts with the correct total requested amount. Only after this setup is the ReplayProducer added to the child as an unsubscription and request target.
  3. Note that the Connection now requires a maxSize parameter.

Note that the order in (2) works only because I've shown an implementation of replay that emits terminal events only when requested. This is not a requirement or expectation for terminal events, although it should not cause any real-world problems as most Subscribers just keep requesting.

Finally, we still need factory methods to create instances of ReplayConnectableObservable:


public static <T> ReplayConnectableObservable<T> createUnbounded(
        Observable<T> source, 
        DisconnectStrategy strategy) {
    return createBounded(source, strategy, Integer.MAX_VALUE);
}

public static <T> ReplayConnectableObservable<T> createBounded(
        Observable<T> source, 
        DisconnectStrategy strategy, int maxSize) {
    State<T> state = new State<>(strategy, source, maxSize);
    return new ReplayConnectableObservable<>(state);
}


Conclusion

In this blog post, I've detailed the inner workings of a replay-like ConnectableObservable that can do both bounded and unbounded replays. The complexity is one level up relative to the PublishConnectableObservable from the last part; if you understood that, then this shouldn't be too large a leap. The added complexity comes from the management of the buffer and the coordination of requests with the max strategy.

In the next part, I'm going to talk a bit about how to turn such ConnectableObservables into Subjects that now will perform request coordination which may become mandatory for RxJava 2.0 Subjects and Reactive-Streams Processors, depending on how a certain discussion will be resolved.

Wednesday, October 21, 2015

ConnectableObservables (part 2)

Introduction


In the previous post, I've shown how one can write a "simple" ConnectableObservable that uses a Subject to dispatch events to subscribers once it has been connected.

The shortcoming of the solution is that there is no request coordination and everything runs in unbounded mode: the developers have to apply onBackpressureXXX strategies per subscriber, however, that leads to either dropping data or buffer bloat.

If the underlying Observable is cold, there should be a way to make sure it emits only as many elements as the child subscribers can process. To achieve this, we need request coordination.


Request coordination

So far, the operators we were implementing had to deal with a single child subscriber and its request at a time. One had to either pass it through, rebatch it or accumulate it, based on the business logic of said operator.

When there are multiple child Subscribers, the problem space suddenly receives a new dimension. What are the new problems?

Every bit counts

First, different child subscribers may request different amounts. Some may request small amounts, some may request larger amounts and others may want to run in unbounded mode (i.e., request(Long.MAX_VALUE)). In addition, the request calls may happen any time and with any amount.

Given such heterogeneous request pattern, what should be the request amount sent to the upstream Observable source?

There are two main options:

  1. request as much as the smallest amount any child Subscriber requested, or
  2. request as much as the largest amount any child Subscriber requested.

Option 1) is essentially the lockstep approach. Its benefit is that there is no need for request re-batching and buffering since once the upstream emits, everybody can receive it immediately. (Rebatching and buffering is an option in case the request amounts are really 1s or 10s at a time.) The drawback is that the whole setup slows down to the pace of the slowest child Subscriber: if that one "forgets" to request, nobody gets anything.

Option 2) gives more room to individual child Subscribers and allows them to run at their own pace. However, this solution requires unbounded buffering capability (which may be shared or per Subscriber). This means if there is an unbounded child Subscriber, the operator has to request Long.MAX_VALUE and fill the buffers for everyone. Depending on the operator, this may not be a problem though.
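
The difference between the two options boils down to which aggregate of the outstanding child requests gets forwarded. A minimal sketch, with hypothetical names and the assumption that each array element is the outstanding request amount of one child Subscriber:

```java
// Hypothetical helpers contrasting the two coordination options.
final class RequestStrategiesSketch {
    // Option 1: lockstep, the upstream may only advance as far as the slowest child allows.
    static long lockstep(long... outstanding) {
        if (outstanding.length == 0) {
            return 0L;
        }
        long min = Long.MAX_VALUE;
        for (long r : outstanding) {
            min = Math.min(min, r);
        }
        return min;
    }

    // Option 2: max-requesting, the upstream advances as far as the most eager child wants.
    static long maxRequest(long... outstanding) {
        long max = 0L;
        for (long r : outstanding) {
            max = Math.max(max, r);
        }
        return max;
    }
}
```

For the amounts 5, 0 and Long.MAX_VALUE, lockstep yields 0 (the child that did not request stalls everyone) while max-requesting yields Long.MAX_VALUE (the operator has to buffer for the other two).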

Subscribers may come and go at will

The second problem is that the number of Subscribers may not be constant: new subscribers arrive, old ones leave. This poses another set of problems:

  1. A child Subscriber may request Long.MAX_VALUE then leave after a few (or no) elements.
  2. A child Subscriber may arrive but not request anything, stopping everyone else.
  3. A child Subscriber may leave at any time and thus its request amount "pressure" has to be released.
  4. All child Subscribers leave before the upstream Observable completes. What should happen in this case?

Unfortunately, problems 1) and 2) require the mutually exclusive approaches explained above (lockstep vs. unbounded buffering). Problem 3) requires an unsubscription action.

Problem 4) depends on the approach taken in respect to 1) and 2).

Within the lockstep approach, two sub-options arise. Either one has to introduce some bounded buffers that will hold onto the requested amounts, which now have to be re-batched to fit in, and simply await the new Subscribers. Otherwise, one has to slowly "drip" away the source values until a child Subscriber arrives.

Within the unbounded buffering approach, one can simply keep buffering or again, start dropping values.

Approaches taken in RxJava

RxJava has two operators that return a ConnectableObservable: publish() and replay(). For a long time, these were ignoring backpressure completely and behaved just like the MulticastSupplier in the previous part.

These operators were rewritten to support backpressure (in 1.0.13 and 1.0.14 respectively) and had to take the problems mentioned before into account. The solutions were as follows:

Operator publish() does lockstepping with a fixed prefetch buffer: the buffer is only drained (and then replenished) if all known child Subscribers can take a value. If there are no child Subscribers, it "slowly drips away" its source, which means it starts to request 1 by 1 and drops these values.

Operator replay() does unbounded buffering. The reason for this is that both the bounded and unbounded versions of replay() have to buffer and replay all values from the upstream anyway. You may think, why buffer everything when the replay is time and/or size bound? The answer is that these operators, similar to Subjects, have to deliver events continuously and without skips; if there is a child Subscriber that arrived at some time, requested 1 then went to "sleep", the next time it requests, the bounded replay has to present the next value, no matter how far ahead the other Subscribers went in the meantime.


The effect of disconnection

There is a problem that isn't dealt with in the RxJava operators but has to be mentioned. If one unsubscribes the Subscription returned by the connect() method, the upstream will stop sending further events.

The problem is that this may leave the child Subscribers hanging: they won't receive any further events (beyond those that are already in some buffer of the respective operator). We have similar problems with CompletableFutures in Java 8. One can cancel a Future but what happens to those that were awaiting its result?

The solution in Java 8 is to emit a CancellationException as the result in this case so that the dependent computations can terminate. However, this isn't the case with RxJava (in both 1.x and 2.x branches). The current implementation will just hang the child Subscribers.
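
The Java 8 behavior can be observed with a few lines of plain CompletableFuture code. This is only a sketch of the idea; the class and method names are hypothetical, and the unwrapping step is there because a dependent stage may see the exception wrapped in a CompletionException.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo: what does a dependent computation see when the
// source CompletableFuture gets cancelled?
final class CancelPropagationSketch {
    static Throwable observedByDependent() {
        CompletableFuture<Integer> source = new CompletableFuture<>();

        // a dependent stage that would normally compute on the result,
        // followed by a handler that captures whatever error arrives
        CompletableFuture<Throwable> dependent =
            source.thenApply(v -> v + 1).handle((v, error) -> error);

        source.cancel(true);   // "disconnect" before any value was produced

        Throwable t = dependent.join();
        // unwrap the CompletionException wrapper if there is one
        if (t instanceof CompletionException && t.getCause() != null) {
            return t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        Throwable t = observedByDependent();
        System.out.println(t instanceof CancellationException);
    }
}
```

The dependent stage does not hang: it is terminated with an error that traces back to a CancellationException, which is the kind of signal a disconnected child Subscriber is missing.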

This problem may appear outside of a ConnectableObservable as well. For some time, the RxAndroid 0.x library contained an operator that was applied to all sequences and unsubscribed them if the lifecycle required cleanup. The problem was that this left child Subscribers without termination events. I suggested emitting an onError or onCompleted event for this case. There was no resolution of the problem and the operator was removed before 1.0.

On a personal note, I don't remember anyone from the community complaining about this problem and it seems nobody is really affected by this behavior. As with many obscure and corner cases, if I don't mention them, nobody else seems to discover them.

The effect of termination

Upstream Observables may terminate normally, in which case the ConnectableObservable will emit the terminal event to child Subscribers.

At this point, a new Subscriber may subscribe to the terminated ConnectableObservable. What should happen in this case? Does the termination also mean disconnection? Should the child Subscriber get terminated instantly, similar to PublishSubject?

Again, the solution requires a business decision. RxJava chose the approach that a terminal event sent to a ConnectableObservable is considered a disconnect event and late coming Subscribers won't receive any terminal event but will be remembered until another call to connect() happens.

This has the benefit that the developers can "prepare" child Subscribers before the upstream Observable gets run and thus avoid losing events. The drawback is that one has to remember to call connect() again, otherwise nothing runs and the Subscribers are left hanging.

Family of collectors and emitters

Before we jump into some code, I'd like to sketch out a pattern that is the foundation of almost all operators that deal with either multiple sources or multiple child Subscribers.

I've written dozens of such operators and I've noticed they all use the same set of components and methods:

  1. They all need to track Subscribers, either the child Subscribers or the Subscribers that are subscribed to the source Observables. The tracking structure uses the copy-on-write approach of array-based resource containers.
  2. They all use an emitter loop (synchronized) or drain loop (atomics) which has to be triggered from many places: when an event is emitted from upstream(s), when a new child Subscriber arrives, when a request comes from child Subscribers and sometimes when a child unsubscribes.
  3. The loop has some preprocessing step: figuring out where the Subscribers are at the moment, selecting which source to drain or combining available values from sources in some fashion.
  4. Finally, the events are delivered to Subscriber(s) and replenishments are requested from source Observable(s).
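
The copy-on-write tracking from point 1 can be sketched on its own. The class below is a hypothetical, simplified container (it stores plain Object arrays to avoid generic array creation) and is not the code of any particular RxJava operator.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical copy-on-write tracker: every add/remove swaps in a fresh array,
// so readers can iterate over a snapshot without locking.
final class CopyOnWriteTrackerSketch<T> {
    static final Object[] EMPTY = new Object[0];
    static final Object[] TERMINATED = new Object[0];   // distinct instance, compared by identity

    final AtomicReference<Object[]> items = new AtomicReference<>(EMPTY);

    boolean add(T item) {
        for (;;) {
            Object[] a = items.get();
            if (a == TERMINATED) {
                return false;                 // no new members after termination
            }
            int n = a.length;
            Object[] b = new Object[n + 1];
            System.arraycopy(a, 0, b, 0, n);
            b[n] = item;
            if (items.compareAndSet(a, b)) {
                return true;
            }
        }
    }

    void remove(T item) {
        for (;;) {
            Object[] a = items.get();
            int n = a.length;
            int j = -1;
            for (int i = 0; i < n; i++) {
                if (a[i] == item) {           // identity comparison, as with Subscribers
                    j = i;
                    break;
                }
            }
            if (j < 0) {
                return;                       // not tracked (or already terminated)
            }
            Object[] b;
            if (n == 1) {
                b = EMPTY;
            } else {
                b = new Object[n - 1];
                System.arraycopy(a, 0, b, 0, j);
                System.arraycopy(a, j + 1, b, j, n - j - 1);
            }
            if (items.compareAndSet(a, b)) {
                return;
            }
        }
    }

    // Swaps in the terminal marker and returns the members tracked at that moment.
    Object[] terminate() {
        return items.getAndSet(TERMINATED);
    }
}
```

The emitter or drain loop then simply reads items.get() and iterates over the snapshot it got, regardless of concurrent arrivals and departures.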

Which operator?

Now that we are aware of the problems, let's implement a ConnectableObservable which does request coordination.

I've been thinking about which operator to implement. My first thought was to show how to implement the operator pair of an AsyncSubject or BehaviorSubject (similar to how publish() is the pair of PublishSubject); however, the former can be implemented using plain composition plus replay():

public ConnectableObservable<T> async() {
    return takeLast(1).replay();
}

Implementing the pair of BehaviorSubject is a bit more involved. The naive implementation would use composition such as this:

public ConnectableObservable<T> behave() {
    return replay(1);
}

However, this doesn't properly capture the behavior of a terminated BehaviorSubject: child Subscribers get nothing but a terminal event whereas replay will always replay 1 value and 1 terminal event after it completed.

To minimize brain melting, I'm going to show how to implement a variant of the least complex of the operators: publish().


Publish (or die)

First, let's sketch out all the requirements we want to achieve:


  1. The operator should do a lockstep-based request coordination with prefetching (for efficiency)
  2. The effect of disconnection on the child Subscribers should be parametrizable: no event, signal error or signal completion.
  3. Once the source terminates, the operator should be considered disconnected and new subscribers will wait for the next connect().
  4. The operator will allow errors to cut ahead. (Implementing error-delay is an exercise left to the reader).
  5. The operator will use a power-of-2 prefetch buffer.
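
A quick aside on requirement 5. Array-backed queues usually want a power-of-2 capacity because wrapping an ever-growing index then becomes a cheap bit mask instead of a modulo. Here is a minimal sketch of that idea in plain Java; the class Pow2Sketch and its two methods are names I made up for illustration, they are not part of RxJava or of the queue used later.

```java
final class Pow2Sketch {
    /** Rounds a capacity up to the next power of two (assumes 1 <= x <= 2^30). */
    static int roundUp(int x) {
        return 1 << (32 - Integer.numberOfLeadingZeros(x - 1));
    }

    /** Maps an ever-increasing index onto a slot of a power-of-2 sized array. */
    static int slot(long index, int capacity) {
        // capacity - 1 is an all-ones bit mask when capacity is a power of two
        return (int) (index & (capacity - 1));
    }
}
```

With a capacity of 128, for example, index 130 wraps around to slot 2 without any division.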


With these requirements, we start with the skeleton of the class as usual:


public class PublishConnectableObservable<T> 
extends ConnectableObservable<T> {

    public enum DisconnectStrategy {                           // (1)
        NO_EVENT,
        SEND_ERROR,
        SEND_COMPLETED
    }
    
    public static <T> PublishConnectableObservable<T> 
    createWith(                                               // (2)
            Observable<T> source, 
            DisconnectStrategy strategy) {
        State<T> state = new State<>(strategy, source);
        return new PublishConnectableObservable<>(state);
    }
    
    final State<T> state;
    
    protected PublishConnectableObservable(State<T> state) {  // (3)
        super(state);
        this.state = state;
    }
    
    @Override
    public void connect(
            Action1<? super Subscription> connection) {       // (4)
        state.connect(connection);
    }
}

Nothing extraordinary so far:

  1. We create an enum for the disconnection strategy
  2. We have to use a factory method because the internal state has to be accessible from OnSubscribe and from instance methods of this class.
  3. We construct the object where State doubles as an OnSubscribe to save on allocation.
  4. Finally, we delegate the connection attempt to the state object. This gives us a less verbose source code.

Next comes the state object with some familiar structure (see the last post of this series):

static final class State<T> implements OnSubscribe<T> {
    final DisconnectStrategy strategy;
    final Observable<T> source;
    
    final AtomicReference<Connection<T>> connection;      // (1)
      
    public State(DisconnectStrategy strategy, 
            Observable<T> source) {                       // (2)
        this.strategy = strategy;
        this.source = source;
        this.connection = new AtomicReference<>(
            new Connection<>(this)
        );
    }
        
    @Override
    public void call(Subscriber<? super T> s) {           // (3)
        // implement
    }
        
    public void connect(
        Action1<? super Subscription> disconnect) {       // (4)
        // implement
    }
        
    public void replaceConnection(Connection<T> conn) {   // (5)
        Connection<T> next = new Connection<>(this);
        connection.compareAndSet(conn, next);
    }
}

The state object will handle the connection, subscription and reconnection cases:

  1. Because we have to reconnect, we store the current connection in an AtomicReference.
  2. We initialize the source and strategy fields and set up an initial unconnected connection.
  3. The method call() from OnSubscribe will handle the subscribers; I'll show the implementation further down.
  4. The connect method will handle the connection attempts; I'll show the implementation further down.
  5. Finally, once a connection has been terminated on its own or via unsubscribe, we have to replace the old connection with a fresh connection atomically and not overwriting somebody else's fresh connection due to races.
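
To see why point 5 insists on a compareAndSet instead of a plain set, here is a small stand-alone sketch in plain Java. ReplaceSketch and Conn are hypothetical stand-ins for State and Connection; only the identity of the Conn objects matters.

```java
import java.util.concurrent.atomic.AtomicReference;

final class ReplaceSketch {
    /** Hypothetical stand-in for the real Connection class. */
    static final class Conn { }

    final AtomicReference<Conn> current = new AtomicReference<>(new Conn());

    /** Swaps in a fresh Conn only if 'old' is still the current one. */
    boolean replace(Conn old) {
        // If somebody else already replaced 'old', the CAS fails and
        // their fresh connection is left untouched.
        return current.compareAndSet(old, new Conn());
    }
}
```

If two threads both notice the same terminated connection, only the first replace() succeeds; the second one is a no-op, so nobody's fresh connection gets overwritten.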

Before going deep into the complicated logic, two simpler classes remain. The first is the Subscriber that will be subscribed to the source Observable:


static final class SourceSubscriber<T> 
extends Subscriber<T> {
    final Connection<T> connection;
    public SourceSubscriber(
            Connection<T> connection) {    // (1)
        this.connection = connection;
    }
    @Override
    public void onStart() {
        request(RxRingBuffer.SIZE);        // (2)
    }

    @Override
    public void onNext(T t) {
        connection.onNext(t);              // (3)
    }

    @Override
    public void onError(Throwable e) {
        connection.onError(e);
    }

    @Override
    public void onCompleted() {
        connection.onCompleted();
    }
    
    public void requestMore(long n) {      // (4)
        request(n);
    }
}

The class, again, is full of delegations:

  1. We store the connection object and we will delegate events to it.
  2. If this Subscriber is subscribed to the source Observable, we request only a limited number of elements upfront. (Parametrizing this is left to the reader).
  3. Again, for class simplicity, we delegate the events to the connection object, which happens to implement the Observer interface for convenience.
  4. We will have to replenish all consumed values, but request() is a protected method, so we expose it through the requestMore() method.

Next comes a Producer and Subscription instance that will handle the unsubscription and request accounting for the child Subscribers of our operator.

static final class PublishProducer<T> 
implements Producer, Subscription {
    final Subscriber<? super T> actual;
    final AtomicLong requested;
    final AtomicBoolean once;
    volatile Connection<T> connection;             // (1)
    
    public PublishProducer(
            Subscriber<? super T> actual) {
        this.actual = actual;
        this.requested = new AtomicLong();
        this.once = new AtomicBoolean();
    }
    
    @Override
    public void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException();
        }
        if (n > 0) {
            BackpressureUtils
                .getAndAddRequest(requested, n);
            Connection<T> conn = connection;       // (2)
            if (conn != null) {
                conn.drain();
            }
        }
    }
    
    @Override
    public boolean isUnsubscribed() {
        return once.get();
    }
    
    @Override
    public void unsubscribe() {
        if (once.compareAndSet(false, true)) {
            Connection<T> conn = connection;       // (3)
            if (conn != null) {
                conn.remove(this);
                conn.drain();
            }
        }
    }
}

This is a bit more interesting.

  1. We need to know about what connection this class has to deal with for two reasons: 1) it has to notify the connection the underlying Subscriber can receive values, 2) if the subscriber unsubscribes, it may mean the other Subscribers can now receive further values.
  2. Since request() runs asynchronously, the connection might not be available yet. We have to remember to call drain() once this connection becomes available (shown later on).
  3. Since unsubscribe() runs asynchronously as well, it has to check for non-null and only then remove itself from the array of subscribers (shown later on). Note also the idempotence provided by once.
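
The request accounting above leans on BackpressureUtils.getAndAddRequest, which, as far as I rely on it here, adds n to the counter and caps the result at Long.MAX_VALUE so that an "unbounded" request never overflows. A minimal plain-Java sketch of such a capped add could look like this; RequestSketch and getAndAddCapped are illustrative names of mine, not the library's implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

final class RequestSketch {
    /**
     * Atomically adds n to requested, capping at Long.MAX_VALUE.
     * Assumes n >= 0 and a non-negative counter; returns the previous value.
     */
    static long getAndAddCapped(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0L) {
                next = Long.MAX_VALUE; // the addition overflowed
            }
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }
}
```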

The final class, in skeleton form, is the Connection itself:


@SuppressWarnings({ "unchecked", "rawtypes" })
static final class Connection<T>
 implements Observer<T> {                             // (1)

    final AtomicReference<PublishProducer<T>[]>
        subscribers;
    final State<T> state;
    final AtomicBoolean connected;
    
    final Queue<T> queue;
    final AtomicReference<Throwable> error;
    volatile boolean done;

    volatile boolean disconnected;
    
    final AtomicInteger wip;
    
    final SourceSubscriber parent;
    
    
    static final PublishProducer[] EMPTY = 
        new PublishProducer[0];

    static final PublishProducer[] TERMINATED = 
        new PublishProducer[0];
    
    public Connection(State<T> state) {               // (2)
        this.state = state;
        this.subscribers = new AtomicReference<>(EMPTY);
        this.connected = new AtomicBoolean();
        this.queue = new SpscArrayQueue(
            RxRingBuffer.SIZE);
        this.error = new AtomicReference<>();
        this.wip = new AtomicInteger();
        this.parent = createParent();
    }
    
    SourceSubscriber createParent() {                 // (3)
        // implement
    }
    
    boolean add(PublishProducer<T> producer) {        // (4)
        // implement
    }
    
    void remove(PublishProducer<T> producer) {
        // implement
    }
    
    void onConnect(
         Action1<? super Subscription> disconnect) {  // (5)
        // implement
    }
    
    @Override
    public void onNext(T t) {                         // (6)
        // implement
    }

    @Override
    public void onError(Throwable e) {
        // implement
    }

    @Override
    public void onCompleted() {
        // implement
    }
    
    void drain() {                                    // (7)
        // implement
    }
    
    boolean checkTerminated(boolean d, 
        boolean empty) {
        // implement
    }
}

The method names and fields should look familiar by now:


  1. The class has to manage a set of state variables: the current array of Subscribers, the value queue plus the terminal event holders, the connection and disconnection indicators, the work counter for the queue-drain approach, the Subscriber that is subscribed to the Observable and finally the EMPTY and TERMINATED array indicators.
  2. The constructor initializes the various fields.
  3. The subscriber needs some preparations besides creating a new SourceSubscriber, therefore, I factored it out into a separate method.
  4. The copy-on-write handling of the known subscribers is done via add and remove, similar to how we did this with Subjects and with the array-backed Subscription container.
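  5. The onConnect method will perform the actual subscription to the source Observable (shown later on).
  6. We will handle the source events with these onXXX methods.
  7. Finally, the drain and termination check methods for the queue-drain approach.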


The meltdown

So far, the classes and the methods already implemented were nothing special. However, the real complexity starts from here on. I'll show the missing implementations one by one and mention the concurrency considerations with them as well.

I suggest you take a small break, drink some power-up, clear your head at this point.

Done? All right, let's do this.

State.call

This method is responsible for handling the incoming child Subscribers. The method has to consider that the connection may terminate on its own or get disconnected concurrently:


@Override
public void call(Subscriber<? super T> s) {
    PublishProducer<T> pp 
        = new PublishProducer<>(s);
    
    s.add(pp);
    s.setProducer(pp);                                // (1)

    for (;;) {
        Connection<T> curr = connection.get();
        
        pp.connection = curr;                         // (2)
        if (curr.add(pp)) {                           // (3)
            if (pp.isUnsubscribed()) {                // (4)
                curr.remove(pp);
            } else {
                curr.drain();                         // (5)
            }
            break;
        }
    }
}


  1. First, we create a PublishProducer and set it on the subscriber to react to requests and unsubscription.
  2. Next, we retrieve the current known connection and set it on the PublishProducer so it can call the drain() method if it wishes.
  3. We attempt to add the PublishProducer to the internal tracking array. If this fails, it means the current connection has terminated and we have to try the next connection (once it becomes available) by looping a bit.
  4. Even if the add succeeded, the child might have unsubscribed just before it, in which case the remove call in unsubscribe() didn't find the PublishProducer. By calling remove here again, we make sure the PublishProducer doesn't stay in the array unnecessarily.
  5. Once the add succeeded, we have to call drain since a concurrent call in PublishProducer might not have seen a non-null connection and couldn't notify the connection for more values (or about unsubscription). The call will make sure this PublishProducer is handled as necessary.


State.connect

This method is responsible for triggering a single connection on an unconnected Connection instance and/or returning the Subscription that lets an active Connection get unsubscribed.


public void connect(Action1<? super Subscription> disconnect) {
    for (;;) {
        Connection<T> curr = this.connection.get();
        
        if (!curr.connected.get() && 
                curr.connected.compareAndSet(false, true)) {  // (1)
            curr.onConnect(disconnect);
            return;
        }
        if (!curr.parent.isUnsubscribed()) {                  // (2)
            disconnect.call(curr.parent);
            return;
        }
        
        replaceConnection(curr);                              // (3)
    }
}

This method is also racing with a termination/disconnection and as such, it has to take them into account when attempting to establish a fresh connection.


  1. It works by first retrieving the current connection and, if the current thread is the first, switching it into a connected state. If successful, the onConnect method is called, which will do the necessary subscription work.
  2. Otherwise, check if the current connection is unsubscribed. If not, return it to the callback. Note that there is a small window here where the current connection is determined active but may become disconnected/terminated when the method is called. Resolving this issue requires either blocking synchronization between termination and connection or some other serialization approach. In practice, however, this is rarely an issue and can be ignored.
  3. Finally, if the current connection is disconnected, let's replace it with a fresh, not-yet connected Connection and try the loop again.
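
The "first thread wins" check at (1) is worth isolating. Below is a minimal plain-Java sketch of it; ConnectOnceSketch, tryConnect and connectCount are names I invented for the demonstration, and the counter merely stands in for the real subscription work.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class ConnectOnceSketch {
    final AtomicBoolean connected = new AtomicBoolean();
    final AtomicInteger connectCount = new AtomicInteger();

    /** Returns true only for the single caller that wins the false -> true transition. */
    boolean tryConnect() {
        // The plain get() is a cheap pre-check that avoids a CAS
        // when the connection has already been established.
        if (!connected.get() && connected.compareAndSet(false, true)) {
            connectCount.incrementAndGet(); // stands in for subscribing to the source
            return true;
        }
        return false;
    }
}
```

No matter how many threads call tryConnect() concurrently, the compareAndSet lets exactly one of them through.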

Connection.createParent

The method constructs a SourceSubscriber and sets it up to behave according to the disconnection strategy:

SourceSubscriber createParent() {
    SourceSubscriber parent = new SourceSubscriber<>(this);
    
    parent.add(Subscriptions.create(() -> {
        switch (state.strategy) {
        case SEND_COMPLETED:
            onCompleted();
            break;
        case SEND_ERROR:
            onError(new CancellationException("Disconnected"));
            break;
        default:
            disconnected = true;
            drain();
        }
    }));
    
    return parent;
}

The method will instantiate a SourceSubscriber and add a Subscription to it. This subscription, depending on the disconnection strategy, will either call onCompleted, onError with a CancellationException or set the disconnect flag followed by a call to drain (the onXXX methods call drain()).

We need the disconnected flag because we can't use an isUnsubscribed check: it would always skip the terminal event and appear as if we'd have the NO_EVENT strategy.


Connection.add, Connection.remove

The algorithms for adding and removing resources to an array-based container with copy-on-write semantics should be quite familiar by now. For completeness, here are the methods anyway:


boolean add(PublishProducer<T> producer) {
    for (;;) {
        PublishProducer<T>[] curr = subscribers.get();
        if (curr == TERMINATED) {
            return false;
        }
        
        int n = curr.length;
        
        PublishProducer<T>[] next = new PublishProducer[n + 1];
        System.arraycopy(curr, 0, next, 0, n);
        next[n] = producer;
        if (subscribers.compareAndSet(curr, next)) {
            return true;
        }
    }
}

void remove(PublishProducer<T> producer) {
    for (;;) {
        PublishProducer<T>[] curr = subscribers.get();
        if (curr == TERMINATED || curr == EMPTY) {
            return;
        }
        
        int n = curr.length;
        
        int j = -1;
        for (int i = 0; i < n; i++) {
            if (curr[i] == producer) {
                j = i;
                break;
            }
        }
        
        if (j < 0) {
            break;
        }
        PublishProducer<T>[] next;
        if (n == 1) {
            next = EMPTY;
        } else {
            next = new PublishProducer[n - 1];
            System.arraycopy(curr, 0, next, 0, j);
            System.arraycopy(curr, j + 1, next, j, n - j - 1);
        }
        if (subscribers.compareAndSet(curr, next)) {
            return;
        }
    }
}

Connection.onXXX

The four onXXX methods on the class are quite short, therefore, I'll show them together in this subsection:


void onConnect(
         Action1<? super Subscription> disconnect) {        // (1)
    disconnect.call(this.parent);
      
    state.source.unsafeSubscribe(parent);
}
    
@Override
public void onNext(T t) {                                   // (2)
    if (queue.offer(t)) {
        drain();
    } else {
        onError(new MissingBackpressureException());
        parent.unsubscribe();
    }
}

@Override
public void onError(Throwable e) {
    if (!error.compareAndSet(null, e)) {                    // (3)
        e.printStackTrace();
    } else {
        done = true;
        drain();
    }
}

@Override
public void onCompleted() {                                 // (4)
    done = true;
    drain();
}

Let's see them:


  1. The reason we have to drag the Action1 all the way here instead of simply calling it in State.connect as at (2) is that the call must happen before the actual subscription to the underlying Observable to allow synchronous cancellation.
  2. The next method offers the value and calls drain to make sure it is delivered if possible. Note that if the queue is full, we reward it with a MissingBackpressureException and unsubscription; it means the upstream doesn't handle backpressure well or at all.
  3. Since we may receive an error as part of the upstream event as well as a disconnection event, we need an AtomicReference and set only one of them as the terminal event. In this example, the first one wins, the other gets printed to the console. If the CAS succeeded, we set the done flag and call drain to handle things.
  4. It is true onCompleted can also be called from two places, but since it just sets the done flag to true, there is no need for any CAS-ing here. It is also true that due to the disconnection strategy, the onError and onCompleted can race with each other. However, since the difference in handling them is just whether error contains null or not, it isn't really a problem. Note also that since we used unsafeSubscribe in onConnect, there shouldn't be any call to the SourceSubscriber.unsubscribe coming from upstream and causing trouble if the source terminated normally and the disconnection strategy happens to be SEND_ERROR.
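
The "first error wins" rule from point 3 can be tried out in isolation. This is only a sketch in plain Java; FirstErrorSketch is a made-up class and, unlike the operator, its onError returns a boolean instead of printing the loser.

```java
import java.util.concurrent.atomic.AtomicReference;

final class FirstErrorSketch {
    final AtomicReference<Throwable> error = new AtomicReference<>();
    volatile boolean done;

    /** Returns true if e became the terminal error, false if one was already set. */
    boolean onError(Throwable e) {
        if (error.compareAndSet(null, e)) {
            done = true; // set only after the error is visible to readers of done
            return true;
        }
        return false; // the operator above prints the losing exception here
    }
}
```

Because done is written after the CAS, anyone who reads done == true is also guaranteed to see the winning error.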

Connection.drain

This is unquestionably the heart of the operator and the most complicated logic due to the effects of concurrently changing values it has to rely on. I'll explain it piece by piece:

First, it contains a familiar drain loop with wip counter and missed count:

void drain() {
    if (wip.getAndIncrement() != 0) {
        return;
    }
    
    int missed = 1;
    
    for (;;) {

        if (checkTerminated(done, queue.isEmpty())) {
            return;
        }

        // implement rest
       
        missed = wip.addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

Nothing fancy yet. The wip counter doubles as the serialization entry point on a 0 - 1 transition and a missed counter above that.
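
If the wip trick is new to you, here is the same loop stripped of everything operator-specific, as a plain-Java sketch. DrainSketch, offer and sum are hypothetical names; the "work" is just summing the queued integers.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class DrainSketch {
    final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger();
    long sum; // only touched by whoever currently owns the drain loop

    void offer(int value) {
        queue.offer(value);
        drain();
    }

    void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // somebody else is draining and will notice our increment
        }
        int missed = 1;
        for (;;) {
            Integer v;
            while ((v = queue.poll()) != null) {
                sum += v;
            }
            // Subtract the calls we have accounted for; a non-zero result
            // means more drain() calls arrived in the meantime, so loop again.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}
```

Only the caller that moves wip from 0 to 1 enters the loop; everybody else just bumps the counter and leaves, which is what keeps the non-thread-safe sum field safe.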

Once inside the loop, the first thing to do is to check for a terminal condition via checkTerminated (explained later). It checks for the terminal events and disconnected state and acts accordingly. This is done before the upcoming request coordination since terminal events are not subject to backpressure management and can be emitted before any child Subscriber requests anything.

The next step is to perform request coordination. Since we set out to do a lockstep coordination, we have to ask all known child subscribers for their current requested amount and figure out the minimum amount everybody can receive. Note that this can be zero.


        //... checkTerminated call

        PublishProducer<T>[] a = subscribers.get();
        
        int n = a.length;
        long minRequested = Long.MAX_VALUE;
        
        for (PublishProducer<T> pp : a) {
            if (!pp.isUnsubscribed()) {
                minRequested = Math.min(minRequested, pp.requested.get());
            }
        }

        // ... missed decrementing

At this point, it is possible n is zero. If there are no subscribers, we set out to "slowly drip away" the available values:


        // ... minRequested calculation

        if (n == 0) {
            if (queue.poll() != null) {
                parent.requestMore(1);
            }
        } else {
            // implement rest           
        }

        // ... missed decrementing

We have to check if the queue is non-empty and consume a value with a single poll(); then we ask for replenishment. Note that the "slowness" depends on the speed of the upstream Observable. If one decides to do nothing if there are no subscribers, the if statement can be simplified to if (n != 0) { } but should not be removed!

If we know there are any subscribers and we know the minimum requested amount, we can try draining our queue and emit that amount to everybody.


            // if n != 0 branch

            if (checkTerminated(done, queue.isEmpty())) {   // (1)
                return;
            }

            long e = 0L;
            while (minRequested != 0) {

                boolean d = done;
                T v = queue.poll();
                
                if (checkTerminated(d, v == null)) {        // (2)
                    return;
                }
                
                if (v == null) {
                    break;
                }

                // final detail to implement
                
                minRequested--;                             // (3)
                e++;
            }
            
            if (e != 0L) {                                  // (4)
                parent.requestMore(e);
            }
        
        // end of n != 0 branch

This should also look familiar. We check the terminal conditions again (1) (optional if you want to be eager). Next, we loop until the minRequested is zero or the queue becomes empty. Inside the loop we do the usual termination checks (2) and emission accounting (3). After the loop, if there were emissions, we ask for replenishment from the SourceSubscriber instance (4).

Lastly, the final piece of the drain method is the publication of each value to all subscribers:


                // ... v == null check

                for (PublishProducer<T> pp : a) {
                    pp.actual.onNext(v);
                    if (pp.requested.get() != Long.MAX_VALUE) {
                        pp.requested.decrementAndGet();
                    }
                }

                // ... minRequested--

For each of the PublishProducer (i.e., child Subscriber), we emit the value and decrement the requested amount if not Long.MAX_VALUE (i.e., unbounded child Subscriber).
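
To make the lockstep idea tangible without any Rx machinery, here is one emission round as a single-threaded plain-Java sketch. LockstepSketch, Child and emitRound are illustrative names of mine; termination checks, unsubscription and the "drip" branch for zero subscribers are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

final class LockstepSketch {
    /** Hypothetical stand-in for a child Subscriber plus its request counter. */
    static final class Child {
        final AtomicLong requested = new AtomicLong();
        final List<Integer> received = new ArrayList<>();
    }

    /** Emits as many queued values as every child can accept; returns the emitted count. */
    static long emitRound(Queue<Integer> queue, Child[] children) {
        if (children.length == 0) {
            return 0L; // the operator above drips values away in this case
        }
        long min = Long.MAX_VALUE;
        for (Child c : children) {
            min = Math.min(min, c.requested.get());
        }
        long emitted = 0L;
        while (emitted != min) {
            Integer v = queue.poll();
            if (v == null) {
                break;
            }
            for (Child c : children) {
                c.received.add(v);
                if (c.requested.get() != Long.MAX_VALUE) {
                    c.requested.decrementAndGet(); // unbounded children are never decremented
                }
            }
            emitted++;
        }
        return emitted; // the amount to re-request from the upstream
    }
}
```

With one child that requested 2 and another that is unbounded, a round over five queued values delivers only the first two to both children; the slowest child sets the pace.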

That wasn't too painful, was it?


Connection.checkTerminated

The checkTerminated method has more things to do since it has to deliver the terminal events to all Subscribers while making sure new Subscribers don't succeed within the add method.


boolean checkTerminated(boolean done, boolean empty) {    // (1)
    if (disconnected) {                                   // (2)
        subscribers.set(TERMINATED);
        queue.clear();
        return true;
    }
    if (done) {
        Throwable e = error.get();
        if (e != null) {
            state.replaceConnection(this);                // (3)
            queue.clear();

            PublishProducer<T>[] a = 
                subscribers.getAndSet(TERMINATED);        // (4)
            
            for (PublishProducer<T> pp : a) {             // (5)
                if (!pp.isUnsubscribed()) {
                    pp.actual.onError(e);
                }
            }
            
            
            return true;
        } else
        if (empty) {
            state.replaceConnection(this);                // (6)

            PublishProducer<T>[] a = 
                subscribers.getAndSet(TERMINATED);
            
            for (PublishProducer<T> pp : a) {
                if (!pp.isUnsubscribed()) {
                    pp.actual.onCompleted();
                }
            }
            
            return true;
        }
    }
    return false;
}

It works as follows:


  1. The method takes only a done and an empty indicator but not any individual Subscriber or the array of known subscribers.
  2. Since the disconnected flag is set only if the disconnection strategy was NO_EVENT, we can't do much but swap in the TERMINATED indicator array. Anybody unlucky enough to be still subscribed won't get any further events.
  3. If the done flag is true and there is an error, we first replace the current connection with a fresh one (within the state) so newcomers won't try to subscribe to a terminated connection.
  4. After clearing the queue for any normal values, we swap in the TERMINATED indicator array so ...
  5. ... anybody who got in can now receive its terminal event and the drain loop will quit.
  6. The same logic applies in the case when the upstream has completed normally and the queue has become empty.
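
The interplay between getAndSet(TERMINATED) and add() is easier to see in a reduced form. The following plain-Java sketch uses Strings instead of PublishProducers; TerminatedSketch and terminate are names I made up for the demonstration.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

final class TerminatedSketch {
    static final String[] EMPTY = new String[0];
    // A distinct instance: the two arrays are told apart by identity, not content.
    static final String[] TERMINATED = new String[0];

    final AtomicReference<String[]> subscribers = new AtomicReference<>(EMPTY);

    boolean add(String name) {
        for (;;) {
            String[] curr = subscribers.get();
            if (curr == TERMINATED) {
                return false; // too late, the caller has to try a fresh connection
            }
            String[] next = Arrays.copyOf(curr, curr.length + 1);
            next[curr.length] = name;
            if (subscribers.compareAndSet(curr, next)) {
                return true;
            }
        }
    }

    /** Swaps in TERMINATED and returns whoever was registered at that moment. */
    String[] terminate() {
        return subscribers.getAndSet(TERMINATED);
    }
}
```

Every subscriber either makes it into the array returned by terminate(), and thus receives the terminal event, or sees add() fail; there is no third outcome where it is silently lost.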

Testing it out

Finally, we reached the end of one of the most complicated operators in the history of RxJava. Now let's reward ourselves with a small unit test to see if the backpressure and the disconnection strategy really work:


Observable<Integer> source = Observable.range(1, 10);

TestSubscriber<Integer> ts = TestSubscriber.create(5);

PublishConnectableObservable<Integer> o = createWith(
    source, DisconnectStrategy.SEND_ERROR);

o.subscribe(ts);

Subscription s = o.connect();

s.unsubscribe();

System.out.println(ts.getOnNextEvents());
ts.assertValues(1, 2, 3, 4, 5);
ts.assertNotCompleted();
ts.assertError(CancellationException.class);

It should print [1, 2, 3, 4, 5] to the console and quit without any AssertionErrors. Neat, isn't it?

Conclusion

In this lengthy and brain-stretching blog post, I've explained the requirements and problems around ConnectableObservables that want to do request coordination between their child Subscribers and their upstream Observable. I then showed an implementation of a publish()-like ConnectableObservable which features a disconnection strategy to avoid hanging its child Subscribers.

This is, however, not the most complicated operator in RxJava. It isn't replay(), even though the bounded version is a bit more complicated than the PublishConnectableObservable (but only due to the boundary management). It is not the most commonly used operator either; in fact, that one is simpler due to fewer state clashes. No, the most complicated operator to date has such intertwined request coordination that even I'm not sure it is possible to write a buffer-bounded version of it.

But enough of mysterious foreshadowing! In the next part, I'm going to detail what it takes to implement a replay()-like ConnectableObservable.