Wednesday, November 29, 2017

When multiple subscribeOn()s do have an effect

Introduction


Many tutorials and explanations state that having multiple subscribeOn()s has no effect and that only the one closest to the source wins. I often phrase this as "no practical effect". However, it is possible to construct chains in which multiple subscribeOn()s each have an observable effect.

What is subscribeOn again?


The most precise definition of this operator I can formulate is as follows:


  • subscribeOn changes where (on what thread) the (side) effects of calling subscribe() on the parent/upstream Observable (Flowable, Single, etc.) happen.


So what do these subscription (side) effects look like in code?


Observable.create(emitter -> {
    for (int i = 0; i < 10; i++) {
        emitter.onNext(i + ": " + Thread.currentThread().getName());
    }
    emitter.onComplete();
})
.subscribeOn(Schedulers.io())
.blockingSubscribe(System.out::println);

// Prints:
// -------
// 0: RxCachedThreadScheduler-1
// 1: RxCachedThreadScheduler-1
// 2: RxCachedThreadScheduler-1
// 3: RxCachedThreadScheduler-1
// 4: RxCachedThreadScheduler-1
// 5: RxCachedThreadScheduler-1
// 6: RxCachedThreadScheduler-1
// 7: RxCachedThreadScheduler-1
// 8: RxCachedThreadScheduler-1
// 9: RxCachedThreadScheduler-1


In this example, the effect of subscribing is that the body of the ObservableOnSubscribe starts running on the thread provided via the io() Scheduler.

Applying yet another subscribeOn after the first one won't change what is printed to the console.
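To make that claim concrete, here is a minimal sketch with two subscribeOn()s stacked directly on the source. It assumes RxJava 2.x on the classpath (hence the io.reactivex imports; RxJava 3 uses io.reactivex.rxjava3 instead), and the class and method names are made up for this illustration. The source body should still report an io thread, that is, a name starting with RxCachedThreadScheduler, because the computation thread only performs the subscribe() call on the inner subscribeOn(), which immediately hops to io.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

// Hypothetical demo class; assumes RxJava 2.x imports.
class DoubleSubscribeOn {

    /** Returns the name of the thread on which the source body ran. */
    static String emittingThread() {
        return Observable.<String>create(emitter -> {
                // Subscription side-effect of the source: it emits right away.
                emitter.onNext(Thread.currentThread().getName());
                emitter.onComplete();
            })
            .subscribeOn(Schedulers.io())          // closest to the source: decides where the body runs
            .subscribeOn(Schedulers.computation()) // only decides where the subscribeOn() above is subscribed to
            .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(emittingThread());
    }
}
```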

Most source-like operators, such as create(), fromCallable(), fromIterable(), do have subscription side-effects as they often start emitting event(s) immediately.

Most instance operators, such as map(), filter(), take(), don't have subscription side-effects on their own and just subscribe() to their upstream.

Instance operators with subscription side-effects


However, there are a couple of instance operators that do have subscription side-effects. Specifically, these are the operators that let you specify a per-subscriber initial state via some callback.


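Observable.<String>create(emitter -> {
    for (int i = 0; i < 10; i++) {
        emitter.onNext(i + ": " + Thread.currentThread().getName());
    }
    emitter.onComplete();
})
.subscribeOn(Schedulers.io())
.collect(() -> {
        // per-subscriber initial state, created when collect() is subscribed to
        List<String> list = new ArrayList<>();
        list.add(Thread.currentThread().getName());
        return list;
    },
    (a, b) -> a.add(b)
)
// collect() returns a Single; toObservable() keeps this compiling on RxJava 2.x,
// where Single has no blockingSubscribe()
.toObservable()
.blockingSubscribe(list -> list.forEach(System.out::println));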


This will print the following:


main
0: RxCachedThreadScheduler-1
1: RxCachedThreadScheduler-1
2: RxCachedThreadScheduler-1
3: RxCachedThreadScheduler-1
4: RxCachedThreadScheduler-1
5: RxCachedThreadScheduler-1
6: RxCachedThreadScheduler-1
7: RxCachedThreadScheduler-1
8: RxCachedThreadScheduler-1
9: RxCachedThreadScheduler-1


The first line, main, appears because blockingSubscribe() subscribes on the main thread, and the collect() operator performs its initialization side-effect on that same thread.

Now let's see what happens if we add subscribeOn() after collect:


Observable.<String>create(emitter -> {
    for (int i = 0; i < 10; i++) {
        emitter.onNext(i + ": " + Thread.currentThread().getName());
    }
    emitter.onComplete();
})
.subscribeOn(Schedulers.io())
.collect(() -> {
        // now runs on the thread chosen by the subscribeOn() below
        List<String> list = new ArrayList<>();
        list.add(Thread.currentThread().getName());
        return list;
    },
    (a, b) -> a.add(b)
)
.subscribeOn(Schedulers.computation())  // <---------------------------------------
// toObservable() keeps this compiling on RxJava 2.x, where Single has no blockingSubscribe()
.toObservable()
.blockingSubscribe(list -> list.forEach(System.out::println));

This will print the following:

RxComputationThreadPool-1
0: RxCachedThreadScheduler-1
1: RxCachedThreadScheduler-1
2: RxCachedThreadScheduler-1
3: RxCachedThreadScheduler-1
4: RxCachedThreadScheduler-1
5: RxCachedThreadScheduler-1
6: RxCachedThreadScheduler-1
7: RxCachedThreadScheduler-1
8: RxCachedThreadScheduler-1
9: RxCachedThreadScheduler-1


Because subscribeOn(Schedulers.computation()) performs its subscribe() call on one of the computation threads, the subscription side-effect of creating the list (for collecting Strings from upstream) happens on that thread. Then, subscribeOn(Schedulers.io()) switches threads again, this time to an io thread, where the source's subscription side-effect of emitting the Strings begins.


Conclusion


Having multiple subscribeOn()s in the same chain often has no practical effect because each one just changes the thread on which its upstream is subscribed to, and most instance operators don't perform any side-effects in their subscribeActual() implementation. Source operators often do have side-effects, but since they sit at the top of the chain and subscribe() travels upstream, only the subscribeOn() closest to the source determines where the code in the source's subscribeActual() is executed.

However, when an intermediate operator has subscription side-effects, such as executing a user-provided callback to perform some per-subscriber initialization, the thread change made by a downstream subscribeOn() does affect where this initialization runs. When is this property relevant? For example, when establishing that initial state is computation-heavy or involves blocking calls.

This may seem an odd property, but it can be derived from first principles such as how, where and when control- and item-flow happen in the various operators involved.
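As a rough illustration of that derivation, here is a toy model in plain Java. It is not RxJava's actual implementation: Source, subscribeOn, withInit and the toy-io / toy-computation executors are all invented for this sketch, and there is no disposal, error handling or completion signal. It only models the one rule that matters here: subscribe() travels upstream, and subscribeOn merely picks the thread that makes the next upstream subscribe() call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Toy model only; every name here is hypothetical and not part of RxJava.
class SubscribeOnModel {

    /** A toy reactive type: the only thing it supports is being subscribed to. */
    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    /** Toy subscribeOn: performs the upstream subscribe() call on the given executor. */
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return downstream -> executor.execute(() -> upstream.subscribe(downstream));
    }

    /** Toy intermediate operator with a subscription side-effect (like collect()'s initial state). */
    static <T> Source<T> withInit(Source<T> upstream, List<String> log) {
        return downstream -> {
            // Runs on whatever thread called subscribe() on this operator.
            log.add("init on " + Thread.currentThread().getName());
            upstream.subscribe(downstream);
        };
    }

    /** Builds source -> subscribeOn(io) -> withInit -> subscribeOn(computation) and records the threads. */
    static List<String> run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "toy-io"));
        ExecutorService computation =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "toy-computation"));
        try {
            List<String> log = Collections.synchronizedList(new ArrayList<>());
            CompletableFuture<Void> done = new CompletableFuture<>();

            // Toy source: emits a single item as its subscription side-effect.
            Source<String> source = downstream ->
                    downstream.accept("emit on " + Thread.currentThread().getName());

            Source<String> chain =
                    subscribeOn(withInit(subscribeOn(source, io), log), computation);

            chain.subscribe(item -> {
                log.add(item);
                done.complete(null);
            });
            done.join(); // wait for the single emitted item
            return new ArrayList<>(log);
        } finally {
            io.shutdown();
            computation.shutdown();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
        // Prints:
        // init on toy-computation
        // emit on toy-io
    }
}
```

Removing the outer subscribeOn(..., computation) from the chain would make the init line report the calling thread instead, mirroring the main line in the collect() example above.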