Introduction
In this series, I put a spotlight on some common, and some less common yet sneaky, pitfalls of operator implementations. Now that we know more about producers, subscription containers and schedulers, let's see some more pitfalls.
#9: Subscribing twice
Some operators, especially those built upon OnSubscribe, may take their Subscribers and subscribe them to another Observable. An example of such an operator is defer().
Let's assume you want to create an operator which calls an action callback before it subscribes the incoming Subscriber to the real Observable:

    import rx.Observable;
    import rx.Observable.OnSubscribe;
    import rx.Subscriber;
    import rx.functions.Action0;
    import rx.observers.TestSubscriber;

    public final class OnSubscribeRunAction<T> implements OnSubscribe<T> {
        final Observable<T> actual;
        final Action0 action;

        public OnSubscribeRunAction(Observable<T> actual, Action0 action) {
            this.action = action;
            this.actual = actual;
        }

        @Override
        public void call(Subscriber<? super T> child) {
            try {
                action.call();
            } catch (Throwable e) {
                child.onError(e);
                return;
            }
            // The child is handed over as-is to the real Observable
            actual.unsafeSubscribe(child);
        }
    }

    // Usage
    Observable<Integer> source = Observable.create(
        new OnSubscribeRunAction<>(Observable.range(1, 3), () -> {
            System.out.println("Subscribing!");
        }));

    TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
        @Override
        public void onStart() {
            Thread t = new Thread(() -> {
                System.out.println("Starting helper thread "
                    + Thread.currentThread());
            });
            t.start();
        }
    };

    source.unsafeSubscribe(ts);

If we run the example code, we see that onStart is called twice! The problem lies in how the backpressure-related logic is designed in RxJava: whenever a child is subscribed, its onStart() method is called, which allows the client to get some code executed before the first onNext value. Usually, this is where the initial request amount is issued or perhaps where a GUI window associated with the subscriber is opened.
For regular end-subscribers, this rarely comes up because the subscribe() method wraps them into a SafeSubscriber, which doesn't forward its onStart method. However, when operators subscribe to one another, unsafeSubscribe is very common and onStart ends up being called multiple times.
The resolution is to wrap the child subscriber with another subscriber in the operator that doesn't forward the onStart method:

    // ...
    try {
        action.call();
    } catch (Throwable e) {
        child.onError(e);
        return;
    }
    // The wrapper inherits the default, empty onStart()
    actual.unsafeSubscribe(new Subscriber<T>(child) {
        @Override
        public void onNext(T t) {
            child.onNext(t);
        }
        @Override
        public void onError(Throwable e) {
            child.onError(e);
        }
        @Override
        public void onCompleted() {
            child.onCompleted();
        }
    });
    // ...

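The double call can also be reproduced without RxJava on the classpath. The following is only a minimal sketch: MiniSubscriber, MiniOnSubscribe and MiniObservable are hypothetical stand-ins that model nothing but the onStart()/unsafeSubscribe() interplay described above, and they are not the library's real classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stripped-down stand-ins for RxJava's Subscriber and Observable.
abstract class MiniSubscriber<T> {
    void onStart() { }              // empty by default, like in the real Subscriber
    abstract void onNext(T value);
}

interface MiniOnSubscribe<T> {
    void call(MiniSubscriber<T> child);
}

final class MiniObservable<T> {
    final MiniOnSubscribe<T> onSubscribe;

    MiniObservable(MiniOnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Mirrors the behavior described in the text: start the subscriber, then hand it over.
    void unsafeSubscribe(MiniSubscriber<T> s) {
        s.onStart();
        onSubscribe.call(s);
    }
}

final class OnStartDemo {
    /** Returns how many times onStart() ran on the end subscriber. */
    static int countStarts(boolean wrapChild) {
        List<String> events = new ArrayList<>();
        MiniObservable<Integer> actual = new MiniObservable<>(s -> s.onNext(1));

        MiniObservable<Integer> source = new MiniObservable<>(child -> {
            if (wrapChild) {
                // The wrapper keeps the empty onStart(), so the second call stops here.
                actual.unsafeSubscribe(new MiniSubscriber<Integer>() {
                    @Override void onNext(Integer v) { child.onNext(v); }
                });
            } else {
                // The child itself is started a second time.
                actual.unsafeSubscribe(child);
            }
        });

        source.unsafeSubscribe(new MiniSubscriber<Integer>() {
            @Override void onStart() { events.add("start"); }
            @Override void onNext(Integer v) { events.add("next " + v); }
        });
        return (int) events.stream().filter("start"::equals).count();
    }

    public static void main(String[] args) {
        System.out.println("direct:  " + countStarts(false)); // prints "direct:  2"
        System.out.println("wrapped: " + countStarts(true));  // prints "wrapped: 1"
    }
}
```

In this model the wrapper works for the same reason as in the operator: it never overrides onStart(), so the inner unsafeSubscribe call has nothing to forward.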
#10: Leaking scheduler workers
Let's say that instead of performing some arbitrary action immediately on subscription, one would like to delay its execution by some time. If the sequence doesn't complete in time, one can perform some mitigating action (e.g., showing a work-in-progress dialog).

    import java.util.concurrent.TimeUnit;

    import rx.Observable;
    import rx.Observable.OnSubscribe;
    import rx.Scheduler;
    import rx.Scheduler.Worker;
    import rx.Subscriber;
    import rx.Subscription;
    import rx.functions.Action0;
    import rx.observers.SerializedSubscriber;
    import rx.schedulers.Schedulers;

    public final class OnSubscribeRunActionDelayed<T> implements OnSubscribe<T> {
        final Observable<T> actual;
        final Action0 action;
        final long delay;
        final TimeUnit unit;
        final Scheduler scheduler;

        public OnSubscribeRunActionDelayed(Observable<T> actual,
                Action0 action, long delay,
                TimeUnit unit, Scheduler scheduler) {
            this.action = action;
            this.actual = actual;
            this.delay = delay;
            this.unit = unit;
            this.scheduler = scheduler;
        }

        @Override
        public void call(Subscriber<? super T> child) {
            SerializedSubscriber<T> s = new SerializedSubscriber<>(child);

            // The worker is created here but never unsubscribed
            Worker w = scheduler.createWorker();

            Subscription cancel = w.schedule(() -> {
                try {
                    action.call();
                } catch (Throwable e) {
                    s.onError(e);
                }
            }, delay, unit);

            actual
            .doOnCompleted(cancel::unsubscribe)
            .unsafeSubscribe(s);
        }
    }

    // Usage
    Observable<Integer> source = Observable.create(
        new OnSubscribeRunActionDelayed<>(Observable
            .just(1).delay(1, TimeUnit.SECONDS),
        () -> {
            System.out.println("Sorry, it takes too long...");
        }, 500, TimeUnit.MILLISECONDS, Schedulers.io()));

    Subscription s = source.subscribe(System.out::println);

    Thread.sleep(250);

    s.unsubscribe();

    Thread.sleep(1000);

    source.subscribe(System.out::println);

    Thread.sleep(1500);

    // Dump the threads of the io() scheduler
    for (Thread t : Thread.getAllStackTraces().keySet()) {
        if (t.getName().startsWith("RxCached")) {
            System.out.println(t);
        }
    }

Again, running the example gives unwanted results: the excuse message is printed even though the first subscription was cancelled, and when we dump the threads at the end, we see two RxCachedThreadScheduler threads although only one should be there thanks to worker reuse.
The problem is that the worker and the schedule token do not participate in unsubscription properly: even if the actual Observable is fast, only the scheduled work is unsubscribed but the worker is not, and thus it is never returned to the cache pool.
The bug is sneaky because Schedulers.computation() and Schedulers.trampoline() are not sensitive to worker leaks: the former arbitrates between a fixed set of actual workers and the latter doesn't retain any threading resources and can be cleanly garbage collected. Schedulers.io(), Schedulers.from() and Schedulers.newThread(), on the other hand, hold onto a thread which can't be reused or shut down unless the worker is unsubscribed.
The solution is to add the worker and the cancel token to the child subscriber as resources so they are unsubscribed if the child unsubscribes. However, since there is only a single scheduled task, and unsubscribing the worker unsubscribes all of its pending and running tasks, there is no need to add the individual cancel token to the child subscriber; adding the worker is enough:

    // ...
    SerializedSubscriber<T> s = new SerializedSubscriber<>(child);

    Worker w = scheduler.createWorker();
    // The worker is now released when the child unsubscribes
    child.add(w);

    w.schedule(() -> {
        try {
            action.call();
        } catch (Throwable e) {
            s.onError(e);
        }
    }, delay, unit);

    actual
    .doOnCompleted(w::unsubscribe)
    .unsafeSubscribe(s);
    // ...

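To see why an unreleased worker shows up as an extra thread, here is a small plain-Java model. It is a sketch under stated assumptions and not RxJava's actual implementation: CachingPool and PooledWorker are hypothetical stand-ins for a caching scheduler such as Schedulers.io() and its Worker, and the list of Runnables stands in for child.add(...).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical worker: it returns to the idle pool only when unsubscribed.
final class PooledWorker {
    final CachingPool pool;
    boolean released;

    PooledWorker(CachingPool pool) {
        this.pool = pool;
    }

    void unsubscribe() {
        if (!released) {
            released = true;
            pool.idle.push(this);
        }
    }
}

// Hypothetical caching scheduler: reuses idle workers, creates new ones otherwise.
final class CachingPool {
    final Deque<PooledWorker> idle = new ArrayDeque<>();
    int created;

    PooledWorker createWorker() {
        PooledWorker w = idle.poll();
        if (w == null) {
            created++;              // in the real scheduler, this would be a new thread
            w = new PooledWorker(this);
        }
        w.released = false;
        return w;
    }
}

final class WorkerLeakDemo {
    /** Subscribes and unsubscribes twice; returns how many workers had to be created. */
    static int workersCreated(boolean addWorkerToChild) {
        CachingPool pool = new CachingPool();
        for (int i = 0; i < 2; i++) {
            List<Runnable> childResources = new ArrayList<>(); // stands in for child.add(...)
            PooledWorker w = pool.createWorker();
            if (addWorkerToChild) {
                childResources.add(w::unsubscribe);
            }
            // The child unsubscribes: everything it tracks is released.
            childResources.forEach(Runnable::run);
        }
        return pool.created;
    }

    public static void main(String[] args) {
        System.out.println("leaky: " + workersCreated(false)); // prints "leaky: 2"
        System.out.println("fixed: " + workersCreated(true));  // prints "fixed: 1"
    }
}
```

The model is single-threaded on purpose; it only illustrates the bookkeeping, not the concurrency of a real scheduler.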
#11: Adding the worker to the subscriber
Let's assume we need an operator that, when it receives a value, emits an Observable which will emit that single value after some delay.

    import java.util.concurrent.TimeUnit;

    import rx.Observable;
    import rx.Observable.Operator;
    import rx.Scheduler;
    import rx.Scheduler.Worker;
    import rx.Subscriber;
    import rx.internal.operators.BufferUntilSubscriber;
    import rx.schedulers.Schedulers;

    public final class ValueDelayer<T> implements Operator<Observable<T>, T> {
        final Scheduler scheduler;
        final long delay;
        final TimeUnit unit;

        public ValueDelayer(long delay, TimeUnit unit, Scheduler scheduler) {
            this.delay = delay;
            this.unit = unit;
            this.scheduler = scheduler;
        }

        @Override
        public Subscriber<? super T> call(
                Subscriber<? super Observable<T>> child) {
            Worker w = scheduler.createWorker();
            // The worker is unsubscribed together with the child
            child.add(w);

            Subscriber<T> parent = new Subscriber<T>(child, false) {
                @Override
                public void onNext(T t) {
                    BufferUntilSubscriber<T> bus =
                        BufferUntilSubscriber.create();

                    w.schedule(() -> {
                        bus.onNext(t);
                        bus.onCompleted();
                    }, delay, unit);

                    child.onNext(bus);
                }
                @Override
                public void onError(Throwable e) {
                    child.onError(e);
                }
                @Override
                public void onCompleted() {
                    child.onCompleted();
                }
            };

            child.add(parent);

            return parent;
        }
    }

    // Usage
    Observable.range(1, 3)
        .lift(new ValueDelayer<>(1, TimeUnit.SECONDS,
            Schedulers.computation()))
        .take(1)
        .doOnNext(v -> v.subscribe(System.out::println))
        .subscribe();

    Thread.sleep(1500);

Strangely, the example prints nothing, although we expect it to print 1 after a second. The problem is that take(1) unsubscribes the upstream after receiving the first 'window'; because the worker was added to the child, it is unsubscribed as well, which cancels the scheduled emission of the value itself.
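The effect can be reproduced without RxJava. The sketch below makes two assumptions that are not part of the original example: a ScheduledExecutorService stands in for the Worker, and shutdownNow() stands in for unsubscribing it, since both discard tasks that are still waiting for their delay to elapse. EagerCancelDemo is a hypothetical name.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class EagerCancelDemo {
    /** Schedules a delayed task and reports whether it got the chance to run. */
    static boolean delayedTaskRan(boolean cancelWorkerEarly) {
        ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean ran = new AtomicBoolean();

        // Plays the role of the delayed emission into the inner Observable.
        worker.schedule(() -> ran.set(true), 100, TimeUnit.MILLISECONDS);

        if (cancelWorkerEarly) {
            // Like unsubscribing the worker right after the first window:
            // the pending task is dropped before its delay elapses.
            worker.shutdownNow();
        } else {
            // Orderly shutdown: by default, already scheduled delayed tasks still run.
            worker.shutdown();
        }
        try {
            worker.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("cancelled early: " + delayedTaskRan(true));
        System.out.println("released late:   " + delayedTaskRan(false));
    }
}
```

The second branch hints at what a fix has to achieve: the worker must eventually be released, but only after the pending emission has had its turn.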
Resolving the problem can take many shapes and actually depends on the broader context. Clearly, we need to unsubscribe the worker yet allow the consumption of the inner observable sequence.
One way is to keep an atomic counter to count the number of unobserved inner Observables and unsubscribe the worker if it reaches zero. In addition, this solution requires the inner Observables to be always consumed.

    // ...
    Worker w = scheduler.createWorker();

    final AtomicBoolean once = new AtomicBoolean();
    final AtomicInteger wip = new AtomicInteger(1);           // (1)

    Subscriber<T> parent = new Subscriber<T>(child, false) {
        @Override
        public void onNext(T t) {
            if (wip.getAndIncrement() == 0) {                 // (2)
                wip.set(0);
                return;
            }

            BufferUntilSubscriber<T> bus =
                BufferUntilSubscriber.create();

            w.schedule(() -> {
                try {
                    bus.onNext(t);
                    bus.onCompleted();
                } finally {
                    release();                                // (3)
                }
            }, delay, unit);

            child.onNext(bus);
            if (child.isUnsubscribed()) {
                if (once.compareAndSet(false, true)) {        // (4)
                    release();
                }
            }
        }
        @Override
        public void onError(Throwable e) {
            child.onError(e);
        }
        @Override
        public void onCompleted() {
            if (once.compareAndSet(false, true)) {
                release();                                    // (5)
            }
            child.onCompleted();
        }
        void release() {
            if (wip.decrementAndGet() == 0) {
                w.unsubscribe();
            }
        }
    };

    parent.add(Subscriptions.create(() -> {                   // (6)
        if (once.compareAndSet(false, true)) {
            if (wip.decrementAndGet() == 0) {
                w.unsubscribe();
            }
        }
    }));

    child.add(parent);

    return parent;
    }
    // ...

The solution involves several notable changes:
- (1) We need an atomic integer and an atomic boolean. The former counts the unconsumed inner Observables as well as the active main upstream source of Ts. The upstream can end at many locations and perhaps 'multiple times' (for example, the sequence is terminated just after an onNext, yet the upstream still sends an onCompleted, which then triggers an unsubscription coming from downstream). Since the upstream should count as 1, we use once to make sure its single decrement happens only once.
- (2) We increment the number of 'open windows' by one before even attempting to schedule the task. However, since (6) can asynchronously decrement the wip value to zero, an increment from 0 to 1 indicates that an onNext slipped through after the worker was already unsubscribed. Without this check, the child would receive an Observable that can never emit, so we reset the counter and drop the value.
- (3) Once an individual inner Observable has emitted its value, we release one 'window'.
- (4) We eagerly check if the child has just unsubscribed after the emission of the inner 'window'. If so, we try to release the upstream once.
- (5) If the downstream didn't unsubscribe, onCompleted has to try and release the upstream once too.
- (6) Since unsubscription can happen at any time, even between event emissions, the subscription registered on the parent also has to try and release the upstream once.
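The counting protocol from the list can be isolated from the operator and exercised on its own. The following is a sketch only: WindowRefCount and its method names are hypothetical, and the Runnable passed to it stands in for w.unsubscribe().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper that mirrors the wip/once bookkeeping of the operator.
final class WindowRefCount {
    final AtomicInteger wip = new AtomicInteger(1);   // 1 accounts for the upstream itself
    final AtomicBoolean once = new AtomicBoolean();
    final Runnable onZero;                            // stands in for w.unsubscribe()

    WindowRefCount(Runnable onZero) {
        this.onZero = onZero;
    }

    /** Called per onNext; returns false if the worker is already gone. */
    boolean openWindow() {
        if (wip.getAndIncrement() == 0) {
            wip.set(0);
            return false;
        }
        return true;
    }

    /** Called when the delayed task of a window has run. */
    void closeWindow() {
        release();
    }

    /** Called from onCompleted, the eager check and the unsubscription callback alike. */
    void upstreamFinished() {
        if (once.compareAndSet(false, true)) {
            release();
        }
    }

    private void release() {
        if (wip.decrementAndGet() == 0) {
            onZero.run();
        }
    }
}

final class WindowRefCountDemo {
    /** Replays the take(1) scenario and records the number of worker disposals. */
    static String takeOneScenario() {
        AtomicInteger disposals = new AtomicInteger();
        WindowRefCount rc = new WindowRefCount(disposals::incrementAndGet);
        StringBuilder log = new StringBuilder();

        rc.openWindow();          // onNext: one inner Observable handed out
        rc.upstreamFinished();    // downstream unsubscribes after the first window
        rc.upstreamFinished();    // a late onCompleted must not release again
        log.append(disposals.get()).append(',');

        rc.closeWindow();         // the delayed task has emitted its value
        log.append(disposals.get()).append(',');

        log.append(rc.openWindow()); // a value slipping through is rejected
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(takeOneScenario()); // prints "0,1,false"
    }
}
```

In the scenario, the worker stays alive while a window is still pending and is disposed exactly once after the last release, which is the behavior the take(1) example needs.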
Conclusion
One can argue that these pitfalls are corner cases, but an operator developer, especially one planning to submit a PR, has to look out for such problems.
In this post, we've looked at how one can accidentally start one's subscribers multiple times, and seen two opposite cases where adding or not adding a worker to the child subscriber can cause problems.