2015. október 20., kedd

Operator internals: AutoConnect


The operator autoConnect is a member of the ConnectableObservable class and allows triggering the connection to the underlying ConnectableObservable once the specified amount of subscribers have arrived. The operator returns a plain Observable and as such can be more easily included in a chain of operators.

There are two reasons why this operator exists. First, many wanted to connect to a ConnectableObservable only if a given number of subscribers have subscribed to it which, before that, was difficult to achieve due to the lack of confinement. The second reason was that another operator, cache, didn't support advanced retention policies such as size and/or time bounds and it was somewhat tedious to achieve the same first-subscriber triggered connection as cache does.

To sketch its implementation, one needs an AtomicInteger to count each subscriber in an OnSubscribe callback and once the count reaches the desired amount, the call to connect() can happen.

There is, however, a small complication: the synchronous unsubscription support in ConnectableObservable. By applying plain autoConnect, one loses the means to unsubscribe an ongoing stream, similar to how cache behaves. The resolution is to take a callback and hand it to the connect() method.

Implementation details

The operator is not involved in request management at all, therefore, the implementation in 1.x and the two implementation in 2.x (the other is on the NbpConnectableObservable) looks essentially the same.

In fact, it is so short I'm going to repeat it here:

public Observable<T> autoConnect(int numConnections,
        Action1<Subscription> connection) {               // (1)
    if (numConnection == 0) {
        connect(connection);                              // (2)
        return this;
    AtomicInteger count = new AtomicInteger();
    return create(s -> {
        unsafeSubscribe(s);                               // (3)
        if (count.incrementAndGet() == numConnections) {  // (4)

Let's discuss the interesting points:

  1. RxJava has two extra overloads of this method, one that defaults to 1 required connection and the other asks for the number of connections. Both ignore the connection callback.
  2. If the number of connection is zero, we interpret it as an immediate connection. In this case, we don't have to do any kind of wrapping and just return the ConnectableObservable instance as is.
  3. If the number of connection is non-zero, we have to capture the subscription attempts and do extra work once the number of subscribers reached the required amount. Before we even test for that, we subscribe the incoming Subscriber to the underlying ConnectableObservable. This is necessary to happen first because if the numConnections is 1, the connection may drain the underlying sequence and the Subscriber may not receive any values at all.
  4. Once the subscriber count reached the required amount, we trigger a connection (that will call back the connection callback supplied originally.
At this point, you might think what happens if numConnections is 2, a Subscriber subscribes then unsubscribes immediately. Should the next Subscriber really trigger the connection? It depends on your requirements. The autoConnect operator, clearly, doesn't do this (a decrementAndGet() somewhere would indicate this). 

One reason for this is that originally, the operator was meant to be a simple replacement for using refCount() or share() in certain situations.


The operator autoConnect is among the simplest operators there are, 1 / 10, and thus has clear and simple feature set.

2 megjegyzés:

  1. Is there a way to disconnect the Observable after we call autoConnect?

    1. This is why there is an overload in RxJava with Action1 connection / Consumer where you can get and store the object representing the established connection and disconnect via unsubscribe/dispose.