Wednesday, May 27, 2015

Pitfalls of operator implementations (part 3)

Introduction

In this series, I put a spotlight on some common, and some less common yet sneaky, pitfalls of operator implementations. Now that we know more about producers, subscription-containers and schedulers, let's see some more pitfalls.

#9: Subscribing twice

Some operators, especially those built upon OnSubscribe, may take their Subscribers and subscribe them to another Observable. An example of such an operator is defer().

Let's assume you want to create an operator which calls an action callback before it subscribes the incoming Subscriber to the real Observable:

public final class OnSubscribeRunAction<T> 
implements OnSubscribe<T> {
    final Observable<T> actual;
    final Action0 action;
    public OnSubscribeRunAction(Observable<T> actual, Action0 action) {
        this.action = action;
        this.actual = actual;
    }
    @Override
    public void call(Subscriber<? super T> child) {
        try {
            action.call();
        } catch (Throwable e) {
            child.onError(e);
            return;
        }
        actual.unsafeSubscribe(child);
    }
}

Observable<Integer> source = Observable.create(
    new OnSubscribeRunAction<>(Observable.range(1, 3),
() -> {
    System.out.println("Subscribing!");
}));

TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
    @Override
    public void onStart() {
        Thread t = new Thread(() -> {
            System.out.println("Starting helper thread "
                + Thread.currentThread());
        });
        t.start();
    }
};
source.unsafeSubscribe(ts);

If we run the example code, we see that onStart is called twice! The cause lies in how the backpressure-related logic is designed in RxJava: whenever a child is subscribed, its onStart() method is called, which lets the client execute some code before the first onNext value arrives. Usually, this is where the initial request amount is issued or, perhaps, where a GUI window associated with the subscriber is opened. Here, the outer unsafeSubscribe() call invokes onStart() once, and the operator's own actual.unsafeSubscribe(child) invokes it again on the very same subscriber.

Regular end-subscribers rarely run into this because the subscribe() method wraps them into a SafeSubscriber, which doesn't forward its onStart call. However, operators commonly subscribe to each other via unsafeSubscribe, and then onStart ends up being called multiple times.

The resolution is to wrap the child subscriber with another subscriber in the operator that doesn't forward the onStart method:

        
        // ...
        try {
            action.call();
        } catch (Throwable e) {
            child.onError(e);
            return;
        }
        actual.unsafeSubscribe(new Subscriber<T>(child) {
            @Override
            public void onNext(T t) {
                child.onNext(t);
            }
            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }
            @Override
            public void onCompleted() {
                child.onCompleted();
            }
        });
        // ...
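
The double onStart call can be reproduced without RxJava. The following is a toy model only: MiniSubscriber, MiniObservable and OnStartDemo are hypothetical names invented for this sketch, and the model assumes nothing more than that unsafeSubscribe calls onStart before handing the subscriber to the subscription logic.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Toy stand-ins for illustration only; these are NOT RxJava's classes.
abstract class MiniSubscriber<T> {
    void onStart() { }
    abstract void onNext(T value);
}

final class MiniObservable<T> {
    private final Consumer<MiniSubscriber<T>> onSubscribe;

    MiniObservable(Consumer<MiniSubscriber<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Modeled behavior: start the subscriber, then run the subscription logic.
    void unsafeSubscribe(MiniSubscriber<T> s) {
        s.onStart();
        onSubscribe.accept(s);
    }
}

final class OnStartDemo {
    // Returns how many times the end subscriber's onStart was invoked.
    static int countStarts(boolean wrapChild) {
        MiniObservable<Integer> actual = new MiniObservable<>(s -> s.onNext(1));
        MiniObservable<Integer> source = new MiniObservable<>(child -> {
            if (wrapChild) {
                // The wrapper forwards values but keeps the default no-op onStart.
                actual.unsafeSubscribe(new MiniSubscriber<Integer>() {
                    @Override
                    void onNext(Integer value) {
                        child.onNext(value);
                    }
                });
            } else {
                // Naive version: the child gets started a second time.
                actual.unsafeSubscribe(child);
            }
        });
        AtomicInteger starts = new AtomicInteger();
        source.unsafeSubscribe(new MiniSubscriber<Integer>() {
            @Override
            void onStart() {
                starts.incrementAndGet();
            }
            @Override
            void onNext(Integer value) { }
        });
        return starts.get();
    }

    public static void main(String[] args) {
        System.out.println(countStarts(false)); // prints 2
        System.out.println(countStarts(true));  // prints 1
    }
}
```

The wrapper works only because its own onStart does nothing; a wrapper that delegated onStart to the child would bring the problem back.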


#10: Leaking scheduler workers

Let's say instead of performing some arbitrary action on subscription immediately, one would like to delay its execution by some time. If the sequence doesn't complete in time, one can perform some mitigating actions (e.g., showing a work-in-progress dialog).

public final class OnSubscribeRunActionDelayed<T>
implements OnSubscribe<T> {
    final Observable<T> actual;
    final Action0 action;
    final long delay;
    final TimeUnit unit;
    final Scheduler scheduler;
    public OnSubscribeRunActionDelayed(Observable<T> actual,
            Action0 action, long delay,
            TimeUnit unit, Scheduler scheduler) {
        this.action = action;
        this.actual = actual;
        this.delay = delay;
        this.unit = unit;
        this.scheduler = scheduler;
    }
    @Override
    public void call(Subscriber<? super T> child) {
        SerializedSubscriber<T> s = 
            new SerializedSubscriber<>(child);
        
        Worker w = scheduler.createWorker();                 // (1)
        
        Subscription cancel = w.schedule(() -> {
            try {
                action.call();
            } catch (Throwable e) {
                s.onError(e);
            }
        }, delay, unit);
        
        actual
        .doOnCompleted(cancel::unsubscribe)
        .unsafeSubscribe(s);
    }
}

Observable<Integer> source = Observable.create(
    new OnSubscribeRunActionDelayed<>(Observable
        .just(1).delay(1, TimeUnit.SECONDS),
() -> {
    System.out.println("Sorry, it takes too long...");
}, 500, TimeUnit.MILLISECONDS, Schedulers.io()));

Subscription s = source.subscribe(System.out::println);

Thread.sleep(250);

s.unsubscribe();

Thread.sleep(1000);

source.subscribe(System.out::println);

Thread.sleep(1500);

for (Thread t : Thread.getAllStackTraces().keySet()) {
    if (t.getName().startsWith("RxCached")) {
        System.out.println(t);
    }
}

Again, running the example gives unwanted results: the excuse message is printed even though the first subscription was cancelled, and when we dump the threads at the end, we see two RxCachedThreadScheduler threads, although only one should exist because the worker should have been reused.

The problem is that neither the worker nor the schedule token participates in unsubscription properly: nothing cancels the scheduled task when the child unsubscribes, and even if the actual Observable is fast, only the task is unsubscribed but the worker is not, so it is never returned to the cache pool.

The bug is sneaky because Schedulers.computation() and Schedulers.trampoline() are not sensitive to worker leaks: the former arbitrates between a fixed set of actual workers and the latter doesn't retain any threading resources and can be cleanly garbage collected. Schedulers.io(), Schedulers.from() and newThread(), on the other hand, hold onto a thread which can't be reused or shut down unless the worker is unsubscribed.
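
To make the leak tangible, here is a minimal sketch of a caching pool written with plain JDK classes. CachedWorkerPool, PooledWorker and workersCreated are hypothetical names, and the pool only assumes the general idea described above (a released worker becomes reusable); it is not how RxJava implements its io() scheduler.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a caching scheduler; NOT RxJava's implementation.
final class CachedWorkerPool {
    private final ConcurrentLinkedQueue<PooledWorker> idle = new ConcurrentLinkedQueue<>();
    private final AtomicInteger created = new AtomicInteger();

    final class PooledWorker {
        // Plays the role of Worker.unsubscribe(): hands the worker back to the cache.
        void release() {
            idle.offer(this);
        }
    }

    // Reuses an idle worker if there is one; otherwise creates (and counts) a new one.
    PooledWorker acquire() {
        PooledWorker w = idle.poll();
        if (w == null) {
            created.incrementAndGet();
            w = new PooledWorker();
        }
        return w;
    }

    int createdCount() {
        return created.get();
    }

    // Simulates two consecutive subscriptions that each need one worker.
    static int workersCreated(boolean releaseAfterUse) {
        CachedWorkerPool pool = new CachedWorkerPool();
        for (int i = 0; i < 2; i++) {
            PooledWorker w = pool.acquire();
            if (releaseAfterUse) {
                w.release();
            }
        }
        return pool.createdCount();
    }

    public static void main(String[] args) {
        System.out.println(workersCreated(false)); // prints 2: the first worker leaked
        System.out.println(workersCreated(true));  // prints 1: the worker was reused
    }
}
```

In the real scheduler each leaked worker pins a thread, which is why the thread dump in the example shows two of them.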

The solution is to add the worker and the cancel token to the child subscriber as resources so that they are unsubscribed when the child unsubscribes. However, since there is only a single scheduled task, and unsubscribing the worker unsubscribes all of its pending and running tasks, there is no need to add the individual cancel token to the child subscriber; adding the worker is enough:

    // ...
    SerializedSubscriber<T> s = new SerializedSubscriber<>(child);
        
    Worker w = scheduler.createWorker(); 
    child.add(w);
        
    w.schedule(() -> {
        try {
            action.call();
        } catch (Throwable e) {
            s.onError(e);
        }
    }, delay, unit);
        
    actual
    .doOnCompleted(w::unsubscribe)
    .unsafeSubscribe(s);
    // ...

#11: Adding the worker to the subscriber

Let's assume we need an operator that, whenever it receives a value, emits an Observable that will emit that single value after some delay.

public final class ValueDelayer<T> 
implements Operator<Observable<T>, T> {
    final Scheduler scheduler;
    final long delay;
    final TimeUnit unit;
    
    public ValueDelayer(long delay, 
            TimeUnit unit, Scheduler scheduler) {
        this.delay = delay;
        this.unit = unit;
        this.scheduler = scheduler;
    }

    @Override
    public Subscriber<? super T> call(
            Subscriber<? super Observable<T>> child) {
        Worker w = scheduler.createWorker();
        child.add(w);
        
        Subscriber<T> parent = new Subscriber<T>(child, false) {
            @Override
            public void onNext(T t) {
                BufferUntilSubscriber<T> bus = 
                        BufferUntilSubscriber.create();
                
                w.schedule(() -> {
                    bus.onNext(t);
                    bus.onCompleted();
                }, delay, unit);
                
                child.onNext(bus);
            }
            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }
            @Override
            public void onCompleted() {
                child.onCompleted();
            }
        };
        
        child.add(parent);
        
        return parent;
    }
}

Observable.range(1, 3)
    .lift(new ValueDelayer<>(1, TimeUnit.SECONDS, 
         Schedulers.computation()))
    .take(1)
    .doOnNext(v -> v.subscribe(System.out::println))
    .subscribe();
        
Thread.sleep(1500);

Strangely, the example prints nothing, although we expect it to print 1 after a second. The problem is that take(1) unsubscribes the upstream after receiving the first 'window'; this also unsubscribes the worker that was added to the child, which then cancels the scheduled emission of the value itself.

Resolving the problem can take many shapes and actually depends on the broader context. Clearly, we need to unsubscribe the worker yet allow the consumption of the inner observable sequence.

One way is to keep an atomic count of the unobserved inner Observables and unsubscribe the worker when that count reaches zero. Note that this solution requires the inner Observables to always be consumed.

        // ...
        Worker w = scheduler.createWorker();
        
        final AtomicBoolean once = new AtomicBoolean();
        final AtomicInteger wip = new AtomicInteger(1);           // (1)
        
        Subscriber<T> parent = new Subscriber<T>(child, false) {
            @Override
            public void onNext(T t) {
                
                if (wip.getAndIncrement() == 0) {                 // (2)
                    wip.set(0);
                    return;
                }

                BufferUntilSubscriber<T> bus = 
                        BufferUntilSubscriber.create();

                w.schedule(() -> {
                    try {
                        bus.onNext(t);
                        bus.onCompleted();
                    } finally {
                        release();                                // (3)
                    }
                }, delay, unit);
                
                child.onNext(bus);
                if (child.isUnsubscribed()) {
                    if (once.compareAndSet(false, true)) {        // (4)
                        release();
                    }
                }
            }
            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }
            @Override
            public void onCompleted() {
                if (once.compareAndSet(false, true)) {
                    release();                                    // (5)
                }
                child.onCompleted();
            }
            void release() {
                if (wip.decrementAndGet() == 0) {
                    w.unsubscribe();
                }
            }
        };
        parent.add(Subscriptions.create(() -> {                   // (6)
            if (once.compareAndSet(false, true)) {
                if (wip.decrementAndGet() == 0) {
                    w.unsubscribe();
                }
            }
        }));
        
        child.add(parent);
        
        return parent;
    }
    // ...

The solution involves several notable changes:

  1. We need an atomic integer and an atomic boolean. The former counts the unconsumed inner Observables as well as the active main upstream source of Ts. The upstream can end at many locations and perhaps 'multiple times' (e.g., terminating just after the onNext while the upstream still sends an onCompleted, which then triggers an unsubscription coming from downstream). Since the upstream should count as 1, we use once to make sure the upstream's single decrement happens only once.
  2. We increment the number of 'open windows' by one before even attempting to schedule its task. However, since (6) can asynchronously decrement the wip value to zero, an increment from 0 to 1 indicates that an onNext slipped through; in that case the worker is already unsubscribed and the child would receive an Observable that can't ever emit, so we reset the counter and drop the value.
  3. Once an inner Observable has emitted its value, we release one 'window'.
  4. We eagerly check if the child has just unsubscribed after the emission of the inner 'window'. If so, we try to release the upstream once.
  5. If the downstream didn't unsubscribe, the onCompleted has to try and release the upstream once too.
  6. But since unsubscription can happen at any time, even between event emissions, we still have to try and release the upstream once.
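
The counting scheme from the steps above can be isolated into a small helper and exercised on its own. This is a sketch under assumptions: WindowRefCount and its method names are hypothetical, and the Runnable stands in for w.unsubscribe().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper that isolates the counting logic; not part of RxJava.
final class WindowRefCount {
    private final AtomicInteger wip = new AtomicInteger(1); // 1 stands for the upstream
    private final AtomicBoolean once = new AtomicBoolean();
    private final Runnable onZero;                          // e.g. w::unsubscribe

    WindowRefCount(Runnable onZero) {
        this.onZero = onZero;
    }

    // Step (2): returns false if the count had already reached zero.
    boolean openWindow() {
        if (wip.getAndIncrement() == 0) {
            wip.set(0);
            return false;
        }
        return true;
    }

    // Step (3): one inner sequence has emitted.
    void closeWindow() {
        release();
    }

    // Steps (4)-(6): the upstream's share is released at most once.
    void upstreamDone() {
        if (once.compareAndSet(false, true)) {
            release();
        }
    }

    private void release() {
        if (wip.decrementAndGet() == 0) {
            onZero.run();
        }
    }

    public static void main(String[] args) {
        AtomicInteger released = new AtomicInteger();
        WindowRefCount rc = new WindowRefCount(released::incrementAndGet);
        System.out.println(rc.openWindow()); // prints true
        rc.upstreamDone();
        rc.upstreamDone();                   // second call is a no-op
        System.out.println(released.get());  // prints 0: one window is still open
        rc.closeWindow();
        System.out.println(released.get());  // prints 1: the resource was released
        System.out.println(rc.openWindow()); // prints false: too late to open
    }
}
```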

Conclusion

One can argue that these pitfalls are corner cases, but an operator developer, especially one planning to submit a PR, has to look out for such problems.

In this post, we've looked at how one can accidentally start a subscriber multiple times, and we've seen two opposite cases where adding or not adding a worker to the child subscriber can cause problems.

Schedulers (part 1)

Introduction

Schedulers are the key to asynchronous and concurrent computations in RxJava. Although many standard schedulers exist and you can wrap an Executor into a scheduler, it is worth understanding how schedulers can be built from scratch in order to utilize other forms of concurrency sources, such as GUI event loops of frameworks.


The Scheduler API

If you are familiar with Rx.NET's IScheduler API, you will notice that RxJava's Scheduler API is a bit different. This difference comes from how each library tried to solve the recursive scheduling problem. Rx.NET chose to inject the actual scheduler into the action being scheduled.

RxJava chose to mirror how the Iterable/Iterator pattern is structured and came up with the Scheduler/Worker pair. RxJava's scheduler doesn't do any scheduling itself but allows the creation of a Worker, which allows scheduling directly and recursively and is safe to close over in the submitted action if necessary. In addition, the Worker itself is a Subscription and can be unsubscribed, which triggers a mass-unsubscription and prevents any further tasks from getting scheduled (on a best-effort basis, I might add). This comes in handy when the scheduling is used in operators (for example, buffer with time) and the downstream unsubscribes the whole chain, cancelling the periodic buffer-emission task once and for all.
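
Recursive scheduling by closing over the worker can be sketched with a plain ScheduledExecutorService playing the worker's role. RecursiveScheduleDemo and runRecursively are hypothetical names; the sketch shows only the closure idea, not RxJava's Worker API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class RecursiveScheduleDemo {
    // Runs a task that reschedules itself on the same "worker" until it ran 'times' times.
    static int runRecursively(int times) {
        ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        Runnable[] self = new Runnable[1];      // lets the lambda refer to itself
        self[0] = () -> {
            if (runs.incrementAndGet() < times) {
                worker.execute(self[0]);        // the task closes over the worker
            } else {
                done.countDown();
            }
        };
        try {
            worker.execute(self[0]);
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            worker.shutdownNow();               // comparable to unsubscribing the worker
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(runRecursively(3)); // prints 3
    }
}
```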

The Scheduler/Worker API has to meet some requirements:

  1. All methods should be thread-safe.
  2. Workers should make sure undelayed, sequentially scheduled tasks execute in FIFO order.
  3. Workers must make best effort to cancel outstanding tasks when unsubscribed.
  4. Unsubscription of a Worker should not affect other Worker instances of the same Scheduler.

These requirements may seem harsh, but they make reasoning about a dataflow's concurrency much easier, similar to how the sequential requirement of the Observer methods does.

In addition to the requirements, there are a few nice-to-have properties associated with the API:

  1. Tasks scheduled on a Worker should try to avoid hopping threads. (Thread locality improves performance.)
  2. Delayed tasks scheduled sequentially and with the same delay amount should keep their FIFO order (with concurrent scheduling, the ordering bets are off).

With all these requirements, a conservative implementation is likely to use a single-threaded thread pool backing each individual worker, and this is how the standard RxJava schedulers are implemented: the underlying ScheduledExecutorService gives all of these guarantees.
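
The FIFO guarantee of a single-threaded ScheduledExecutorService is easy to observe with the JDK alone. A minimal sketch, where FifoDemo and runInOrder are hypothetical names:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class FifoDemo {
    // Submits 'count' undelayed tasks from one thread and records their execution order.
    static List<Integer> runInOrder(int count) {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        List<Integer> seen = new CopyOnWriteArrayList<>();
        for (int i = 0; i < count; i++) {
            final int id = i;
            exec.execute(() -> {
                seen.add(id);
            });
        }
        exec.shutdown(); // already submitted undelayed tasks still run
        try {
            exec.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```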



Implementing a custom Scheduler

Let's assume we need to write a custom scheduler with the following properties: (1) it should only have a single worker thread and (2) a thread-local context value needs to be 'transferred over' and made available to the executing task via the same thread-local access mechanism.

Clearly, if we had only property (1), one could just wrap a single-threaded executor with Schedulers.from(), but property (2) requires some additional work to be performed when a task is prepared and executed.

In order to accomplish the requirements, we may reuse some of RxJava's own scheduler primitives: namely ScheduledAction and NewThreadWorker. (Note however, that these are internal classes and are subject to changes without warnings. Here, I'm using them to reduce the clutter and allow me to concentrate on the important parts of creating the scheduler.)

As usual, we start with the class skeleton:


public final class ContextAwareScheduler 
extends Scheduler {
    
    public static final ContextAwareScheduler INSTANCE = 
            new ContextAwareScheduler();                       // (1)
    
    final NewThreadWorker worker;
    
    private ContextAwareScheduler() {
        this.worker = new NewThreadWorker(
                new RxThreadFactory("ContextAwareScheduler")); // (2)
    }
    @Override
    public Worker createWorker() {
        return new ContextAwareWorker(worker);                 // (3)
    }
    
    static final class ContextAwareWorker extends Worker {

        final CompositeSubscription tracking;                  // (4)
        final NewThreadWorker worker;

        public ContextAwareWorker(NewThreadWorker worker) {
            this.worker = worker;
            this.tracking = new CompositeSubscription();
        }
        @Override
        public Subscription schedule(Action0 action) {
            // implement
        }
        @Override
        public Subscription schedule(Action0 action, 
                long delayTime, TimeUnit unit) {
            // implement
        }
        @Override
        public boolean isUnsubscribed() {
            return tracking.isUnsubscribed();                  // (5)
        }
        @Override
        public void unsubscribe() {
            tracking.unsubscribe();
        }
    }
}

Our ContextAwareScheduler skeleton may look scary, but it consists of straightforward components:

  1. Since we want a single global thread, we can't allow multiple instances of the Scheduler to exist and thus we use a static instance variable for it.
  2. The scheduler will delegate most of its work to a single underlying worker. We reuse the NewThreadWorker class and the RxThreadFactory to get a single daemon-thread backed worker instance.
  3. Instead of handing out the single worker, we need to 'split' it among usages to conform to requirement #4. Otherwise, if the worker is unsubscribed, everyone's worker is now unsubscribed and useless from then on.
  4. We still need to make sure requirement #3 is met, therefore, we need to track tasks submitted to each particular worker.
  5. The tracking structure also gives the means to check for and issue unsubscriptions.

Next, we need the aforementioned thread-local context:

public final class ContextManager {
    static final ThreadLocal<Object> ctx = new ThreadLocal<>();
    
    private ContextManager() {
        throw new IllegalStateException();
    }
    
    public static Object get() {
        return ctx.get();
    }
    public static void set(Object context) {
        ctx.set(context);
    }
}

The ContextManager just wraps around a static ThreadLocal instance. In practice, you'd want to replace the Object with some meaningful type.

Now back to the implementation of the schedule() methods:


    // ...
    @Override
    public Subscription schedule(Action0 action) {
        return schedule(action, 0, null);               // (1)
    }
    @Override
    public Subscription schedule(Action0 action, 
            long delayTime, TimeUnit unit) {

        if (isUnsubscribed()) {                         // (2)
            return Subscriptions.unsubscribed();
        }
        
        Object context = ContextManager.get();          // (3)
        Action0 a = () -> {
            ContextManager.set(context);                // (4)
            action.call();
        };
        
        return worker.scheduleActual(a, 
            delayTime, unit, tracking);                 // (5)
    }
    // ...

Just look at the simplicity!

  1. We delegate the undelayed scheduling as if the initial delay were zero. The underlying structures interpret this zero correctly and perform the required undelayed FIFO execution.
  2. In case the current worker is unsubscribed, we don't do anything and return a constant unsubscribed subscription. Note that unsubscribing always involves some small race window where tasks (or events) may slip through. This race is internally resolved in the scheduleActual; more on this below.
  3. We hold onto the current thread's context value and wrap the action into another action. 
  4. Inside, the context value is restored to that particular thread's local storage. Since the worker is single threaded and non-reentrant, the thread-local context can't be overwritten by the next scheduled task while a previous task is running.
  5. Finally, we delegate the wrapping action and the delay information to the underlying NewThreadWorker instance. By passing in the tracking composite, the action will be properly tracked and removed if the task completes, gets unsubscribed or the entire worker gets unsubscribed.
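
The capture-and-restore idea of steps (3) and (4) does not depend on RxJava. Below is a minimal sketch with a JDK single-thread executor; ContextTransferDemo, CTX and withContext are hypothetical names standing in for the ContextManager and the wrapping action.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ContextTransferDemo {
    static final ThreadLocal<Object> CTX = new ThreadLocal<>();

    // Captures the caller's context now and restores it on the executing thread.
    static Runnable withContext(Runnable action) {
        Object context = CTX.get();      // read on the scheduling thread
        return () -> {
            CTX.set(context);            // restored on the worker thread
            action.run();
        };
    }

    // Schedules two tasks under different contexts and records what each task saw.
    static List<Object> demo() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        List<Object> seen = Collections.synchronizedList(new ArrayList<>());
        CTX.set(1);
        exec.execute(withContext(() -> seen.add(CTX.get())));
        CTX.set(2);
        exec.execute(withContext(() -> seen.add(CTX.get())));
        exec.shutdown();
        try {
            exec.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2]
    }
}
```

Each task sees the value that was current when it was scheduled, not the value set later by the scheduling thread.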


As mentioned in the explanation steps, step (2) is inherently racing with the unsubscription of the worker, but we shouldn't leave tasks behind after unsubscription. This is where the unsubscription guarantee of a container comes into play. If we wrap the Future returned by the underlying thread pool into a Subscription, we can safely add it to the tracking composite, which will either atomically take hold of it or unsubscribe it immediately.
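
The 'take hold of it or unsubscribe it immediately' guarantee can be sketched with a small lock-based tracker of Futures. FutureTracker is a hypothetical name, and unlike the composite used above, this simplified version does not remove tasks once they complete.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Hypothetical, simplified tracker; it only illustrates the add-or-cancel guarantee.
final class FutureTracker {
    private Set<Future<?>> futures = new HashSet<>();
    private boolean cancelled;

    // Either stores the future or, if the tracker is already cancelled, cancels it.
    void add(Future<?> f) {
        synchronized (this) {
            if (!cancelled) {
                futures.add(f);
                return;
            }
        }
        f.cancel(true); // lost the race: cancel outside the lock
    }

    // Cancels everything tracked so far; later additions are cancelled on arrival.
    void cancelAll() {
        Set<Future<?>> toCancel;
        synchronized (this) {
            if (cancelled) {
                return;
            }
            cancelled = true;
            toCancel = futures;
            futures = null;
        }
        for (Future<?> f : toCancel) {
            f.cancel(true);
        }
    }

    public static void main(String[] args) {
        FutureTracker tracker = new FutureTracker();
        FutureTask<Void> early = new FutureTask<>(() -> { }, null);
        FutureTask<Void> late = new FutureTask<>(() -> { }, null);
        tracker.add(early);
        tracker.cancelAll();
        tracker.add(late);                       // arrives after cancellation
        System.out.println(early.isCancelled()); // prints true
        System.out.println(late.isCancelled());  // prints true
    }
}
```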

Let's try it out:

Worker w = INSTANCE.createWorker();

CountDownLatch cdl = new CountDownLatch(1);

ContextManager.set(1);
w.schedule(() -> {
    System.out.println(Thread.currentThread());
    System.out.println(ContextManager.get());
});

ContextManager.set(2);
w.schedule(() -> {
    System.out.println(Thread.currentThread());
    System.out.println(ContextManager.get());
    cdl.countDown();
});

cdl.await();

ContextManager.set(3);

Observable.timer(500, TimeUnit.MILLISECONDS, INSTANCE)
.doOnNext(v -> {
    System.out.println(Thread.currentThread());
    System.out.println(ContextManager.get());
}).toBlocking().first();

w.unsubscribe();

Conclusion

Schedulers give the opportunity to specify where and likely when to execute tasks related to the operation of an Observable chain. Built-in schedulers should cover most of the usual needs for parametric concurrency, but some scenarios require building and using custom schedulers. In this blog post, I've shown how one can, with major help from existing RxJava classes, build a custom scheduler with custom behavior.

In the next part, we will go deeper and look at the ScheduledAction class to see how its concepts can be utilized when more control needs to be enforced over the scheduling of a task, for example, to work with or against thread interruptions.

Tuesday, May 26, 2015

Operator concurrency primitives: subscription-containers (part 3 - final)

Introduction


In this final part about subscription-containers, I'm going to demonstrate an array-backed container based on the copy-on-write approach and atomics.

Why is this type of container so important? My answer is another question: if the contained subscriptions were Subscribers, what would you do with an array of Subscribers?

You could implement operators requiring multicast to child subscribers with it and maintain the thread-safety and termination guarantees, similar to how RxJava's Subjects handle their subscribers and more importantly, how the - recently rewritten - publish() does it too.


Array-backed container

Let's build such a container with the following requirements: ability to add and remove some kind of subscriptions (i.e., Subscribers), ability to get the current contents, ability to 'unsubscribe' the container without unsubscribing the contents and the ability to tell if an add succeeded or not.

The class skeleton, with the trivial implementations already filled in, looks like this:


@SuppressWarnings({"rawtypes", "unchecked"})                 // (1)
public class SubscriberContainer<T> {
    static final Subscriber[] EMPTY = new Subscriber[0];     // (2)
    static final Subscriber[] TERMINATED = new Subscriber[0];
    
    final AtomicReference<Subscriber[]> array
        = new AtomicReference<>(EMPTY);                      // (3)
    
    public Subscriber<T>[] get() {                           // (4)
        return array.get();
    }

    public boolean add(Subscriber<T> s) {                    // (5)
        // implement
    }

    public boolean remove(Subscriber<T> s) {                 // (6)
        // implement
    }

    public Subscriber<T>[] getAndTerminate() {               // (7)
        return array.getAndSet(TERMINATED);
    }

    public boolean isTerminated() {                          // (8)
        return get() == TERMINATED;
    }
}

The structure consists of elements as follows:

  1. Java doesn't allow generic base types for arrays and doesn't like certain type conversions, so we are forced to suppress the warnings regarding raw types and converting those raw values back to parametric types. The implementation itself has to be careful, but users of the class remain typesafe.
  2. We use two distinct constant arrays to indicate the empty state and the terminated state. Both have zero length, so they are told apart by reference identity.
  3. We store the current array in an AtomicReference instance, but if you know the class won't be extended further, you can just extend AtomicReference directly.
  4. The get() method returns the current array of contained items. For performance reasons, one should use this array in read-only manner (otherwise, defensive copies are required on each invocation).
  5. The add() method takes a properly-typed Subscriber and returns true if it could be added or false if the container was terminated.
  6. The remove() method tries to remove the given subscriber instance and returns true if successful.
  7. Instead of a plain unsubscribe() method, we do a handy trick: the current array is replaced with the terminal indicator and the previous array is returned. This becomes useful when an atomic termination and post-termination actions are needed.
  8. Since a regular empty array can't be told apart from the terminated state, a method is needed which checks for this terminal state explicitly.

The add() method will be the simpler one:

    // ...
    public boolean add(Subscriber<T> s) {
        for (;;) {
            Subscriber[] current = array.get();
            if (current == TERMINATED) {                  // (1)
                return false;
            }
            int n = current.length;
            Subscriber[] next = new Subscriber[n + 1];
            System.arraycopy(current, 0, next, 0, n);     // (2)
            next[n] = s;
            if (array.compareAndSet(current, next)) {     // (3)
                return true;
            }
        }
    }
    // ...

Here we have a classical CAS loop:

  1. If the container contains the terminated token, the method returns false and the subscriber is not added to the container. The caller can then decide what to do with said subscriber (e.g., send an onCompleted event to it).
  2. A copy of the current contents is made into an array with length + 1 and the subscriber is assigned to the very end.
  3. The CAS tries to 'commit' the changes; if it succeeds, the method returns true. Otherwise, a new attempt is made.

Lastly, the remove() method looks as follows:

    // ...
    public boolean remove(Subscriber<T> s) {
        for (;;) {
            Subscriber[] current = array.get();
            if (current == EMPTY 
                    || current == TERMINATED) {             // (1)
                return false;
            }
            int n = current.length;
            int j = -1;
            for (int i = 0; i < n; i++) {                   // (2)
                Subscriber e = current[i];
                if (e.equals(s)) {
                    j = i;
                    break;
                }
            }
            if (j < 0) {                                    // (3)
                return false;
            }
            Subscriber[] next;
            if (n == 1) {                                   // (4)
                next = EMPTY;
            } else {
                next = new Subscriber[n - 1];
                System.arraycopy(current, 0, next, 0, j);
                System.arraycopy(current, j + 1, 
                    next, j, n - j - 1);                    // (5)
            }
            if (array.compareAndSet(current, next)) {       // (6)
                return true;
            }
        }
    }

Although a bit complicated, the method's behavior is straightforward:

  1. In case the current array is empty or the container is already terminated, it can't contain any subscribers and thus the method quits immediately.
  2. Otherwise, we search for the first occurrence of the given subscriber and hold onto its index in j. By scanning first instead of doing an on-the-fly filter-copy, we can save some overhead due to the card-marking associated with each reference store, required by most GCs.
  3. If j remained negative, the subscriber is not amongst the others in the array and the method returns false.
  4. In case the array contains a single value, there is no need to create an empty array but we can reuse the constant (since empty arrays are essentially stateless).
  5. If the array contains multiple elements, a new, shorter array is created and the values around the found location are copied over to it.
  6. Finally, the CAS swaps in the new array and, if it succeeds, the method returns true, indicating that the subscriber was successfully removed. Otherwise, the loop retries.
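
Putting the pieces together, the same copy-on-write logic can be tried out without RxJava by storing plain objects instead of Subscribers. CopyOnWriteContainer is a hypothetical name for this sketch; the algorithm follows the add(), remove() and getAndTerminate() steps described above.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

// Generic variant of the container above; stores any non-null item instead of Subscribers.
final class CopyOnWriteContainer<T> {
    private static final Object[] EMPTY = new Object[0];
    private static final Object[] TERMINATED = new Object[0]; // distinct identity

    private final AtomicReference<Object[]> array = new AtomicReference<>(EMPTY);

    // The returned array must be treated as read-only.
    Object[] get() {
        return array.get();
    }

    boolean add(T item) {
        for (;;) {
            Object[] current = array.get();
            if (current == TERMINATED) {
                return false;
            }
            int n = current.length;
            Object[] next = new Object[n + 1];
            System.arraycopy(current, 0, next, 0, n);
            next[n] = item;
            if (array.compareAndSet(current, next)) {
                return true;
            }
        }
    }

    boolean remove(T item) {
        for (;;) {
            Object[] current = array.get();
            if (current == EMPTY || current == TERMINATED) {
                return false;
            }
            int n = current.length;
            int j = -1;
            for (int i = 0; i < n; i++) {
                if (current[i].equals(item)) {
                    j = i;
                    break;
                }
            }
            if (j < 0) {
                return false;
            }
            Object[] next;
            if (n == 1) {
                next = EMPTY;
            } else {
                next = new Object[n - 1];
                System.arraycopy(current, 0, next, 0, j);
                System.arraycopy(current, j + 1, next, j, n - j - 1);
            }
            if (array.compareAndSet(current, next)) {
                return true;
            }
        }
    }

    Object[] getAndTerminate() {
        return array.getAndSet(TERMINATED);
    }

    boolean isTerminated() {
        return array.get() == TERMINATED;
    }

    public static void main(String[] args) {
        CopyOnWriteContainer<String> c = new CopyOnWriteContainer<>();
        c.add("a");
        c.add("b");
        c.add("c");
        System.out.println(c.remove("b"));                      // prints true
        System.out.println(Arrays.toString(c.get()));           // prints [a, c]
        System.out.println(Arrays.toString(c.getAndTerminate())); // prints [a, c]
        System.out.println(c.add("d"));                         // prints false
    }
}
```

The array returned by getAndTerminate() is the final snapshot: the caller can notify exactly those items, knowing that no later add() can succeed.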

Such a container is quite often used in multicast-like operators, but these operators rarely encounter more than half a dozen subscribers during their lifecycle and, as such, the frequent allocation of arrays has less impact.

If the allocation rate is a problem in your scenario, it is possible to change the above logic to use synchronized block and some type of list or set to store the subscriptions, but note that the get() method, quite frequently used when dispatching events, can no longer be implemented in a wait-free manner. The get() method will, most likely, require the use of a synchronized block and making defensive copies for every invocation; again, a tradeoff one needs to consider carefully.
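
For comparison, here is a minimal sketch of the lock-based alternative, assuming a plain ArrayList as the backing store. SynchronizedContainer is a hypothetical name; note how get() has to take the lock and copy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical lock-based variant: cheaper add/remove, but get() blocks and copies.
final class SynchronizedContainer<T> {
    private final List<T> items = new ArrayList<>();
    private boolean terminated;

    synchronized boolean add(T item) {
        if (terminated) {
            return false;
        }
        items.add(item);
        return true;
    }

    synchronized boolean remove(T item) {
        return !terminated && items.remove(item);
    }

    // A defensive copy on every call, so callers can iterate without holding the lock.
    synchronized List<T> get() {
        return new ArrayList<>(items);
    }

    synchronized List<T> getAndTerminate() {
        List<T> snapshot = new ArrayList<>(items);
        items.clear();
        terminated = true;
        return snapshot;
    }

    synchronized boolean isTerminated() {
        return terminated;
    }

    public static void main(String[] args) {
        SynchronizedContainer<String> c = new SynchronizedContainer<>();
        c.add("a");
        List<String> snapshot = c.get();
        c.add("b");
        System.out.println(snapshot); // prints [a]: the copy is unaffected
        System.out.println(c.get());  // prints [a, b]
    }
}
```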


Conclusion

In this mini-series, I talked about various kinds of standard subscription containers and showed how to implement blocking and lock-free containers. In addition, I've described an array-backed lock-free container with peculiar properties that come in handy when implementing multicast-like operators.

If there were only RxJava 1.x, the operator concurrency primitives series could end here. However, the Reactive Streams specification has recently been finalized and is expected to form the basis for RxJava 2.0, which is going to be a complete rewrite; that is unavoidable.

Does this mean the things learned so far are useless? Yes and no. The concepts will be quite relevant in fact, only the class structures need to change somewhat to match the reactive-streams specification and requirements.

Instead of jumping right into RxJava 2.0 constructs, let's take a break and look at another advanced RxJava topic: schedulers.

Friday, May 22, 2015

Operator concurrency primitives: subscription-containers (part 2)

Introduction


In this blog post, I'm going to implement two lock-free versions of the TwoSubscriptions container from the previous post. Although they will be functionally equivalent, the implementations will reflect two different philosophies regarding how one can check the unsubscription status of the contained subscriptions and unsubscribe them.


Using boolean isUnsubscribed in the state


A simple way of implementing a lock-free data structure is to do a so-called copy-on-write operation on every mutation, which involves an immutable state and a CAS loop. With our TwoSubscriptions, we will capture the state of two distinct Subscriptions into a composite class:

    static final class State {
        final Subscription s1;
        final Subscription s2;
        final boolean isUnsubscribed;
        public State(Subscription s1, 
                Subscription s2, 
                boolean isUnsubscribed) {
            this.s1 = s1;
            this.s2 = s2;
            this.isUnsubscribed = isUnsubscribed;
            
        }
    }
    // ...

With this State inner class, whenever the state needs to change, we will create a new instance, copy over the relevant values and use a CAS loop to achieve atomicity. Now let's see our new lock-free container's class structure:

public final class TwoSubscribersLockFree1 
implements Subscription {
    static final class State { 
        // ...
    }

    static final State EMPTY = 
        new State(null, null, false);                  // (1)

    static final State UNSUBSCRIBED = 
        new State(null, null, true);                   // (2)
    
    final AtomicReference<State> state = 
        new AtomicReference<>(EMPTY);                  // (3)

    public void set(boolean first, Subscription s) {
        // implement
    }
    
    @Override
    public void unsubscribe() {
        // implement
    }

    @Override
    public boolean isUnsubscribed() {
        // implement
    }
}

First, since the initial and terminal states are essentially constants, they are declared as static final instances with the difference that UNSUBSCRIBED.isUnsubscribed == true (1) (2). Since the state needs to be changed atomically, we also need an AtomicReference to hold the State instance (3) which we initialize to the empty (constant) state.

With the given skeleton, the implementation of set() looks as follows:

    public void set(boolean first, Subscription s) {
        for (;;) {
            State current = state.get();                    // (1)
            if (current.isUnsubscribed) {                   // (2)
                s.unsubscribe();
                return;
            }
            State next;
            Subscription old;
            if (first) {
                next = new State(s, current.s2, false);     // (3)
                old = current.s1;                           // (4)
            } else {
                next = new State(current.s1, s, false);
                old = current.s2;
            }
            if (state.compareAndSet(current, next)) {       // (5)
                if (old != null) {
                    old.unsubscribe();                      // (6)
                }
                return;
            }
        }
    }

and works as follows:

  1. The current state value is read.
  2. If the current state is unsubscribed, the terminal state of this container has been reached, so we unsubscribe the parameter and quit.
  3. Otherwise, we create a new state based on the old one, replacing the appropriate subscription with the provided one.
  4. Since the subscription needs to be unsubscribed on replacement, we save its instance locally.
  5. The CAS operation will atomically swap in the new, updated state or we perform a new iteration in case a concurrent modification happened to the state.
  6. With a successful CAS, the original subscription (if any) is unsubscribed and the loop is quit.

The implementation of isUnsubscribed() is straightforward:

    // ...
    @Override
    public boolean isUnsubscribed() {
        return state.get().isUnsubscribed;
    }
    // ...

Finally, let's see how one can implement the unsubscribe() method.

    @Override
    public void unsubscribe() {
        State current = state.get();                        // (1)
        if (!current.isUnsubscribed) {                      // (2)
            current = state.getAndSet(UNSUBSCRIBED);        // (3)
            if (!current.isUnsubscribed) {                  // (4)
                List<Throwable> errors = null;              // (5)
                
                errors = unsubscribe(current.s1, errors);   // (6)
                errors = unsubscribe(current.s2, errors);
                
                Exceptions.throwIfAny(errors);              // (7)
            }
        }
    }

    private List<Throwable> unsubscribe(Subscription s,     // (8)
            List<Throwable> errors) {
        if (s != null) {
            try {
                s.unsubscribe();
            } catch (Throwable e) {
                if (errors == null) {
                    errors = new ArrayList<>();
                }
                errors.add(e);
            }
        }
        return errors;
    }
}

The method has several interesting steps:

  1. We retrieve the current state.
  2. If the current state is already unsubscribed, there is nothing to do and the method quits.
  3. Otherwise, we atomically exchange the current state with the constant terminal state.
  4. If the previous state was unsubscribed, the method can quit, otherwise, since the getAndSet is atomic there will be exactly one caller who transitions from a non-terminated state into the terminated state. There is no need for a CAS loop here and the unsubscription, so far, can be wait-free on platforms with intrinsified getAndSet.
  5. The possible exceptions are collected into an errors list.
  6. I've factored out the unsubscription and error collection into a method and it is called for each of the contained subscriptions.
  7. If any of the unsubscriptions threw, the exception(s) are rethrown.
  8. The convenience method of unsubscribing a subscription and updating the errors list if necessary.
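The claim in step 4, that exactly one caller observes the transition into the terminal state, can be checked with a small race. This is only an illustrative sketch: the class name GetAndSetRace and the two marker objects are made up for the example, and plain Objects stand in for the State instances.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class GetAndSetRace {
    static final Object ACTIVE = new Object();
    static final Object TERMINATED = new Object();

    /** Lets the given number of threads race and returns how many "won" the transition. */
    static int race(int threads) {
        AtomicReference<Object> state = new AtomicReference<>(ACTIVE);
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await();                 // release all threads at once
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Same shape as unsubscribe(): swap in the terminal state and
                // act only if the previous state was not yet terminal.
                if (state.getAndSet(TERMINATED) != TERMINATED) {
                    winners.incrementAndGet();
                }
            });
            workers[i].start();
        }
        start.countDown();
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("winners = " + race(8)); // prints "winners = 1"
    }
}
```

No matter how the threads interleave, only the first getAndSet can return the non-terminal value, so the contained subscriptions are unsubscribed by one thread only.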

Using the UNSUBSCRIBED state reference

If we think about it, the terminal state is distinct from the others not just by the isUnsubscribed flag, but also by having a unique constant reference. It is therefore possible to remove isUnsubscribed and compare against the UNSUBSCRIBED instance wherever necessary.

Therefore, we can simplify the State class in the new TwoSubscribersLockFree2 as follows:

public final class TwoSubscribersLockFree2 implements Subscription {
    static final class State {
        final Subscription s1;
        final Subscription s2;
        public State(Subscription s1, 
                Subscription s2) {
            this.s1 = s1;
            this.s2 = s2;
            
        }
    }

The isUnsubscribed field was removed from it and we have to change every former isUnsubscribed check:

    // ...
    static final State EMPTY = new State(null, null);         // (1) 
    static final State UNSUBSCRIBED = new State(null, null);
    
    final AtomicReference<State> state
        = new AtomicReference<>(EMPTY);

    public void set(boolean first, Subscription s) {
        for (;;) {
            State current = state.get();
            if (current == UNSUBSCRIBED) {                    // (2)
                s.unsubscribe();
                return;
            }
            State next;
            Subscription old;
            if (first) {
                next = new State(s, current.s2);
                old = current.s1;
            } else {
                next = new State(current.s1, s);
                old = current.s2;
            }
            if (state.compareAndSet(current, next)) {
                if (old != null) {
                    old.unsubscribe();
                }
                return;
            }
        }
    }
    
    @Override
    public boolean isUnsubscribed() {
        return state.get() == UNSUBSCRIBED;                    // (3)
    }
    
    @Override
    public void unsubscribe() {
        State current = state.get();
        if (current != UNSUBSCRIBED) {                         // (4)
            current = state.getAndSet(UNSUBSCRIBED);
            if (current != UNSUBSCRIBED) {                     // (5)
                List<Throwable> errors = null;
                
                errors = unsubscribe(current.s1, errors);
                errors = unsubscribe(current.s2, errors);
                
                Exceptions.throwIfAny(errors);
            }
        }
    }
    // ...

The new constants no longer need a boolean flag (1) and every former current.isUnsubscribed check is replaced with a current == UNSUBSCRIBED comparison (2, 3, 4, 5).

Given these two approaches, which one to choose? Benchmark and see it for yourself. Obviously, the first allocates more memory but the boolean check can be faster on certain platforms, whereas the second costs less in memory but reference comparison can be slower.

Generally though, using an immutable State class will increase the GC pressure as every modification triggers the allocation of a new state. It is possible to avoid it by performing per-subscription CAS loops, but the approach can get cumbersome as the number of subscription fields increases.
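A rough sketch of the per-subscription CAS idea, assuming a local stand-in Subscription interface (so the example runs without RxJava) and hypothetical names TwoSlots, TERMINATED and Flag. Each slot gets its own AtomicReference and a shared sentinel marks the slot as terminated, so no State object is allocated on set(). Error collection is left out for brevity.

```java
import java.util.concurrent.atomic.AtomicReference;

final class PerSlotCasSketch {
    /** Local stand-in for rx.Subscription (assumption). */
    interface Subscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    /** Trivial Subscription that only remembers whether it was unsubscribed. */
    static final class Flag implements Subscription {
        private volatile boolean done;
        @Override public void unsubscribe() { done = true; }
        @Override public boolean isUnsubscribed() { return done; }
    }

    static final class TwoSlots implements Subscription {
        /** Sentinel stored in a slot once the container is terminated. */
        static final Subscription TERMINATED = new Subscription() {
            @Override public void unsubscribe() { }
            @Override public boolean isUnsubscribed() { return true; }
        };

        final AtomicReference<Subscription> s1 = new AtomicReference<>();
        final AtomicReference<Subscription> s2 = new AtomicReference<>();

        void set(boolean first, Subscription s) {
            AtomicReference<Subscription> slot = first ? s1 : s2;
            for (;;) {
                Subscription current = slot.get();
                if (current == TERMINATED) {       // this slot is already terminated
                    if (s != null) {
                        s.unsubscribe();
                    }
                    return;
                }
                if (slot.compareAndSet(current, s)) {
                    if (current != null) {
                        current.unsubscribe();     // unsubscribe the replaced value
                    }
                    return;
                }
            }
        }

        @Override
        public boolean isUnsubscribed() {
            return s1.get() == TERMINATED;         // s1 is terminated first
        }

        @Override
        public void unsubscribe() {
            // Each slot is terminated atomically, but not both slots together.
            Subscription a = s1.getAndSet(TERMINATED);
            Subscription b = s2.getAndSet(TERMINATED);
            if (a != null) {
                a.unsubscribe();                   // no-op if a is the sentinel
            }
            if (b != null) {
                b.unsubscribe();
            }
        }
    }
}
```

The price is visible already with two fields: the slots are terminated one after the other, and every additional field needs its own reference and its own termination step.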

Conclusion

In this post, I've introduced two lock-free variants of the TwoSubscribers container and explained their inner workings.

It is more likely one has to manage more than two subscriptions at a time, therefore, I'm going to demonstrate an array-based container with the very same underlying approaches in the next post.

Tuesday, May 19, 2015

Operator concurrency primitives: subscription-containers (part 1)

Introduction

Writing complex operators is likely to involve subscription management and thus the various standard subscription-containers.

It is worth reiterating that subscription-containers have a terminal state, reached by calling unsubscribe() on them, in which any Subscription one attempts to add or set is unsubscribed immediately. This may seem odd, but the following scenario will explain the reason behind this design decision.

Let's assume your operator schedules some work with a delay on a Scheduler.Worker instance. Concurrently, the chain is unsubscribed but the schedule() call hasn't returned yet with a Subscription that lets you cancel the schedule. Without the terminal state property, the container would accept the returned Subscription and never unsubscribe it, and now you have a memory leak. With the terminal state property, the attempt to set said Subscription on the already unsubscribed container will immediately unsubscribe it and the resources are freed. This matters most notoriously on Android where, once your application has been told to pause, you don't want to have a background task hanging around deep in the operator chain.
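The scenario can be replayed in a few lines. The sketch below is only an illustration under assumptions: Subscription is a local stand-in for the RxJava interface, Task plays the role of the Subscription returned late by schedule(), and NaiveHolder and TerminalHolder are hypothetical single-slot holders (replacement handling is left out to keep the focus on the terminal state).

```java
final class TerminalStateSketch {
    /** Local stand-in for rx.Subscription (assumption). */
    interface Subscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    /** Plays the role of the Subscription returned late by schedule(). */
    static final class Task implements Subscription {
        private volatile boolean cancelled;
        @Override public void unsubscribe() { cancelled = true; }
        @Override public boolean isUnsubscribed() { return cancelled; }
    }

    /** Holder WITHOUT a terminal state: a late set() is silently retained. */
    static final class NaiveHolder {
        private Subscription current;
        synchronized void set(Subscription s) { current = s; }
        void unsubscribe() {
            Subscription s;
            synchronized (this) {
                s = current;
                current = null;
            }
            if (s != null) {
                s.unsubscribe();
            }
        }
    }

    /** Holder WITH a terminal state: a late set() is unsubscribed at once. */
    static final class TerminalHolder {
        private Subscription current;
        private boolean done;
        void set(Subscription s) {
            synchronized (this) {
                if (!done) {
                    current = s;
                    return;
                }
            }
            s.unsubscribe();          // container already terminated
        }
        void unsubscribe() {
            Subscription s;
            synchronized (this) {
                if (done) {
                    return;
                }
                done = true;
                s = current;
                current = null;
            }
            if (s != null) {
                s.unsubscribe();
            }
        }
    }

    public static void main(String[] args) {
        Task late1 = new Task();
        NaiveHolder naive = new NaiveHolder();
        naive.unsubscribe();          // the chain is unsubscribed first...
        naive.set(late1);             // ...then schedule() returns
        System.out.println("naive cancels late task: " + late1.isUnsubscribed());    // false

        Task late2 = new Task();
        TerminalHolder terminal = new TerminalHolder();
        terminal.unsubscribe();
        terminal.set(late2);
        System.out.println("terminal cancels late task: " + late2.isUnsubscribed()); // true
    }
}
```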

Further things one should already know: 
  • all operations on a container should be thread-safe and atomic from the perspective of a concurrent unsubscription,
  • the unsubscribe() call itself must be idempotent (multiple calls should be no-ops) and
  • one should avoid calling unsubscribe() while holding a lock (especially if you have control over the lock-acquisition) to avoid any chance of a deadlock.

I'll briefly touch on these classes, their use in operators and their notable properties but the blog post will be about the concepts behind the containers so one can build custom containers that can support the complex lifecycle found in Subjects and in conversion operators such as publish().

Standard subscription containers


CompositeSubscription

The most frequently used container type is the CompositeSubscription. Any operator that has to deal with resources (Subscribers, wrapped Futures) that come and go is likely to use this class. The current implementation uses a synchronized block to achieve thread-safety and puts the subscription instances into a HashSet.

(Sidenote: The reason for this structure instead of a copy-on-write non-blocking approach was that copy-on-write generates a lot of garbage and some particular usage pattern of RxJava at Netflix made them worry about GC.)

One notable property of this composite is that remove() will unsubscribe the subscription it successfully removes from the container (which is sometimes unnecessary, for example, when one removes a wrapped Future from the tracking composite at the end of the task itself).

CompositeSubscription is well suited for operators where the resources come and go non-deterministically and its internal HashSet can shine with the O(1) add and remove complexity (example: merge()).

SerialSubscription

The SerialSubscription contains only a single Subscription at a time and replacing it with another Subscription will unsubscribe the previous Subscription. Internally, it uses the copy-on-write approach with an extra state object.

This type of container is employed in cases when one needs to track one resource at a time, such as the current Subscriber to a source in a concatMap() scenario.

MultipleAssignmentSubscription

The MultipleAssignmentSubscription is a variant of the SerialSubscription (unrelated class hierarchy though) where the replacement of the current contained Subscription doesn't unsubscribe that instance.

Its use is less frequent and needs some forethought because of the said behavior. One can use it when the reference to the subscription can be safely 'lost' because an unsubscribe() call would only be a wasteful no-op. This container is used in the algorithms that perform periodic scheduling through the Scheduler.Worker instance by splitting it into independent delayed schedules. The result of schedulePeriodically() will be a MultipleAssignmentSubscription that holds onto the current individual delayed schedule of the underlying task. Since the replacement happens at the very end of the repeated task, unsubscribing the previous schedule is unnecessary and even unwanted, because the task would be unsubscribing itself.

SubscriptionList

The SubscriptionList is an internal class, similar to CompositeSubscription, that uses a LinkedList instead of a HashSet to speed up scenarios where the subscriptions are just added to the container and never removed. Generally, one shouldn't rely on an internal class but if you'd want to submit a PR to RxJava, I'd expect you to use this class if appropriate (and if it is still available at that time).

Lately, it has been retrofitted with a remove() method to support an optimization on the default computation() scheduler to speed up non-delayed task addition and removal because usually, these tasks are scheduled in the same order in which they get executed and removed from the internal tracking structure. We end up removing the very first item from the LinkedList, which has less overhead than a remove from a HashSet (according to our benchmarks).

This container appears in various places, most notably in the Subscriber itself, but the internal ScheduledAction uses it too to 'track the trackers'.

RefCountSubscription

I'd say RefCountSubscription is a remnant from the Rx.NET set of resource containers. It holds onto a single, unchangeable Subscription and 'hands out' derived subscriptions; when all of them get unsubscribed, the main subscription gets unsubscribed. We haven't used it in RxJava for some time now; the original Rx.NET RefCountDisposable works much better with the standard .NET resource management based on IDisposable.

(Sidenote: BooleanSubscription is not a container because it doesn't hold onto another Subscription but it is aimed at wrapping arbitrary actions to be executed on unsubscription.)

Implementing blocking containers

Let's assume you need a very specific container that can hold onto exactly two Subscriptions which can be freely replaced (unsubscribing the earlier value in the process). Let's call the container TwoSubscribers and have the following class skeleton:

public final class TwoSubscribers 
implements Subscription {
    private volatile boolean isUnsubscribed;          // (1)
    
    Subscription s1;                                  // (2)
    Subscription s2;
    
    @Override
    public boolean isUnsubscribed() {
        return isUnsubscribed;                        // (3)
    }
    
    public void set(boolean first, Subscription s) {
        // implement
    }
    
    @Override
    public void unsubscribe() {
        // implement
    }
}

The class isn't complicated at this stage. We keep the unsubscription indicator as a volatile boolean value to avoid using a synchronized block unnecessarily (1)(3). The class needs to hold onto two subscriptions, which due to an 'external' synchronization, we have as plain instance variables (2).

The set() method takes a boolean argument to determine which of the two contained Subscriptions has to be replaced. The implementation is as follows:


    // ...
    public void set(boolean first, Subscription s) {
        if (!isUnsubscribed) {                       // (1)
            synchronized (this) {
                if (!isUnsubscribed) {               // (2)
                    Subscription temp;               // (3)
                    if (first) {
                        temp = s1;
                        s1 = s;
                    } else {
                        temp = s2;
                        s2 = s;
                    }
                    s = temp;                        // (4)
                }
            }
        }
        if (s != null) {                             // (5)
            s.unsubscribe();
        }
    }
    // ...

After all those complicated Producers, it looks quite simple:

  1. We eagerly check the unsubscription status of the container so if it is already unsubscribed, we can skip the synchronized block and just unsubscribe the parameter directly.
  2. Otherwise, we double-check the unsubscription status and if the container is still not unsubscribed, we commence with the replacement.
  3. Because the replaced subscription should be unsubscribed, we get the current subscription and replace it with the one from the parameter.
  4. We will reuse the parameter and let it store the previous subscription on the way out.
  5. We reach this point either because (1) or (2) fell through, leaving the subscription parameter intact, or with s holding onto the previous subscription from one of the slots. Since the latter can be null, we call unsubscribe() after a null check.

What remains is the unsubscribe() method of the container itself:


    // ...
    @Override
    public void unsubscribe() {
        if (!isUnsubscribed) {                  // (1)
            Subscription one;                   // (2)
            Subscription two;
            synchronized (this) {
                if (isUnsubscribed) {           // (3)
                    return;
                }
                
                isUnsubscribed = true;          // (4)
                
                one = s1;                       // (5)
                two = s2;
                
                s1 = null;
                s2 = null;
            }
            
            List<Throwable> errors = null;      // (6)
            try {
                if (one != null) {
                    one.unsubscribe();          // (7)
                }
            } catch (Throwable t) {
                errors = new ArrayList<>(); // (8)
                errors.add(t);
            }
            try {
                if (two != null) {
                    two.unsubscribe();
                }
            } catch (Throwable t) {
                if (errors == null) {
                    errors = new ArrayList<>();
                }
                errors.add(t);
            }

            Exceptions.throwIfAny(errors);      // (9)
        }
    }
}

Unfortunately, the method doesn't look too elegant, but for a good reason:

  1. We check if the container is already unsubscribed, in which case there is nothing else to do.
  2. If we find the container active, we will need to get the current subscriptions out because we can't unsubscribe them while holding a lock.
  3. In the synchronized block, we double-check the unsubscription status and quit if the container is already unsubscribed.
  4. We set the isUnsubscribed field as early as possible so concurrent set() calls can see it as soon as possible and not try to enter the synchronized block.
  5. We read the current values and replace them with null to avoid retaining anything.
  6. It is a good practice to be defensive about exceptions thrown from unsubscriptions and since we have 2, we need to make sure they both get unsubscribed but the exception(s) are still propagated.
  7. If the specific subscription is not null, we call unsubscribe() on it.
  8. In case of an exception, we dynamically allocate the errors list and add the Throwable to it.
  9. After doing the same logic with the second subscription, we invoke a helper method that will throw the accumulated exceptions if any or just do nothing. If both subscriptions threw, the throwIfAny will throw a single CompositeException capturing both.
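The same collect-then-throw idea generalizes from two fields to any number of subscriptions. The sketch below is an assumption-laden illustration: Subscription is a one-method local stand-in, unsubscribeAll is a hypothetical helper, and throwIfAny here is a simplified substitute for the helper used in the post. It reports failures through a plain RuntimeException with suppressed exceptions rather than through a CompositeException.

```java
import java.util.ArrayList;
import java.util.List;

final class UnsubscribeAllSketch {
    /** Local stand-in for rx.Subscription, reduced to the one method needed here. */
    interface Subscription {
        void unsubscribe();
    }

    /** Unsubscribes every non-null element, then rethrows whatever was collected. */
    static void unsubscribeAll(List<? extends Subscription> subscriptions) {
        List<Throwable> errors = null;
        for (Subscription s : subscriptions) {
            if (s == null) {
                continue;
            }
            try {
                s.unsubscribe();
            } catch (Throwable t) {
                if (errors == null) {
                    errors = new ArrayList<>();   // allocated only on failure
                }
                errors.add(t);
            }
        }
        throwIfAny(errors);
    }

    /**
     * Simplified substitute for the post's helper: does nothing for a null or
     * empty list, otherwise throws one exception carrying all failures.
     */
    static void throwIfAny(List<Throwable> errors) {
        if (errors == null || errors.isEmpty()) {
            return;
        }
        RuntimeException composite =
            new RuntimeException(errors.size() + " unsubscription(s) failed");
        for (Throwable t : errors) {
            composite.addSuppressed(t);
        }
        throw composite;
    }
}
```

A failing subscription early in the list does not prevent the later ones from being unsubscribed, which is the point of deferring the throw to the very end.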

Conclusion

In this relatively short post, I've recapped the existing subscription-container classes, including the underlying requirements of containers, the properties and use of each standard RxJava subscription-container. I've demonstrated, through a simple container, how one can create a conformant container by using blocking operations.

In the next part, I'm going to create a non-blocking variant of the example TwoSubscribers container and show how one can extend it to hold onto any number of subscriptions and still remain non-blocking.

Monday, May 18, 2015

Operator concurrency primitives: producers (part 6 - final)

Introduction

You might have already guessed that Producers are the real 'heroes' of the operators. One way or another, once the request-response ratio is no longer 1:1, you need to introduce an intermediate producer that needs to coordinate messages going in either direction.

In this final part about various producers, I'm going to detail the final, generic-purpose Producer implementation which coordinates not just the requesting and changing of the upstream producer but also makes sure event delivery doesn't run concurrently with such changes. Such a producer is required inside switchOnNext() because if there is an active emission from a previous source, one can't switch to the next and trigger execution. One has to wait for the emission to complete and then make the switch, so no concurrency and no over-requesting happens.

The producer-observer-arbiter

The solution, as usual, is to use one of the serialized access primitives and route every relevant method call through the emitter-loop or drain. For a start, here is the basic class structure for the Producer:


public final class ProducerObserverArbiter<T> 
implements Producer, Observer<T> {      // (1)
    final Subscriber<? super T> child;
    
    boolean emitting;
    
    List<Object> queue;                                    // (2)
    Producer currentProducer;
    long requested;
    
    public ProducerObserverArbiter(
            Subscriber<? super T> child) {
        this.child = child;
    }
    
    @Override
    public void onNext(T t) {
        // implement
    }
    @Override
    public void onError(Throwable e) {
        // implement
    }
    @Override
    public void onCompleted() {
        // implement
    }
    public void request(long n) {
        // implement
    }
    public void set(Producer p) {
        // implement
    }
    void emitLoop() {
        // implement
    }
}

The basic structure is straightforward: we implement Producer and Observer (the latter for the convenience of its event methods) (1), capture the child and use an emitter-loop in which the tasks (such as value emission, request, production) are queued (2).

Instead of using missed fields directly (more on that later), I'll wrap the various actions into private holder classes so a single common queue can hold not just the events, but the request and producer changes as well:

    private static final class ErrorSentinel {                     // (1)
        final Throwable error;
        public ErrorSentinel(Throwable error) {
            this.error = error;
        }
    }

    private static final Object COMPLETED_SENTINEL = new Object(); // (2)
    
    private static final class RequestSentinel {                   // (3)
        final long n;
        public RequestSentinel(long n) {
            this.n = n;
        }
    }
    
    private static final class ProducerSentinel {                  // (4)
        final Producer p;
        public ProducerSentinel(Producer p) {
            this.p = p;
        }
    }

We define 4 types of sentinels; these private classes and object instance make sure their underlying value doesn't conflict with the emission value of T (i.e., you can observe a stream of Throwables, Longs or Producers without interference). Depending on whether or not the queue supports null values, you can introduce a NULL_SENTINEL as needed.

  1. We store errors in an ErrorSentinel instance.
  2. We use a constant for indicating a stateless completed event happened.
  3. We store the positive requests and negative production values in a RequestSentinel instance.
  4. We store the setting or clearing of a producer in a ProducerSentinel instance.
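To see why the wrappers matter, consider a stream whose element type is itself Throwable. The sketch below is a simplified illustration: it re-declares two of the sentinels and adds a hypothetical classify() helper that reports how a drain loop would interpret a queued object.

```java
import java.util.ArrayList;
import java.util.List;

final class SentinelSketch {
    /** Wrapper that marks a Throwable as a terminal error event. */
    static final class ErrorSentinel {
        final Throwable error;
        ErrorSentinel(Throwable error) {
            this.error = error;
        }
    }

    /** Unique instance that marks the completion event. */
    static final Object COMPLETED_SENTINEL = new Object();

    /** Hypothetical helper: describes how a drain loop would treat a queued object. */
    static String classify(Object o) {
        if (o == COMPLETED_SENTINEL) {
            return "onCompleted";
        }
        if (o instanceof ErrorSentinel) {
            return "onError(" + ((ErrorSentinel) o).error.getMessage() + ")";
        }
        return "onNext(" + o + ")";    // anything else is a regular value of T
    }

    public static void main(String[] args) {
        List<Object> queue = new ArrayList<>();
        queue.add(new RuntimeException("just a value"));              // T = Throwable
        queue.add(new ErrorSentinel(new RuntimeException("boom")));   // real error event
        queue.add(COMPLETED_SENTINEL);
        for (Object o : queue) {
            System.out.println(classify(o));
        }
    }
}
```

Because the sentinel types are private to the producer, no value emitted by a source can ever be mistaken for one of them.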

Let's start with the Observer method implementations:


    // ...
    @Override
    public void onNext(T t) {
        synchronized (this) {
            if (emitting) {
                List<Object> q = queue;
                if (q == null) {
                    q = new ArrayList<>();
                    queue = q;
                }
                q.add(t);
                return;
            }
            emitting = true;
        }
        boolean skipFinal = false;
        try {
            child.onNext(t);

            long r = requested;
            if (r != Long.MAX_VALUE) {            // (1)
                requested = r - 1;
            }
            
            emitLoop();
            skipFinal = true;
        } finally {
            if (!skipFinal) {
                synchronized (this) {
                    emitting = false;
                }
            }
        }
    }
    // ...

The implementation of onNext() is straightforward and resembles earlier serialization examples but with the exception that we decrement the current requested amount (unless running in unlimited mode) (1).

    // ...
    @Override
    public void onError(Throwable e) {
        synchronized (this) {
            if (emitting) {
                List<Object> q = new ArrayList<>();
                q.add(new ErrorSentinel(e));        // (1)
                queue = q;
                return;
            }
            emitting = true;
        }
        child.onError(e);                           // (2)
    }
    // ...

In onError(), we don't enqueue the Throwable after earlier actions but clear (1) the queue so the very first action executed by the emitLoop() will be the delivery of this error, skipping any further events. If the emission race is won, there is no need to loop for other events or even 'unlock' the emission flag (2).

The implementation of onCompleted() looks quite similar:

    // ...
    @Override
    public void onCompleted() {
        synchronized (this) {
            if (emitting) {
                List<Object> q = new ArrayList<>();
                q.add(COMPLETED_SENTINEL);
                queue = q;
                return;
            }
            emitting = true;
        }
        child.onCompleted();
    }
    // ...

Next comes the request() method's familiar implementation:

    // ...
    @Override
    public void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException();
        }
        if (n == 0) {
            return;
        }
        synchronized (this) {
            if (emitting) {
                List<Object> q = queue;
                if (q == null) {
                    q = new ArrayList<>();
                    queue = q;
                }
                q.add(new RequestSentinel(n));          // (1)
                return;
            }
            emitting = true;
        }
        boolean skipFinal = false;
        try {
            long r = requested;
            long u = r + n;
            if (u < 0) {
                u = Long.MAX_VALUE;
            }
            requested = u;                             // (2)

            Producer p = currentProducer;
            if (p != null) {                           // (3)
                p.request(n);
            }
            emitLoop();
            skipFinal = true;
        } finally {
            if (!skipFinal) {
                synchronized (this) {
                    emitting = false;
                }
            }
        }    
    }
    // ...

We enqueue (1) the requested amount, guaranteed to be positive, in the same way as actions are enqueued in the other methods. If the emission race is won, the requested amount is incremented (and capped) (2), then we take the current producer and, if it is non-null, request the original amount n from it (3).

Similarly, the set() method uses the now-familiar pattern:
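The capped addition appears both in request() and in the emitter-loop, so it can be factored out. A minimal sketch, with RequestMath and addCap being names chosen for this example:

```java
final class RequestMath {
    /**
     * Adds two non-negative amounts. If the sum overflows, it wraps around to
     * a negative value, which is replaced by Long.MAX_VALUE (unbounded mode).
     */
    static long addCap(long current, long n) {
        long u = current + n;
        return u < 0 ? Long.MAX_VALUE : u;
    }

    public static void main(String[] args) {
        System.out.println(addCap(2, 3));                                     // prints 5
        System.out.println(addCap(Long.MAX_VALUE - 1, 5) == Long.MAX_VALUE);  // prints true
    }
}
```

The check relies on both operands being non-negative, which request() guarantees by rejecting negative amounts up front.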

    public void set(Producer p) {
        synchronized (this) {
            if (emitting) {
                List<Object> q = queue;
                if (q == null) {
                    q = new ArrayList<>();
                    queue = q;
                }
                q.add(new ProducerSentinel(p));
                return;
            }
            emitting = true;
        }
        boolean skipFinal = false;
        try {
            currentProducer = p;
            long r = requested;
            if (p != null && r != 0) {                  // (1)
                p.request(r);
            }
            emitLoop();
            skipFinal = true;
        } finally {
            if (!skipFinal) {
                synchronized (this) {
                    emitting = false;
                }
            }
        }
    }


If the producer is switched to a non-null producer and the current requested amount is non-zero, we request the entire amount from the new producer (1).

Finally, let's see the implementation of the emitLoop():

    // ...
    void emitLoop() {
        for (;;) {
            List<Object> q;
            synchronized (this) {                             // (1)
                q = queue;
                if (q == null) {
                    emitting = false;
                    return;
                }
                queue = null;
            }
            long e = 0;
            
            for (Object o : q) {
                if (o == null) {                              // (2)
                    child.onNext(null);
                    e++;
                } else
                if (o == COMPLETED_SENTINEL) {                // (3)
                    child.onCompleted();
                    return;
                } else
                if (o.getClass() == ErrorSentinel.class) {    // (4)
                    child.onError(((ErrorSentinel)o).error);
                    return;
                } else
                if (o.getClass() == ProducerSentinel.class) { // (5)
                    Producer p = ((ProducerSentinel)o).p;
                    currentProducer = p;
                    long r = requested;
                    if (p != null && r != 0) {
                        p.request(r);
                    }
                } else
                if (o.getClass() == RequestSentinel.class) {  // (6)
                    long n = ((RequestSentinel)o).n;
                    long u = requested + n;
                    if (u < 0) {
                        u = Long.MAX_VALUE;
                    }
                    requested = u;
                    Producer p = currentProducer;
                    if (p != null) {
                        p.request(n);
                    }
                } else {                                      // (7)
                    child.onNext((T)o);
                    e++;
                }
            }
            long r = requested;
            if (r != Long.MAX_VALUE) {                        // (8)
                long v = requested - e;
                if (v < 0) {
                    throw new IllegalStateException();
                }
                requested = v;
            }
        }
    }
}

This is, by far, the most action-packed emitter-loop so far:

  1. We 'dequeue' the batch of actions.
  2. For each action in the queue, if said item is null, we emit null to the child Subscriber and count the number of emissions.
  3. If the item is the COMPLETED_SENTINEL, we complete the child Subscriber and the loop quits without performing any further action.
  4. Otherwise, we have a non-null value so type-checks can be performed to determine the exact thing to do. If we encounter an ErrorSentinel, the error is unwrapped, emitted, and the loop quits.
  5. If a ProducerSentinel is encountered, the inner producer is unwrapped and stored in currentProducer, and if it is non-null and the current requested amount is non-zero, that entire amount is requested through it.
  6. If we encounter a RequestSentinel, the amount is unwrapped, added to the current requested amount (capped) and the unwrapped amount is requested from the current producer if available.
  7. Otherwise, since we shouldn't have any more sentinel types, what remains is a plain non-null element, which we emit to the child subscriber and increment the emission count.
  8. Finally, if the requested amount is not unbounded, we decrement the requested amount by the accumulated emission count and update the instance field.
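
The dispatch order in the list above can be tried out without RxJava. What follows is a minimal, single-threaded sketch and not the arbiter itself: the class SentinelDrainDemo, its two sentinels and the List<String> log are hypothetical stand-ins for ProducerObserverArbiter and the child Subscriber, and only the null, completion, error and plain-value cases are modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SentinelDrainDemo {
    // Hypothetical stand-ins for the arbiter's sentinel objects.
    static final Object COMPLETED_SENTINEL = new Object();

    static final class ErrorSentinel {
        final Throwable error;
        ErrorSentinel(Throwable error) { this.error = error; }
    }

    /**
     * Drains one batch and records what a child would have seen.
     * Returns the number of onNext emissions, which the real loop
     * subtracts from the requested amount afterwards.
     */
    static long drain(List<Object> batch, List<String> log) {
        long emitted = 0;
        for (Object o : batch) {
            if (o == null) {
                // null has to be tested first: o.getClass() would throw
                log.add("onNext(null)");
                emitted++;
            } else if (o == COMPLETED_SENTINEL) {
                log.add("onCompleted");
                return emitted;              // terminal: ignore the rest
            } else if (o.getClass() == ErrorSentinel.class) {
                Throwable ex = ((ErrorSentinel) o).error;
                log.add("onError(" + ex.getMessage() + ")");
                return emitted;              // terminal: ignore the rest
            } else {
                log.add("onNext(" + o + ")");
                emitted++;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Object> batch =
            Arrays.asList(1, null, 2, COMPLETED_SENTINEL, 3);
        long e = drain(batch, log);
        // prints [onNext(1), onNext(null), onNext(2), onCompleted] emitted=3
        System.out.println(log + " emitted=" + e);
    }
}
```

Comparing classes with == instead of using instanceof mirrors the listing above; it only works because the sentinel classes are final and private to the arbiter.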


Perhaps it is not meaningful, or even desirable, to let a producer produce if it is going to be replaced immediately. For example, in switchOnNext, when two new sources arrive in quick succession, one doesn't really want to start the first one but rather skip it and start with the second. You can use the missedProducer approach from the previous part instead of enqueueing a switch-action, and you may also consider whether such a switch should clear the queue of pending values. In addition, one can use the instance fields from ProducerArbiter to save on the allocations made by the request and produced actions.
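
To illustrate the 'skip the first, start the second' idea, here is a small sketch of a latest-wins slot built on the emitter-loop pattern. Everything in it is hypothetical: Switcher, switchTo() and the String source names only imitate what a missedProducer-style field would do, and the quick succession of sources is simulated by reentrant calls rather than by real threads.

```java
import java.util.ArrayList;
import java.util.List;

class LatestWinsDemo {
    /** Sources are plain names; "starting" one just records it. */
    static final class Switcher {
        private boolean emitting;
        private String missedSource;   // only the latest replacement is kept
        final List<String> started = new ArrayList<>();

        void switchTo(String source) {
            synchronized (this) {
                if (emitting) {
                    // overwrites an earlier source that never got started
                    missedSource = source;
                    return;
                }
                emitting = true;
            }
            start(source);
            for (;;) {
                String next;
                synchronized (this) {
                    next = missedSource;
                    if (next == null) {
                        emitting = false;
                        return;
                    }
                    missedSource = null;
                }
                start(next);
            }
        }

        private void start(String source) {
            started.add(source);
            if (source.equals("A")) {
                // Simulate two sources arriving while A is being started.
                switchTo("B");
                switchTo("C");
            }
        }
    }

    public static void main(String[] args) {
        Switcher s = new Switcher();
        s.switchTo("A");
        System.out.println(s.started);   // prints [A, C]
    }
}
```

B is never started because C replaced it in the slot before the loop had a chance to look at it.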

The only thing remaining is a usage example, here in an operator that switches among a fixed set of Observables based on a timer:


public static final class SwitchTimer<T> 
implements OnSubscribe<T> {
    final List<Observable<? extends T>> sources;
    final long time;
    final TimeUnit unit;
    final Scheduler scheduler;
    public SwitchTimer(
            Iterable<? extends Observable<? extends T>> sources, 
            long time, TimeUnit unit, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.sources = new ArrayList<>();
        this.time = time;
        this.unit = unit;
        sources.forEach(this.sources::add);
    }
    @Override
    public void call(Subscriber<? super T> child) {
        ProducerObserverArbiter<T> poa = 
            new ProducerObserverArbiter<>(child);             // (1)
        
        Scheduler.Worker w = scheduler.createWorker();        // (2)
        child.add(w);
        
        child.setProducer(poa);                                  
        
        SerialSubscription ssub = new SerialSubscription();   // (3)
        child.add(ssub);
        
        int[] index = new int[1];
        
        w.schedulePeriodically(() -> {
            int idx = index[0]++;
            if (idx >= sources.size()) {                      // (4)
                poa.onCompleted();
                return;
            }
            Subscriber<T> s = new Subscriber<T>() {           // (5)
                @Override
                public void onNext(T t) {
                    poa.onNext(t);
                }
                @Override
                public void onError(Throwable e) {
                    poa.onError(e);
                }
                @Override
                public void onCompleted() {
                    if (idx + 1 == sources.size()) {          // (6)
                        poa.onCompleted();
                    }
                }
                @Override
                public void setProducer(Producer producer) {
                    poa.set(producer);
                }
            };

            ssub.set(s);                                      // (7)
            sources.get(idx).unsafeSubscribe(s);
            
        }, time, time, unit);
    }
}

List<Observable<Long>> timers = Arrays.asList(
    Observable.timer(100, 100, TimeUnit.MILLISECONDS),
    Observable.timer(100, 100, TimeUnit.MILLISECONDS)
        .map(v -> v + 20),
    Observable.timer(100, 100, TimeUnit.MILLISECONDS)
        .map(v -> v + 40)
);

Observable<Long> source = Observable.create(
    new SwitchTimer<>(timers, 500, 
    TimeUnit.MILLISECONDS, Schedulers.computation()));
        
source.toBlocking().forEach(System.out::println);

It is constructed as follows:

  1. We create our ProducerObserverArbiter instance.
  2. We create a scheduler-worker instance and add it to the child subscriber to allow cancelling the whole schedule upon child unsubscription.
  3. We will hold onto the Subscriber for the active Observable sequence and chain it with the child subscriber in order to propagate cancellation.
  4. In case the final Observable sequence didn't complete in time, the periodic task will complete the whole sequence.
  5. Otherwise we create a Subscriber for the next Observable (without sharing anything with child).
  6. In the onCompleted() we check if the captured index is the last and if so, we complete by sending an onCompleted() through the arbiter.
  7. The SerialSubscription.set will make sure the previous subscription will be unsubscribed when a new subscription happens to the next source.
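
The replace-and-cancel behavior that point 7 relies on can be modeled in a few lines. This is a sketch under assumptions, not RxJava's SerialSubscription source: Cancellable and SerialSlot are hypothetical names, and the slot only reproduces the two observable rules (setting a new item cancels the previous one; setting an item on an already cancelled slot cancels the new item right away).

```java
import java.util.ArrayList;
import java.util.List;

class SerialSlotDemo {
    /** Hypothetical minimal handle, standing in for Subscription. */
    interface Cancellable {
        void cancel();
    }

    /** Holds at most one active Cancellable. */
    static final class SerialSlot implements Cancellable {
        private Cancellable current;
        private boolean cancelled;

        void set(Cancellable next) {
            Cancellable toCancel;
            synchronized (this) {
                if (cancelled) {
                    toCancel = next;      // slot is dead: reject newcomer
                } else {
                    toCancel = current;   // replace and cancel the old one
                    current = next;
                }
            }
            if (toCancel != null) {
                toCancel.cancel();        // called outside the lock
            }
        }

        @Override
        public void cancel() {
            Cancellable c;
            synchronized (this) {
                if (cancelled) {
                    return;
                }
                cancelled = true;
                c = current;
                current = null;
            }
            if (c != null) {
                c.cancel();
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        SerialSlot slot = new SerialSlot();
        slot.set(() -> log.add("first cancelled"));
        slot.set(() -> log.add("second cancelled"));  // cancels first
        slot.cancel();                                // cancels second
        slot.set(() -> log.add("third cancelled"));   // cancelled at once
        return log;
    }

    public static void main(String[] args) {
        // prints [first cancelled, second cancelled, third cancelled]
        System.out.println(run());
    }
}
```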

Conclusion

In the producers series, I've shown implementation examples ranging from a single-value-producer up to a complete producer-observer-arbiter. Each producer variant brought more complexity and more explanation, which should help operator writers in developing their custom solutions.

The next primitives I'm going to talk about are the various Subscription containers, and I'll show how to implement custom ones in case the standard set isn't adequate for some reason.



Friday, May 15, 2015

Operator concurrency primitives: producers (part 5)

Introduction 

Dealing with multiple sources and backpressure at the same time is difficult when one writes an operator. Even a simple task such as continuing with another observable sequence once the first has completed poses a challenge in how request accounting and propagation should be performed for correct behavior.

Generally in RxJava, and especially with reactive-streams, one must set the Producer on a Subscriber at most (or exactly) once. Don't let the thread-safety in Subscriber.setProducer() fool you into thinking that you can change the producer: the Subscriber doesn't do (nor should it, IMO) proper request accounting but just holds onto your initial request amount(s) until a Producer is set on it. If you have outstanding requests and you set a new Producer, that producer's request() will be called with the value of the latest 'private' request() call instead of with a request amount equal to the undelivered amount remaining from the previous producer.

In this blog post, I'll introduce a kind of producer which can help overcome this particular challenging situation.

The producer-arbiter

Let's assume we need to write an operator with the following behavior (and we don't like concatWith()): given two source Observables, the operator observes the first one and, once it completes normally, starts observing the other Observable until that completes, and then completes normally itself.


public final class ThenObserve<T> implements Operator<T, T> {
    final Observable<? extends T> other;
    public ThenObserve(Observable<? extends T> other) {
        this.other = other;
    }

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        Subscriber<T> parent = new Subscriber<T>(child, false) {
            @Override
            public void onNext(T t) {
                child.onNext(t);
            }
            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }
            @Override
            public void onCompleted() {
                other.unsafeSubscribe(child);
            }
        };
        child.add(parent);
        return parent;
    }
}

Observable<Integer> source = Observable
    .range(1, 10)
    .lift(new ThenObserve<>(Observable.range(11, 90)));

source.subscribe(System.out::println);

System.out.println("---");

TestSubscriber<Integer> ts = new TestSubscriber<>();
ts.requestMore(20);

source.subscribe(ts);

ts.getOnNextEvents().forEach(System.out::println);

If you run the example, the first observation of the source prints, as expected, values from 1 to 100. However, in the second case, where we request only 20 elements, it prints values from 1 to 30! Clearly, the other range observable didn't just produce the remaining 10 elements but thought it could produce up to 20 elements.

Therefore, we need to track the request amounts from downstream and the production amount from upstream, and whenever the source has to change, make the new source continue with the remaining amount. To accomplish this, we need a Producer that manages the requests and source changes in a thread-safe manner.
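
The arithmetic for the failing case can be checked with a few lines of plain Java before any thread-safety enters the picture. The sketch below is deliberately single-threaded and not thread-safe; Tracker is a hypothetical name and a java.util.function.LongConsumer stands in for a Producer's request(long) method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

class RemainingRequestDemo {
    /** Bookkeeping only: no emitter-loop, no synchronization. */
    static final class Tracker {
        long requested;         // outstanding = requested minus produced
        LongConsumer current;   // stand-in for the current Producer

        void request(long n) {
            long u = requested + n;
            requested = u < 0 ? Long.MAX_VALUE : u;   // cap on overflow
            if (current != null) {
                current.accept(n);
            }
        }

        void produced(long n) {
            if (requested != Long.MAX_VALUE) {
                requested -= n;
            }
        }

        void set(LongConsumer next) {
            current = next;
            if (next != null && requested != 0) {
                next.accept(requested);   // hand over what is still owed
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Tracker t = new Tracker();
        t.request(20);                                    // downstream: 20
        t.set(n -> log.add("first.request(" + n + ")"));
        t.produced(10);                                   // range(1, 10)
        t.set(n -> log.add("second.request(" + n + ")"));
        return log;
    }

    public static void main(String[] args) {
        // prints [first.request(20), second.request(10)]
        System.out.println(run());
    }
}
```

The second source is asked for 10, the amount still owed to the downstream, instead of the original 20.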

Let's call it ProducerArbiter and have the following base class structure:


public final class ProducerArbiter 
implements Producer {
    long requested;                                   // (1)   
    Producer currentProducer;

    boolean emitting;                                 // (2)
    long missedRequested;
    long missedProduced;
    Producer missedProducer;
    
    static final Producer NULL_PRODUCER = n -> { };   // (3)
    
    @Override
    public void request(long n) {
        // to implement
    }
    
    public void produced(long n) {
        // to implement
    }
    
    public void set(Producer newProducer) {
        // to implement
    }

    public void emitLoop() {
        // to implement
    }
}


The ProducerArbiter looks quite normal, so far, with the following properties:

  1. We keep track of the current outstanding request amount and the current producer. This producer can be null during which we'll still keep aggregating the requested amounts and request them together once a producer arrives.
  2. We will use the emitter-loop serialization construct because we should allow concurrent request() calls and changing the producer concurrently. Note that this doesn't serialize with respect to the onNext events travelling from upstream to downstream, and a concurrent producer change may trigger a concurrent emission of onNext events from two sources. I'll handle this case in the next part of the series. Luckily, this won't be a problem with our ThenObserve operator's use case.
  3. We'd like to clear the current producer in some situations to avoid retention, but we'll use null in missedProducer to indicate there was no attempt to set a producer at that time.
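
For readers who skipped the earlier parts, the emitter-loop construct from point 2 can be shown in isolation. The following is a minimal sketch with hypothetical names (SerializedCounter, add()): one thread wins the right to work, everyone else only leaves a note in a 'missed' field, and the winner keeps looping until no notes are left.

```java
class EmitterLoopDemo {
    /** Serializes add() calls with the emitter-loop pattern. */
    static final class SerializedCounter {
        boolean emitting;   // guarded by this
        long missed;        // guarded by this
        long total;         // touched only by the emitting thread

        void add(long n) {
            synchronized (this) {
                if (emitting) {
                    missed += n;      // somebody else is working
                    return;
                }
                emitting = true;
            }
            total += n;               // fast path, outside the lock
            for (;;) {
                long m;
                synchronized (this) {
                    m = missed;
                    if (m == 0) {
                        emitting = false;   // nothing missed: unlock
                        return;
                    }
                    missed = 0;
                }
                total += m;           // process what the others left
            }
        }
    }

    static long run(int threads, int perThread) {
        SerializedCounter c = new SerializedCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    c.add(1);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return c.total;   // safe: join() orders this read after the writes
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000));   // prints 40000
    }
}
```

The total field needs no volatile here because every hand-over of the emitting role passes through a synchronized block on the same object.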

With the basic structure set up, let's start implementing the methods one by one:


    // ... same as before
    @Override
    public void request(long n) {
        if (n < 0) {                                   // (1)
            throw new IllegalArgumentException();
        }
        if (n == 0) {
            return;
        }
        synchronized (this) {
            if (emitting) {
                missedRequested += n;                  // (2)
                return;
            }
            emitting = true;
        }
        boolean skipFinal = false;
        try {
            long r = requested;
            long u = r + n;
            if (u < 0) {
                u = Long.MAX_VALUE;
            }
            requested = u;                             // (3)
            
            Producer p = currentProducer;
            if (p != null) {
                p.request(n);                          // (4)
            }
            
            emitLoop();                                // (5)
            skipFinal = true;
        } finally {
            if (!skipFinal) {                          // (6)
                synchronized (this) {
                    emitting = false;
                }
            }
        }
    }

This won't be the longest method I'm going to explain:

  1. We perform the usual request amount checks.
  2. We track the missed requests separately from the missed production because if combined, we couldn't distinguish between an allowed request 'overflow' (capped at Long.MAX_VALUE) and overproduction.
  3. We are performing the capped update to the requested amount, as usual.
  4. If there is a producer attached, we request the amount n (and not the requested amount!) because requests are cumulative on every producer.
  5. There could have been concurrent method calls so we enter the loop-phase of the emitter-loop approach.
  6. If emitLoop() returns normally, it has already cleared the emitting flag, so we skip the unlock in the finally block; otherwise, we 'unlock' the producer-arbiter to allow using it again, perhaps with a different producer (see Observable.retry()).
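
Points 3 and 4 are easy to get wrong, so here is a small sketch of both. The helper names addCap(), forwardDeltas() and forwardRunningTotal() are hypothetical; the last two simulate a producer that, like all well-behaved producers, adds up every request(n) it receives.

```java
class RequestCapDemo {
    /** Adds two non-negative amounts, capping at Long.MAX_VALUE. */
    static long addCap(long current, long n) {
        long u = current + n;
        return u < 0 ? Long.MAX_VALUE : u;   // overflow wraps negative
    }

    /**
     * What a cumulative producer believes it may emit if the arbiter
     * forwards each incoming n as-is (the correct behavior).
     */
    static long forwardDeltas(long... requests) {
        long producerTotal = 0;
        for (long n : requests) {
            producerTotal = addCap(producerTotal, n);
        }
        return producerTotal;
    }

    /**
     * What the same producer believes if the arbiter mistakenly
     * forwards its running 'requested' total on every call.
     */
    static long forwardRunningTotal(long... requests) {
        long requested = 0;
        long producerTotal = 0;
        for (long n : requests) {
            requested = addCap(requested, n);
            producerTotal = addCap(producerTotal, requested);
        }
        return producerTotal;
    }

    public static void main(String[] args) {
        System.out.println(addCap(Long.MAX_VALUE - 1, 5)
            == Long.MAX_VALUE);                         // prints true
        System.out.println(forwardDeltas(5, 3));        // prints 8
        System.out.println(forwardRunningTotal(5, 3));  // prints 13
    }
}
```

With requests of 5 and then 3, forwarding the running total makes the producer think it owes 5 + 8 = 13 values although the downstream asked for only 8.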

Production accounting looks as follows:

    // ... continued
    public void produced(long n) {
        if (n <= 0) {                                    // (1)
            throw new IllegalArgumentException();
        }
        synchronized (this) {
            if (emitting) {
                missedProduced += n;                     // (2)
                return;
            }
            emitting = true;
        }
        
        boolean skipFinal = false;
        try {
            long r = requested;
            long u = r - n;
            if (u < 0) {
                throw new IllegalStateException();       // (3)
            }
            requested = u;
        
            emitLoop();                                  // (4)
            skipFinal = true;
        } finally {
            if (!skipFinal) {
                synchronized (this) {
                    emitting = false;
                }
            }
        }
    }

Quite a similar structure:

  1. We treat production of 0 or negative as bugs.
  2. We track the missed production amount separately (see the explanation in request() above).
  3. We subtract the produced amount from the current requested amount and check for an underflow. Such underflow is either a bug or the lack of backpressure-support from the upstream.
  4. We now try to process any missed actions.


    // ... continued
    public void set(Producer newProducer) {
        synchronized (this) {
            if (emitting) {
                missedProducer = newProducer == null ? 
                    NULL_PRODUCER : newProducer;          // (1)
                return;
            }
            emitting = true;
        }
        boolean skipFinal = false;
        try {
            currentProducer = newProducer;
            if (newProducer != null) {
                newProducer.request(requested);           // (2)
            }
            
            emitLoop();                                   // (3)
            skipFinal = true;
        } finally {
            if (!skipFinal) {
                synchronized (this) {
                    emitting = false;
                }
            }
        }
    }

The structure and behavior should be quite familiar now:

  1. We indicate the current producer should be updated to a new producer or cleared. We overwrite any previous missed producer because it means they didn't really get the chance of emitting anything anyway.
  2. If we got a new, replacement producer, we request the entire requested amount. This is what accomplishes the requirement that new producers should receive the remaining requested amounts not produced by the previous Producer. Note that an asynchronous producer change running concurrently with an onNext event has the potential to over-request: the new producer receives n, but the previous producer may produce 1 more before it is 'stopped', so now we have an unwanted extra value coming down and the potential for a MissingBackpressureException. As I mentioned before, this situation can't happen in the current scenario and I'll have a separate and full post about how to resolve it.
  3. The rest is the usual: enter the loop to handle any missed actions and quit by 'unlocking' in normal or exception cases.

The final method remaining is the popular emitLoop() itself:

    // ... continued
    public void emitLoop() {
        for (;;) {
            long localRequested;
            long localProduced;
            Producer localProducer;
            synchronized (this) {
                localRequested = missedRequested;
                localProduced = missedProduced;
                localProducer = missedProducer;
                if (localRequested == 0L 
                        && localProduced == 0L
                        && localProducer == null) {       // (1)
                    emitting = false;
                    return;
                }
                missedRequested = 0L;
                missedProduced = 0L;
                missedProducer = null;                    // (2)
            }
            
            long r = requested;
            
            if (r != Long.MAX_VALUE) {                    // (3)
                long u = r + localRequested;
                if (u < 0 || u == Long.MAX_VALUE) {       // (4)
                    r = Long.MAX_VALUE;
                    requested = r;
                } else {
                    long v = u - localProduced;           // (5)
                    if (v < 0) {
                        throw new IllegalStateException();
                    }
                    r = v;
                    requested = v;
                }
            }
            if (localProducer != null) {                  // (6)
                if (localProducer == NULL_PRODUCER) {
                    currentProducer = null;
                } else {
                    currentProducer = localProducer;
                    localProducer.request(r);             // (7)
                }
            } else {
                Producer p = currentProducer;
                if (p != null && localRequested != 0L) {
                    p.request(localRequested);            // (8)
                }
            }
        }
    }
}

Lots of state and possibilities to cover with the emitLoop():

  1. The condition to leave the emitter-loop is to not have any missed requests, productions and producer changes at all.
  2. Once we discover we missed something, we clear each indicator field.
  3. If the current requested amount is Long.MAX_VALUE, there is no point in doing any request accounting because the downstream is in 'infinite' or 'unlimited' mode.
  4. But even if not, the missed request amount could just put us into this 'infinite' mode where no further request accounting is necessary.
  5. Otherwise, we need to subtract the missed production amount, update the requested amount locally and in the instance field.
  6. If there was a producer change, we update the currentProducer (or clear it).
  7. If the new producer isn't the null-producer, we update the currentProducer and request the entire amount of the currently known outstanding requests.
  8. Otherwise, if there weren't any new Producers and there were some missed requests, we only request that many from the current producer (if there is one).
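
The request accounting in points 3 to 5 boils down to a pure function, which makes the corner cases easy to check in isolation. The sketch below uses a hypothetical helper named merge() that takes the three values the loop works with and returns the new requested amount.

```java
class MissedMergeDemo {
    /**
     * Combines the current requested amount with the missed requests
     * and missed productions, following steps (3) to (5) of emitLoop().
     */
    static long merge(long requested, long missedRequested,
            long missedProduced) {
        if (requested == Long.MAX_VALUE) {
            return Long.MAX_VALUE;               // (3) unbounded already
        }
        long u = requested + missedRequested;
        if (u < 0 || u == Long.MAX_VALUE) {
            return Long.MAX_VALUE;               // (4) became unbounded
        }
        long v = u - missedProduced;             // (5) bounded accounting
        if (v < 0) {
            throw new IllegalStateException(
                "more produced than requested");
        }
        return v;
    }

    public static void main(String[] args) {
        System.out.println(merge(10, 5, 3));     // prints 12
        System.out.println(merge(Long.MAX_VALUE, 0, 7)
            == Long.MAX_VALUE);                  // prints true
    }
}
```

Adding the missed requests before subtracting the missed production matters: with 2 outstanding, 5 newly requested and 5 produced, the result is a valid 2, whereas subtracting first would report a false underflow.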

With the fully functional ProducerArbiter in hand, we can modify the original ThenObserve operator to do the right thing:

public final class ThenObserve<T> implements Operator<T, T> {
    final Observable<? extends T> other;
    public ThenObserve(Observable<? extends T> other) {
        this.other = other;
    }

    @Override
    public Subscriber<? super T> call(
            Subscriber<? super T> child) {
        ProducerArbiter pa = new ProducerArbiter();         // (1)
        
        Subscriber<T> parent = new Subscriber<T>() {
            @Override
            public void onNext(T t) {
                child.onNext(t);
                pa.produced(1);                             // (2)
            }
            @Override
            public void onError(Throwable e) {
                pa.set(null);                               // (3)
                child.onError(e);
            }
            @Override
            public void onCompleted() {
                pa.set(null);                               // (4)
                
                Subscriber<T> parent2 = create2(pa, child); // (5)
                child.add(parent2);
                
                other.unsafeSubscribe(parent2);             // (6)
            }
            @Override
            public void setProducer(Producer producer) {
                pa.set(producer);                           // (7)
            }
        };
        child.add(parent);
        child.setProducer(pa);
        return parent;
    }
    
    Subscriber<T> create2(ProducerArbiter pa, 
            Subscriber<? super T> child) {                  // (8)
        return new Subscriber<T>() {
            @Override
            public void onNext(T t) {
                child.onNext(t);
                pa.produced(1);
            }
            @Override
            public void onError(Throwable e) {
                pa.set(null);
                child.onError(e);
            }
            @Override
            public void onCompleted() {
                pa.set(null);
                child.onCompleted();
            }
            @Override
            public void setProducer(Producer producer) {
                pa.set(producer);
            }
        };
    }
}

This extension doesn't look too elegant, so what's going on?

  1. We create an instance of our new ProducerArbiter.
  2. We acknowledge the production of a single onNext value.
  3. We release the current producer on receipt of an error to avoid unnecessary object retention.
  4. Again, since the upstream (the first Observable) completed, we get rid of its producer.
  5. Subscribers shouldn't and often can't be reused so we need to create another subscriber for the second source. Besides, using this would end up in infinite resubscription due to the recursion (and stopped by a StackOverflowError).
  6. With the new subscriber, added to the child for unsubscription propagation, we subscribe to the other Observable and complete the first subscriber.
  7. We capture the potential producer from upstream and use it with the arbiter.
  8. For convenience, I've moved the creation of the second Subscriber into a helper method. Its structure is much the same as the first Subscriber's, except that in the onCompleted method, we complete the child instead of doing another round with another Observable.

Conclusion

With a producer structured like our new ProducerArbiter, we can change the producer over the head of a child subscriber and retain the correct amount of requested and thus produced values across producers of various upstream sources.

However, as I cautioned in the detailed explanation, this producer-arbiter runs independently of the onXXX event delivery, and if one is not in 'control' of the event emission, strange and unwanted behavior may arise. With our example, this wasn't the case because when we switched producers in the first Subscriber.onCompleted() method, we could be sure no more onNext or onError events would ever be fired from the current upstream and thus we could make the swap safely.

In fact, most of the very advanced operators, such as switchOnNext(), do experience such concurrent requesting and delivering, so to make them behave properly, we need a more advanced arbiter: a producer-observer-arbiter, the topic of the next post in the series.