Thursday, March 31, 2016

SubscribeOn and ObserveOn

Introduction


One of the most frequently confused operator pairs in the reactive ecosystem is subscribeOn and observeOn. The confusion may be rooted in a few causes:


  • they sound alike,
  • they sometimes show similar behavior when looked at from downstream and
  • they are duals in some sense.

It appears the name-confusion isn't local to RxJava. Project Reactor faces a similar issue with their publishOn and dispatchOn operators. Apparently, it doesn't matter what they are called and people will confuse them anyhow.

When I started learning about Rx.NET back in 2010, I never experienced this confusion: subscribeOn affects subscribe() and observeOn affects onXXX().

(Remark: I've searched Channel 9 for the early videos but couldn't really find the talk where they build up these operators just like I'm about to do. The closest thing was this.)

My "thesis" is that the confusion may be resolved by walking through how one can implement these operators and thus showing the internal method-call flow.


SubscribeOn


The purpose of subscribeOn() is to make sure side-effects from calling subscribe() happen on some other thread. However, almost no standard RxJava source does side-effects on its own; you can have side-effects with custom Observables, with subscription actions wrapped via create() or, as of late, with the SyncOnSubscribe and fromCallable() APIs.

Why would one move the side-effects? The main use cases are network calls, database access or anything else that involves a blocking wait and would otherwise run on the current thread. Holding off a Tomcat worker thread hasn't been much of a programming problem (that doesn't mean we can't improve the stack with reactive) but holding off the Event Dispatch Thread in a Swing application or the Main thread in an Android application has an adverse effect on the user experience.

(Sidenote: it's a funny thing that blocking the EDT is basically a convenience backpressure strategy in the GUI world to prevent the user from changing the application state while some activity was happening.)

Therefore, if the source does something immediately when a child subscribes, we'd want it to happen somewhere off the precious current thread. Naturally, we could just submit the whole sequence and the call to subscribe() to an ExecutorService, but then we'd be faced with the problem of cancellation being separate from the Subscriber. The more (complex) sequences require this asynchronous subscription behavior, the more inconvenient it becomes to manage them all in this manner.

Luckily, we can include this behavior into an operator we call subscribeOn().

For simplicity, let's build this operator on a much simpler reactive base type: the original IObservable from Rx.NET:

@FunctionalInterface
interface IObservable<T> {
    IDisposable subscribe(IObserver<T> observer);
}

@FunctionalInterface
interface IDisposable {
    void dispose();
}

interface IObserver<T> {
    void onNext(T t);
    void onError(Throwable e);
    void onCompleted();
}

Don't worry about synchronous cancellation and backpressure for now.

Let's assume we have a source which just sleeps for 1 second:


IObservable<Object> sleeper = o -> {
    try {
        Thread.sleep(1000);
        o.onCompleted();
    } catch (InterruptedException ex) {
        o.onError(ex);
    }
    // nothing left to cancel: the work is over by the time subscribe() returns
    return () -> { };
};


which will obviously go to sleep if we call sleeper.subscribe(new IObserver ... ). Let's now create an operator that moves this sleep to some other thread:


ExecutorService exec = Executors.newSingleThreadExecutor();

IObservable<Object> subscribeOn = o -> {
    Future<?> f = exec.submit(() -> sleeper.subscribe(o));
    return () -> f.cancel(true);
};


The subscribeOn instance submits the action that subscribes to the actual IObservable to the executor and returns a disposable that cancels the resulting Future from the submission.
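
To see the effect, here is a minimal, self-contained sketch of the operator above. It assumes plain java.util.concurrent; the class name SubscribeOnDemo and the thread name "worker" are made up for illustration, and instead of sleeping, the source records the thread on which its subscribe() body runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

class SubscribeOnDemo {
    interface IDisposable { void dispose(); }
    interface IObserver<T> {
        void onNext(T t);
        void onError(Throwable e);
        void onCompleted();
    }
    interface IObservable<T> { IDisposable subscribe(IObserver<T> observer); }

    /** Returns the name of the thread that ran the source's subscribe() body. */
    static String run() {
        // Hypothetical thread name, chosen only to make the result easy to read.
        ExecutorService exec = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        AtomicReference<String> subscribeThread = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // The side-effect of this source is recording where subscribe() runs.
        IObservable<Object> source = o -> {
            subscribeThread.set(Thread.currentThread().getName());
            o.onCompleted();
            return () -> { };
        };

        // Same shape as the operator in the text.
        IObservable<Object> subscribeOn = o -> {
            Future<?> f = exec.submit(() -> source.subscribe(o));
            return () -> f.cancel(true);
        };

        subscribeOn.subscribe(new IObserver<Object>() {
            @Override public void onNext(Object t) { }
            @Override public void onError(Throwable e) { done.countDown(); }
            @Override public void onCompleted() { done.countDown(); }
        });

        try {
            done.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            exec.shutdown();
        }
        return subscribeThread.get();
    }

    public static void main(String[] args) {
        System.out.println("subscribe() body ran on: " + run());
    }
}
```

The caller of subscribeOn.subscribe() returns immediately; only the executor's thread ever executes the source's subscription logic.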

Of course, one would instead have this in some static method or on a wrapper around the IObservable (as Java doesn't support extension methods):




public static <T> IObservable<T> subscribeOn(IObservable<T> source, 
    ExecutorService executor);

public Observable<T> subscribeOn(Scheduler scheduler);

Two of the common questions regarding subscribeOn are what happens when one applies it twice (directly or with some regular operators in between) and why one can't change the original thread with a second subscribeOn. I hope the answer becomes apparent from the simplified structure above. Let's apply the operator a second time:

ExecutorService exec2 = Executors.newSingleThreadExecutor();

IObservable<Object> subscribeOn2 = o -> {
    Future<?> f2 = exec2.submit(() -> subscribeOn.subscribe(o));
    return () -> f2.cancel(true);
};

Now let's expand subscribeOn.subscribe() in place:


IObservable<Object> subscribeOn2 = o -> {
    Future<?> f2 = exec2.submit(() -> {
       Future<?> f = exec.submit(() -> {
          sleeper.subscribe(o);
       });
    });
    return () -> f2.cancel(true);
};

We can simply read this from top to bottom. When o arrives, a task is scheduled on exec2; when that task executes, it schedules another task on exec which, when it executes, subscribes to sleeper with the original o. Because subscribeOn2 was applied last, it gets executed first, and no matter where it runs its task, the subscription gets rescheduled by subscribeOn onto its own thread anyway. Therefore, the subscribeOn() closest to the source determines the thread and one can't use another subscribeOn() application to change this. This is why APIs built on top of Rx should either not pre-apply subscribeOn() when they return an Observable or give the option to specify a scheduler.
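
The following sketch replays this expansion with two named executors so the hops can be observed. It is only an illustration under assumptions: the class name DoubleSubscribeOnDemo, the thread names "exec-1" and "exec-2" and the trace strings are made up, and the source records its thread instead of sleeping.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class DoubleSubscribeOnDemo {
    interface IDisposable { void dispose(); }
    interface IObserver<T> {
        void onNext(T t);
        void onError(Throwable e);
        void onCompleted();
    }
    interface IObservable<T> { IDisposable subscribe(IObserver<T> observer); }

    static List<String> run() {
        ExecutorService exec = Executors.newSingleThreadExecutor(r -> new Thread(r, "exec-1"));
        ExecutorService exec2 = Executors.newSingleThreadExecutor(r -> new Thread(r, "exec-2"));
        List<String> trace = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        IObservable<Object> source = o -> {
            trace.add("source subscribed on " + Thread.currentThread().getName());
            o.onCompleted();
            return () -> { };
        };
        // Applied first, so it sits closest to the source.
        IObservable<Object> subscribeOn = o -> {
            trace.add("inner subscribeOn called on " + Thread.currentThread().getName());
            Future<?> f = exec.submit(() -> source.subscribe(o));
            return () -> f.cancel(true);
        };
        // Applied last, so it runs first when somebody subscribes.
        IObservable<Object> subscribeOn2 = o -> {
            Future<?> f2 = exec2.submit(() -> subscribeOn.subscribe(o));
            return () -> f2.cancel(true);
        };

        subscribeOn2.subscribe(new IObserver<Object>() {
            @Override public void onNext(Object t) { }
            @Override public void onError(Throwable e) { done.countDown(); }
            @Override public void onCompleted() { done.countDown(); }
        });

        try {
            done.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            exec.shutdown();
            exec2.shutdown();
        }
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The trace shows the outer operator's thread (exec-2) only as a stopover: the source is always subscribed on exec-1.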

Unfortunately, the subscribeOn operator above doesn't handle unsubscription properly: the result of the sleeper.subscribe() is not wired up to that external IDisposable instance and thus won't dispose the "real" subscription. Of course, this can be resolved by having a composite IDisposable and adding all relevant resources to it. In RxJava 1, however, we don't need this kind of juggling and the operator can be written with less work:


Observable.create(subscriber -> {
    Worker worker = scheduler.createWorker();
    subscriber.add(worker);
    worker.schedule(
        () -> source.unsafeSubscribe(Subscribers.wrap(subscriber))
    );
});

This makes sure the unsubscribe() call on the subscriber will affect the schedule() and whatever resources the upstream source would use. We can use unsafeSubscribe() to avoid the unnecessary wrapping into a SafeSubscriber but we have to wrap the subscriber anyway because both subscribe() and unsafeSubscribe() call onStart() on the incoming Subscriber, which has already been called by the outer Observable. This avoids repeating any effects inside the user's Subscriber.onStart() method.

The structure above composes backpressure as well, but we are not done.

Before RxJava got backpressure, the subscribeOn() implementation above made sure that an otherwise synchronous source would emit all of its events on the same thread:


Observable.create(s -> {
    for (int i = 0; i < 1000; i++) {
        if (s.isUnsubscribed()) return;
       
        s.onNext(i);
    }

    if (s.isUnsubscribed()) return;

    s.onCompleted();
});

Users started to implicitly rely on this property. Backpressure breaks this property because usually the thread that calls request() will end up running the fragment of the loop above (see range()), causing potential thread-hopping. Therefore, to keep the same property, calls to request() have to go to the very same Worker that did the original subscription.

The actual operator thus is more involved:


subscriber -> {
    Worker worker = scheduler.createWorker();
    subscriber.add(worker);
    
    worker.schedule(() -> {
       Subscriber<T> s = new Subscriber<T>(subscriber) {
           @Override
           public void onNext(T v) {
               subscriber.onNext(v);
           }

           @Override
           public void onError(Throwable e) {
               subscriber.onError(e);
           }

           @Override
           public void onCompleted() {
               subscriber.onCompleted();
           }

           @Override
           public void setProducer(Producer p) {
               subscriber.setProducer(n -> {
                   worker.schedule(() -> p.request(n));
               });
           }
       };

       source.unsafeSubscribe(s);
    });
}

Other than forwarding the onXXX() methods to the child subscriber, we set a custom producer on the child where the request() method schedules an action that calls the original producer with the same amount on the scheduler, ensuring that if there is an emission tied to the request, that happens on the same thread every time.

This can be optimized a bit by capturing the current thread in the outer schedule() action, comparing it to the caller thread in the custom Producer and then calling p.request(n) directly instead of scheduling it:


    Thread current = Thread.currentThread();

    // ...

    subscriber.setProducer(n -> {
        if (Thread.currentThread() == current) {
            p.request(n);
        } else {
            worker.schedule(() -> p.request(n));
        }
    });
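
The routing decision can be tried out without RxJava. The sketch below is a stand-in under assumptions: a single-threaded executor named "worker" plays the role of the Worker, a LongConsumer plays the role of the Producer's request(n), and the class name RequestRoutingSketch and the trace strings are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.LongConsumer;

class RequestRoutingSketch {
    static List<String> run() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        List<String> trace = new CopyOnWriteArrayList<>();
        CountDownLatch bothServed = new CountDownLatch(2);

        // Stand-in for the upstream Producer's request(n).
        LongConsumer upstream = n -> {
            trace.add("request(" + n + ") ran on " + Thread.currentThread().getName());
            bothServed.countDown();
        };

        try {
            // Stand-in for the outer schedule() action that performs the subscription.
            Future<LongConsumer> subscription = worker.submit(() -> {
                Thread current = Thread.currentThread();
                LongConsumer routed = n -> {
                    if (Thread.currentThread() == current) {
                        trace.add("direct");
                        upstream.accept(n);
                    } else {
                        trace.add("scheduled");
                        worker.execute(() -> upstream.accept(n));
                    }
                };
                routed.accept(1);   // a request issued on the worker itself
                return routed;
            });
            subscription.get().accept(2);   // a request issued from the calling thread
            bothServed.await();
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            worker.shutdown();
        }
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Either way, the upstream request runs on the worker thread; the check merely saves a scheduling round trip when the caller is already there.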



ObserveOn


The purpose of observeOn is to make sure values coming from any thread are received or observed on the proper thread. RxJava is by default synchronous, which technically means that onXXX() methods are called in sequence on the same thread:


for (int i = 0; i < 1000; i++) {
    MapSubscriber.onNext(i) {
       FilterSubscriber.onNext(i) {
           TakeSubscriber.onNext(i) {
               MySubscriber.onNext(i);
           }
       }
    }
}

There are several use cases for moving this onNext() call (and any subsequent calls chained after) to another thread. For example, generating the input to a map() operation is cheap but the calculation itself is expensive and would hold off the GUI thread. Another example is when the results of a background activity (database, network or the previous heavy computation) should be presented on the GUI, which requires the programmer to interact with the GUI framework only on a specific thread.

In concept, observeOn works by scheduling a task for each onXXX() call from the source on a specific scheduler where the original parameter value is handed to the downstream's onXXX() methods:


ExecutorService exec = Executors.newSingleThreadExecutor();

IObservable<T> observeOn = o -> {
    return source.subscribe(new IObserver<T>() {
        @Override
        public void onNext(T t) {
            exec.submit(() -> o.onNext(t));
        }
        
        @Override
        public void onError(Throwable e) {
            exec.submit(() -> o.onError(e));
        }

        @Override
        public void onCompleted() {
            exec.submit(() -> o.onCompleted());
        }            
    });
};

This pattern only works if the executor is single threaded or otherwise ensures FIFO behavior and doesn't execute multiple tasks from the same "client" at the same time.
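
Here is a runnable variant of this pattern with a single-threaded executor, as a sketch under assumptions: the helper observeOn(source, exec), the class name ObserveOnDemo and the thread name "observer-thread" are made up for illustration, and the source emits three values synchronously on the caller's thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ObserveOnDemo {
    interface IDisposable { void dispose(); }
    interface IObserver<T> {
        void onNext(T t);
        void onError(Throwable e);
        void onCompleted();
    }
    interface IObservable<T> { IDisposable subscribe(IObserver<T> observer); }

    // One task per signal; a single-threaded executor keeps them in FIFO order.
    static <T> IObservable<T> observeOn(IObservable<T> source, ExecutorService exec) {
        return o -> source.subscribe(new IObserver<T>() {
            @Override public void onNext(T t) { exec.submit(() -> o.onNext(t)); }
            @Override public void onError(Throwable e) { exec.submit(() -> o.onError(e)); }
            @Override public void onCompleted() { exec.submit(() -> o.onCompleted()); }
        });
    }

    static List<String> run() {
        ExecutorService exec =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "observer-thread"));
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // Emits synchronously on whichever thread calls subscribe().
        IObservable<Integer> source = o -> {
            for (int i = 1; i <= 3; i++) {
                o.onNext(i);
            }
            o.onCompleted();
            return () -> { };
        };

        observeOn(source, exec).subscribe(new IObserver<Integer>() {
            @Override public void onNext(Integer t) {
                received.add(t + "@" + Thread.currentThread().getName());
            }
            @Override public void onError(Throwable e) { done.countDown(); }
            @Override public void onCompleted() {
                received.add("done@" + Thread.currentThread().getName());
                done.countDown();
            }
        });

        try {
            done.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            exec.shutdown();
        }
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With a multi-threaded pool in place of the single-threaded executor, nothing in this structure would prevent the tasks from running out of order or concurrently.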

Unsubscription here is more complicated, because one has to keep track of all the pending tasks, remove them when they have finished and make sure every pending task can be mass-cancelled.

I believe Rx.NET has some complicated machinery for this, but luckily, RxJava has a simple solution in the form of the Scheduler.Worker, taking care of all the required unsubscription behavior:


Observable.create(subscriber -> {
    Worker worker = scheduler.createWorker();
    subscriber.add(worker);

    source.unsafeSubscribe(new Subscriber<T>(subscriber) {
        @Override
        public void onNext(T t) {
            worker.schedule(() -> subscriber.onNext(t));
        }
        
        @Override
        public void onError(Throwable e) {
            worker.schedule(() -> subscriber.onError(e));
        }

        @Override
        public void onCompleted() {
            worker.schedule(() -> subscriber.onCompleted());
        }            
    });
});


Now if we compare subscribeOn and observeOn, one can see that subscribeOn schedules the entire source.subscribe(...) part whereas observeOn schedules the individual subscriber.onXXX() calls onto another thread.

You can now see that if observeOn is applied twice, the inner scheduled task expands to another level of scheduling:

worker.schedule(() -> worker2.schedule(() -> subscriber.onNext(t)));

thus it overrides the emission thread in the chain; functionally, the observeOn closest to the consumer wins. From the expanded call above, you can see that worker is now wasted as a resource while providing no functional value to the sequence.
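
A small sketch on the simplified IObservable shows the same outcome. It rests on assumptions: the helper observeOn(source, exec), the class name DoubleObserveOnDemo and the thread names "exec-1" and "exec-2" are made up for illustration, with exec-1 applied closer to the source and exec-2 closer to the consumer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DoubleObserveOnDemo {
    interface IDisposable { void dispose(); }
    interface IObserver<T> {
        void onNext(T t);
        void onError(Throwable e);
        void onCompleted();
    }
    interface IObservable<T> { IDisposable subscribe(IObserver<T> observer); }

    static <T> IObservable<T> observeOn(IObservable<T> source, ExecutorService exec) {
        return o -> source.subscribe(new IObserver<T>() {
            @Override public void onNext(T t) { exec.submit(() -> o.onNext(t)); }
            @Override public void onError(Throwable e) { exec.submit(() -> o.onError(e)); }
            @Override public void onCompleted() { exec.submit(() -> o.onCompleted()); }
        });
    }

    /** Returns the thread name seen by the consumer for each signal. */
    static List<String> run() {
        ExecutorService exec1 = Executors.newSingleThreadExecutor(r -> new Thread(r, "exec-1"));
        ExecutorService exec2 = Executors.newSingleThreadExecutor(r -> new Thread(r, "exec-2"));
        List<String> seenOn = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        IObservable<Integer> source = o -> {
            o.onNext(1);
            o.onNext(2);
            o.onCompleted();
            return () -> { };
        };

        // exec1 only relays each signal to exec2, which does the actual delivery.
        IObservable<Integer> twice = observeOn(observeOn(source, exec1), exec2);

        twice.subscribe(new IObserver<Integer>() {
            @Override public void onNext(Integer t) {
                seenOn.add(Thread.currentThread().getName());
            }
            @Override public void onError(Throwable e) { done.countDown(); }
            @Override public void onCompleted() {
                seenOn.add(Thread.currentThread().getName());
                done.countDown();
            }
        });

        try {
            done.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            exec1.shutdown();
            exec2.shutdown();
        }
        return seenOn;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```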

The observeOn with the given structure has a drawback. If the source is some trivial Observable such as range(0, 1M), it will emit all of its values at once and suddenly we have a large number of pending tasks in the underlying thread pool of the scheduler. This can overwhelm the downstream consumer and also consumes a lot of memory.

Backpressure was introduced mostly to handle such cases, preventing internal buffer bloat and unbounded memory usage due to an asynchronous boundary. Consumers specifying the number of items they can consume via request() makes sure the producer side will only emit that many elements via onNext(). Once the consumer is ready, it will issue another request(). The observeOn() above, with its new Subscriber<T>(subscriber) wrapping, already composes backpressure and relays the request() calls to the upstream source. However, this doesn't prevent the consumer from requesting everything via Long.MAX_VALUE and now we have the same bloat problem again.

Unfortunately, RxJava discovered the backpressure problem too late and mandatory requesting would have required a lot of user code changes. Instead, backpressure was introduced as an optional behavior and it was made the responsibility of operators such as observeOn to handle it while maintaining transparency with bounded Subscribers and unbounded Observers alike.

The way it can be handled is via a queue, request tracking for the child Subscriber, fixed request amount towards the source and a queue-drain loop.


Observable.create(subscriber -> {
    Worker worker = scheduler.createWorker();
    subscriber.add(worker);

    source.unsafeSubscribe(new Subscriber<T>(subscriber) {
        final Queue<T> queue = new SpscAtomicArrayQueue<T>(128);

        final AtomicLong requested = new AtomicLong();

        final AtomicInteger wip = new AtomicInteger();

        Producer p;

        volatile boolean done;
        Throwable error;

        @Override
        public void onNext(T t) {
            queue.offer(t);
            trySchedule();
        }
        
        @Override
        public void onError(Throwable e) {
            error = e;
            done = true;
            trySchedule();
        }

        @Override
        public void onCompleted() {
            done = true;
            trySchedule();
        }            
        
        @Override
        public void setProducer(Producer p) {
            this.p = p;
            subscriber.setProducer(n -> {
                BackpressureUtils.getAndAddRequest(requested, n);
                trySchedule();
            }); 
            p.request(128);
        }

        void trySchedule() {
            if (wip.getAndIncrement() == 0) {
                worker.schedule(this::drain);
            }
        }

        void drain() {
            int missed = 1;
            for (;;) {
                long r = requested.get();
                long e = 0L;

                while (e != r) {
                    boolean d = done;
                    T v = queue.poll();
                    boolean empty = v == null;
                    
                    if (checkTerminated(d, empty)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    subscriber.onNext(v);
                    e++;
                }

                if (e == r && checkTerminated(done, queue.isEmpty())) {
                    break;
                }

                if (e != 0) {
                    BackpressureUtils.produced(requested, e);
                    p.request(e);
                }

                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        boolean checkTerminated(boolean d, boolean empty) {
            if (subscriber.isUnsubscribed()) {
                queue.clear();
                return true;
            }
            if (d) {
                Throwable e = error;
                if (e != null) {
                    subscriber.onError(e);
                    return true;
                } else
                if (empty) {
                    subscriber.onCompleted();
                    return true;
                }
            }
            return false;
       }
    });
});

By now, the pattern should be quite familiar. We queue up the item or save the exception, then increment the wip counter and schedule the draining of the queue. This is necessary as values may arrive at the same time the downstream issues a request. Issuing a request has to schedule the drain as well because values may be available in the queue already. The drain loop emits what it can and asks for replenishment from the upstream Producer it got through the setProducer() call.

Naturally, one can extend this with additional safeguards, error-delay capability, a parametric initial request amount and even stable replenishment amounts. This trySchedule setup has the property that it doesn't require a single-threaded scheduler to begin with as it self-trampolines: due to the getAndIncrement, only a single thread will issue the drain task at a time, and only when the wip counter is decremented to zero does the opportunity open up for somebody to schedule another drain task.
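
The self-trampolining property can be checked in isolation. The sketch below is not the operator itself but a stripped-down illustration under assumptions: it has no request tracking or termination handling, it uses a ConcurrentLinkedQueue because several threads offer at once, and the names DrainLoopSketch, inEmission and overlapped are made up. The drain tasks go to a four-thread pool, yet the emission section never runs on two threads at the same time.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class DrainLoopSketch {
    // Multi-producer queue because this demo offers from several threads.
    final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger();
    final AtomicInteger inEmission = new AtomicInteger();   // threads currently emitting
    final AtomicBoolean overlapped = new AtomicBoolean();   // set if two ever emit at once
    final AtomicInteger consumed = new AtomicInteger();
    final ExecutorService pool;
    final CountDownLatch allConsumed;

    DrainLoopSketch(ExecutorService pool, int expected) {
        this.pool = pool;
        this.allConsumed = new CountDownLatch(expected);
    }

    void offer(int v) {
        queue.offer(v);
        trySchedule();
    }

    void trySchedule() {
        // Only the caller that moves wip from 0 to 1 schedules a drain task.
        if (wip.getAndIncrement() == 0) {
            pool.execute(this::drain);
        }
    }

    void drain() {
        int missed = 1;
        for (;;) {
            Integer v;
            while ((v = queue.poll()) != null) {
                if (inEmission.incrementAndGet() != 1) {
                    overlapped.set(true);
                }
                consumed.incrementAndGet();       // stands in for subscriber.onNext(v)
                inEmission.decrementAndGet();
                allConsumed.countDown();
            }
            // Give up the drain right only if nobody called trySchedule() meanwhile.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    /** Returns true if every item was consumed and emissions never overlapped. */
    static boolean run() {
        int producers = 4;
        int perProducer = 1000;
        int total = producers * perProducer;
        DrainLoopSketch d = new DrainLoopSketch(Executors.newFixedThreadPool(4), total);
        Thread[] threads = new Thread[producers];
        for (int i = 0; i < producers; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perProducer; j++) {
                    d.offer(j);
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();   // no more trySchedule() calls after this point
            }
            d.allConsumed.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            d.pool.shutdown();
        }
        return d.consumed.get() == total && !d.overlapped.get();
    }

    public static void main(String[] args) {
        System.out.println("serialized and complete: " + run());
    }
}
```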


Conclusion


In this post, I've tried to clear up the confusion around the subscribeOn and observeOn operators by showing a simplified, clutter-free way of implementing them.

We saw then that the complication in RxJava comes from the need for handling backpressure somewhat transparently for consumers that do or don't directly drive a sequence through it.

Now that the inner workings and structures have been clarified, let's continue with the discussion about operator fusion where I can now use subscribeOn and observeOn as an example of how macro- and micro-fusion can help around the asynchronous boundaries they provide.

Sunday, March 20, 2016

Writing a custom reactive base type

Introduction


From time to time, the question or request comes up that one would really like to have his/her own reactive type. Even though RxJava's Observable has plenty of methods and extension points via lift(), extend() and compose(), one feels the Observable should have the operator xyz() or in some chains, the chain shouldn't allow calling uvw().

The first case, namely adding a new custom method without going through the project as a contribution, is as old as reactive programming on the JVM. When I first ported Rx.NET to Java, I had to face the same problem because .NET had the very convenient extension method support already back in 2010. Java doesn't have this and the idea was rejected in the version 8 development era in favor of default methods with the "justification" that such extension methods can't be overridden. True, they can't, but they can be replaced by another method from another class.

The second case, hiding or removing operators, comes up with custom Observables where certain operations don't make sense. For example, given a ParallelObservable that splits the input sequence into parallel processing pipelines internally, it makes sense to map() or filter() in parallel, but it doesn't make sense to use take() or skip().


Wrapping

Both cases can be solved by writing a custom type and just wrapping the Observable into it.

public final class MyObservable<T> {
    private final Observable<T> actual;
    public MyObservable(Observable<T> actual) {
        this.actual = actual;
    }
}

Now we can add operators of our liking:

    // ...
    public static <T> MyObservable<T> create(Observable<T> o) {
        return new MyObservable<T>(o);
    }

    public static <T> MyObservable<T> just(T value) {
        return create(Observable.just(value));
    }

    public final MyObservable<T> goAsync() {
        return create(actual.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread()));
    }

    public final <R> MyObservable<R> map(Func1<T, R> mapper) {
        return create(actual.map(mapper));
    }

    public final void subscribe(Subscriber<? super T> subscriber) {
        actual.subscribe(subscriber);
    }
    // ...

As seen here, we achieved both goals: get rid of the unnecessary operators and introduce our own operator while staying within our custom type.

If you look at the source code of RxJava, you see the same pattern where the actual object is just the OnSubscribe / Publisher type and the Observable enriches them with all sorts of operators.


Interoperation


The MyObservable looks adequate, but eventually, one has to interoperate with the regular Observable or somebody else's YourObservable. Because these are distinct types, we need a common type through which they can communicate with each other. Naturally, everybody can implement a toObservable() and return an Observable view, but that's yet another inconvenience of calling the method. Instead, every MyObservable and YourObservable can extend a base class or implement an interface with the minimal set of operations that each requires.

In RxJava 1.x, the obvious choice, Observable, isn't too good, because its methods are final and leak into MyObservable and, worst of all, they all return Observable instead of MyObservable! Unfortunately, 1.x can't help in this regard due to binary compatibility reasons.

The lucky thing is that in 2.x, Observable (Flowable) isn't really the root of the reactive type hierarchy; Publisher is. Every observable is a Publisher and many operators take a Publisher as parameter instead of an Observable. This has the benefit of working with other Publisher-based types out of the box. The reason this can work is that for the Observable chain to work, the operators only need a single method to be available from their sources: subscribe(Subscriber<? super T> s);

Therefore, if we target 2.x, MyObservable should implement Publisher and thus becomes immediately available as a source to operators of any decent reactive library:

public class MyObservable<T> implements Publisher<T> {
    // ...
    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        actual.subscribe(subscriber);
    }
    // ...
}


Extension


Given this MyObservable, one would eventually want to have other custom reactive types for different use cases, but that will become tedious as well due to the need for duplicating operators all over. Naturally, one thinks about using MyObservable as the base class for TheirObservable and adding new operators there, but that suffers from the same problem as the Observable -> MyObservable would: operators return the wrong type.

I believe the Java 8 Streams API suffered from a similar problem and if you look at the signature, Stream extends BaseStream<T, Stream<T>> and BaseStream<T, S extends BaseStream<T, S>>. Quite odd that some supertype has a type parameter for the subtype. The reason for this is to capture the subtype in the type signature of the methods, thus if you have MyStream, all stream methods' type signature now has MyStream as a return type.
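
The self-referencing type parameter can be illustrated in isolation. In this minimal sketch, the names SelfTypeSketch, Chain, MyChain, step() and extra() are all hypothetical and have nothing to do with the Streams API; the point is only that a method declared on the base type returns the subtype, so subtype-only methods stay reachable in a fluent chain.

```java
class SelfTypeSketch {
    // S is the concrete subtype, captured in the signature of the base type.
    static abstract class Chain<S extends Chain<S>> {
        final StringBuilder log = new StringBuilder();

        abstract S self();

        // Declared once on the base type, yet returns the subtype.
        S step(String name) {
            log.append(name).append(';');
            return self();
        }
    }

    static final class MyChain extends Chain<MyChain> {
        @Override
        MyChain self() {
            return this;
        }

        // Exists only on the subtype.
        MyChain extra() {
            log.append("extra;");
            return this;
        }
    }

    static String run() {
        // step() returns MyChain here, so extra() is still available after it.
        return new MyChain().step("a").extra().step("b").log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

This works smoothly because no method changes the element type; an operator such as map(), which turns a sequence of T into a sequence of R, is where the signatures start to get complicated.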

We can achieve a similar structure by declaring MyObservable as follows:

    public class MyObservable<T, S extends MyObservable<T, S>> implements Publisher<T> {
       
        final Publisher<? extends T> actual;
       
        public MyObservable(Publisher<? extends T> actual) {
            this.actual = actual;
        }
       
        @SuppressWarnings("unchecked")
        public <R, U extends MyObservable<R, U>> U wrap(Publisher<? extends R> my) {
            return (U)new MyObservable<R, U>(my);
        }
       
        public final <R, U extends MyObservable<R, U>> U map(Function<? super T, ? extends R> mapper) {
            return wrap(Flowable.fromPublisher(actual).map(mapper));
        }
       
        @Override
        public void subscribe(Subscriber<? super T> s) {
            actual.subscribe(s);
        }
    }

Quite a set of generic type mangling. We specify a wrap() method that turns an arbitrary Publisher into MyObservable and we call it from map() to ensure the result type is ours. Descendants of MyObservable will then override wrap to provide their own type:

    public class TheirObservable<T> extends MyObservable<T, TheirObservable<T>> {
        public TheirObservable(Publisher<? extends T> actual) {
            super(actual);
        }

        @SuppressWarnings("unchecked")
        @Override
        public <R, U extends MyObservable<R, U>> U wrap(Publisher<? extends R> my) {
            return (U) new TheirObservable<R>(my);
        }
    }

Let's try it:

    public static void main(String[] args) {
        TheirObservable<Integer> their = new TheirObservable<>(Flowable.just(1));
       
        TheirObservable<String> out = their.map(v -> v.toString());

        Flowable.fromPublisher(out).subscribe(System.out::println);
    }


It works as expected; no compilation errors and it prints the number 1 to the console.

Now let's add a take() operator to TheirObservable:

        @SuppressWarnings({ "rawtypes", "unchecked" })
        public <U extends TheirObservable<T>> U take(long n) {
            Flowable<T> p = Flowable.fromPublisher(actual);
            Flowable<T> u = p.take(n);
            return (U)(TheirObservable)wrap(u);
        }

The method signatures get more complicated and the type system starts to fight back; one needs raw types and casts to make things appear as the expected type. In addition, if one writes their.map(v -> v.toString()).take(1); the compiler won't find take(). The reason is that map() returns some subtype of MyObservable, and which subtype that is was determined only by the assignment to TheirObservable; without the assignment, the compiler has nothing to infer it from. To make the types work out, we have to split the fluent calls into individual steps:

        TheirObservable<Integer> their2 = new TheirObservable<>(Flowable.just(1));
        TheirObservable<String> step1 = their2.map(v -> v.toString());
        TheirObservable<String> step2 = step1.take(1);
        Flowable.fromPublisher(step2).subscribe(System.out::println);


Finally, let's extend TheirObservable further into AllObservable and add the filter() method:


    public static class AllObservable<T> extends TheirObservable<T> {
        public AllObservable(Publisher<? extends T> actual) {
            super(actual);
        }

        @SuppressWarnings("unchecked")
        @Override
        public <R, U extends MyObservable<R, U>> U wrap(Publisher<? extends R> my) {
            return (U)new AllObservable<R>(my);
        }
        
        @SuppressWarnings({ "rawtypes", "unchecked" })
        public <U extends AllObservable<T>> U filter(Predicate<? super T> predicate) {
            Flowable<T> p = Flowable.fromPublisher(actual);
            Flowable<T> u = p.filter(predicate);
            return (U)(AllObservable)wrap(u);
        }
    }

then use it:


        AllObservable<Integer> all = new AllObservable<>(Flowable.just(1));
        
        AllObservable<String> step1 = all.map(v -> v.toString());

        AllObservable<String> step2 = step1.take(1);
        
        AllObservable<String> step3 = step2.filter(v -> true);

        Flowable.fromPublisher(step3).subscribe(System.out::println);

Unfortunately, this doesn't compile because map() doesn't return AllObservable, namely, AllObservable is not MyObservable<String, U extends MyObservable<String, U>>. Changing step1's type to TheirObservable<String> resolves the compilation issue. However, if one then swaps filter() and take(), step1 is no longer an AllObservable and filter() is no longer available.


Conclusion


Can we fix the situation with AllObservable? I don't know; this is where my understanding of Java's type system and type inference ends.

Will RxJava 2.x have such a structure then? If it were up to me, then no. To support this style, we'd need wrapping all the time even though I want to get rid of all lift() and create() use, and the type signatures of classes and methods end up way more complicated than before.

Therefore, if one wants to go down this path, the example above shows that RxJava's API doesn't have to change and can be wrapped to do the work while one specifies their surface API at will. It is a good example of "composition over inheritance".

Friday, March 11, 2016

Operator-fusion (Part 1)

Introduction

Operator-fusion, one of the cutting-edge research topics in the reactive programming world, aims to combine two or more subsequent operators in a way that reduces the overhead (time, memory) of the dataflow.

(Other cutting-edge topics are: 1) reactive IO, 2) more native parallel async sequences and 3) transparent remote queries.)

The key insight with operator-fusion is threefold:

  1. many sequences are started from constant or quasi-constant sources such as just(), from(T[]), from(Iterable), fromCallable() which don't really need the thread-safety dance in a sequence of operators,
  2. some pairs of operators can share internal components such as Queues and
  3. some operators can tell if they consumed the value or dropped it, avoiding request(1) call overhead.

In this mini-series, I'll describe the hows and whys of operator-fusion, as we currently understand it. By "we", I mean the joint research effort on optimizing Reactive-Streams operators beyond what's there in RxJava 2.x and has been in previous versions of Project Reactor.

The experimentation happens in the reactive-streams-commons, Rsc for short, GitHub repository. The results of Rsc are now driving Project Reactor 2.5 (currently in milestone 2) and are verified by a large user base. Hopefully, RxJava can benefit from the results as well (but maybe not before 3.x).

If you are following Akka-Streams, you might have read/heard about operator-fusion there as well. As far as I could understand their approach, their objective is to make sure more stages of the pipeline run on the same Actor, avoiding the previous, very likely, thread-hopping with their sequences. Essentially, there is now a mode where the developer can define the async boundaries in the pipeline. Does this sound familiar? From day 1, Rx-based libraries let you do this.

Generations

Reactive libraries and associated concepts evolved over time. What we had 7 years ago in Rx.NET is, requirements- and implementation-wise, significantly different from what we'll have tomorrow with libraries such as Project Reactor.

Based on my experience with the history of "modern" reactive programming, I categorize the libraries into generations.

0th generation

The very first generation of reactive programming tools mainly consists of the java.util.Observable API and its cousins in other languages, and almost any callback-based API such as addXXXListener in Swing/AWT/Android.

The Observable API was most likely derived from the Gang-of-four design patterns book (or the other way around, who knows) and has the drawback of being inconvenient to use and non-composable. In today's terms, it is a limited PublishSubject where you have only one stage: publisher-subscriber.

The addXXXListener style of APIs, although they facilitate push-based eventing, suffer from composability deficiencies. The lack of a common base concept would require you to implement a composable library for each of them one by one, or to have one common abstraction like RxJava and build an adapter for each addXXXListener/removeXXXListener entry point.

1st generation

Once the deficiencies were recognized and addressed by Erik Meijer & Team at Microsoft, the first generation of reactive programming libraries were born: Rx.NET around 2010, Reactive4Java in 2011 and early versions of RxJava in 2013.

The others followed the Rx.NET architecture closely, but it soon turned out there are problems with this architecture. When the original IObservable/IObserver is implemented in a purely same-thread manner, the sequences can't be cancelled in progress with operators such as take(). Rx.NET sidestepped the issue by using mandatory asynchrony in sources such as range().

The second problem was the case when the producer side is separated by an implicit or explicit asynchronous boundary from a consumer that can't do its job fast enough. This can happen with trivial consumers as well because of the infrastructure overhead of crossing the asynchronous boundary. This is what we call the backpressure problem.

2nd generation

The new deficiencies of synchronous cancellation and the lack of backpressure were recognized by the RxJava team (I wasn't really involved) and a new architecture has been designed.

The class Subscriber was introduced which could tell if it was interested in more events or not via isUnsubscribed() that had to be checked by each source or operator emitting events.

The backpressure problem was addressed by using co-routines to signal the amount of items a Subscriber can process at a time through a Producer interface.

The third addition was the method lift() which allows a functional transformation between Subscribers directly. Almost all instance operators have been rewritten to run with lift() through the new Operator interface.

3rd generation

Apart from being clumsy and limiting some optimizations, the problem with RxJava's solution was that it was incompatible with the viewpoints of other (upcoming) reactive libraries at the time. Recognizing the advent of (backpressure enabled) reactive programming, engineers from various companies got together and created the Reactive-Streams specification. The main output is a set of 4 interfaces and 30 rules regarding them and their 7 total methods.

The Reactive-Streams specification allows library implementors to be compatible with each other and compose the sequences, cancellation and backpressure across library boundaries while allowing the end-user to switch between implementations at will.

Reactive-Streams, and thus 3rd generation, libraries are, for example, RxJava 2.x, Project Reactor and Akka-Streams.

4th generation

Implementing a fluent library on top of Reactive-Streams requires quite a different internal architecture, thus RxJava 2.x had to be rewritten from scratch. While I was doing this reimplementation, I recognized some operators could be combined in an external or internal fashion, saving on various overheads such as queueing, concurrency-atomics and requesting more.

Since RxJava 2.x development crawled to a halt due to lack of serious interest from certain parties, I set RxJava 2.x aside until Stephane Maldini (one of the contributors to Reactive-Streams and main contributor to Project Reactor) and I started talking about a set of foundational operators that both RxJava 2.x and Project Reactor 2.5+ (and eventually Akka-Streams) could use and incorporate into the respective libraries.

With active communication, we established the reactive-streams-commons library, built the foundational operators and designed the components of optimizations that we now call operator-fusion.

Thus, a 4th generation reactive library may look like a 3rd generation from the outside, but the internals of many operators change significantly to support overhead reduction even further.

5+ generation

I think, at this point, we are at the halfway point of what operator-fusion can achieve, but there are signs the architecture of Reactive-Streams will need extensions to support reactive IO operations in the form of bi-directional sequences (or channels). In addition, transparent remote reactive queries may require changes as well (see Qbservable in Rx.NET). I don't see the full extent of possibilities and requirements at this point and all is open for discussion.


The Rx lifecycle

Before jumping into operator-fusion, I'd like to define the major points (thus the terminology I'll be using) of the lifecycle of an Rx sequence. This applies to any version of RxJava and any Reactive-Streams based libraries as well.

The lifecycle can be split into 3 main points:

  1. Assembly-time. This is the time when you write up just().subscribeOn().map() and assign that to a field or variable of type Observable/Publisher. This is the main difference from Future-based APIs (Promise, CompletableFuture, etc.): even if they support some fluent API, there isn't a separate assembly time but some form of interleaving among the 3 points.
  2. Subscription-time. This is the time when a Subscriber subscribes to a sequence at its very end and triggers a "storm" of subscriptions inside the various operators. On one hand, it has an upstream-directed edge and on the other hand, a downstream-directed edge of calls to setProducer/onSubscribe. This is when subscription side-effects are triggered and generally no value is flowing through the pipeline.
  3. Runtime. This is the time when items are generated followed by zero or one terminal event of error/completion.
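The three points can be made visible with a toy pipeline in plain Java. This is only a sketch under assumptions: `LazySource`, `just` and `map` below are hypothetical stand-ins written for this illustration, not RxJava or Reactive-Streams types, and they support neither cancellation nor backpressure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class LifecycleDemo {
    /** Hypothetical minimal source: nothing runs until subscribe() is called. */
    interface LazySource<T> {
        void subscribe(Consumer<? super T> consumer);
    }

    static final List<String> LOG = new ArrayList<>();

    static <T> LazySource<T> just(T value) {
        LOG.add("assembly: just");                 // runs while the chain is built
        return consumer -> {
            LOG.add("subscription: just");         // runs when someone subscribes
            consumer.accept(value);                // runtime: the value flows
        };
    }

    static <T, R> LazySource<R> map(LazySource<T> upstream,
                                    Function<? super T, ? extends R> f) {
        LOG.add("assembly: map");
        return consumer -> {
            LOG.add("subscription: map");          // subscription travels upstream
            upstream.subscribe(v -> {
                LOG.add("runtime: map");
                consumer.accept(f.apply(v));
            });
        };
    }

    static List<String> run() {
        LOG.clear();
        LazySource<Integer> seq = map(just(1), v -> v + 1);      // assembly-time only
        seq.subscribe(v -> LOG.add("runtime: received " + v));   // subscription + runtime
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Note how the subscription lines appear in reverse operator order (map first, then just): the subscription travels upstream before any value travels downstream.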

Each distinct point in the lifecycle enables a different set of optimization possibilities.


Operator-fusion

I admit, I took the term operator-fusion from some Intel CPU documentation describing their internal architecture doing macro- and micro-fusions on assembly-level operators. It kinda sounded cool and the concepts behind it could be expanded up the language level and reach the operators of reactive dataflows.

The idea, on the reactive level, is to modify the sequence the user created at various lifecycle points to remove overhead mandated by the general architecture of the reactive library.

As with the assembly-level fusion, we can define two kinds of reactive operator-fusion.

Macro-fusion

Macro-fusion happens mainly at assembly-time in the form of replacing two or more subsequent operators with a single operator, thus reducing the subscription-time overhead (and sometimes the runtime overhead in case the JIT would be overwhelmed) of the sequence. There are several ways this can happen.

1) Replacing an operator with another operator

In this form of fusion, the operator applied looks at the upstream source (this is why I mentioned lift() causes trouble) and instead of instantiating its own implementation, it calls/instantiates a different operator.

One example of this is when you try to amb()/concat()/merge() an array of sources which has only one element. In this case, it would be unnecessary to instantiate the implementation and one can avoid the overhead by returning that single element directly. This kind of optimization is already part of RxJava 1.x.
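A minimal sketch of this shortcut, assuming a hypothetical `Seq` type in place of Observable and synchronous sources only (this is not the RxJava source code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ConcatFusion {
    /** Hypothetical stand-in for Observable/Publisher. */
    interface Seq<T> {
        void subscribe(Consumer<? super T> consumer);
    }

    @SafeVarargs
    static <T> Seq<T> concat(Seq<T>... sources) {
        if (sources.length == 1) {
            // Assembly-time replacement: no concat operator is instantiated at all.
            return sources[0];
        }
        Seq<T>[] copy = sources.clone();
        return consumer -> {
            // Naive concatenation; good enough for synchronous, finite sources.
            for (Seq<T> s : copy) {
                s.subscribe(consumer);
            }
        };
    }

    static List<Integer> collect(Seq<Integer> seq) {
        List<Integer> out = new ArrayList<>();
        seq.subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        Seq<Integer> a = c -> c.accept(1);
        Seq<Integer> b = c -> c.accept(2);
        System.out.println(concat(a) == a);          // true: the very same instance
        System.out.println(collect(concat(a, b)));   // [1, 2]
    }
}
```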

The second example is when one uses a constant source, such as range(), and applies subscribeOn(). There is little-to-no behavioral difference compared to applying observeOn() in the same situation. Thus subscribeOn() detecting a range() can switch to observeOn() and perhaps benefit from other optimizations that observeOn() itself can provide.

2) Replacing an operator with a custom operator

There exist operator-pairs that come up often and may work better if they were combined into a single operator. A very common operator-pair that is used for jump-starting some asynchronous computation is just().subscribeOn() or the equivalent just().observeOn().

Such sequences have quite a large overhead compared to the single value they emit: internal queues get created, workers get instantiated and released, several atomic variables are modified.

Therefore, replacing the pair with a custom operator that combines the scheduling and the emission of the single value into one operator is a win.

This approach, especially involving just(), can be extended to other operators, such as flatMap() where all the internal complexities can be avoided by invoking the mapper function once and running with the single Observable/Publisher directly, without buffering or extra synchronization.

Again, RxJava 1.x already has optimizations such as these examples above.

3) Replacing during subscription-time

There are situations when the previous two kinds of replacement may happen during subscription-time instead of assembly-time.

I can see two reasons for moving the optimization into subscription-time: 1) as a safety-net in case the fluent API is bypassed and 2) for convenience if the fused and non-fused versions don't differ enough to warrant a fully independent class as operator.

4) Replacing with the same operator but with modified parameters

Users of the libraries tend to apply certain operator types multiple times in a sequence, such as map() and filter():

Observable.range(1, 10)
   .filter(v -> v % 3 == 0)
   .filter(v -> v % 2 == 0)
   .map(v -> v + 1)
   .map(v -> v * v)
   .subscribe(System.out::println);

This is quite convenient to look at and one can more easily understand what's happening. Unfortunately, if you have a range of 1M or resubscribe to the sequence a million times, the structure has quite a measurable overhead compared to a flatter structure.

The idea with this macro-fusion is to detect if an operator of the same type was applied before, take the original source and apply the operator where the parameters get combined. In our example, that means range() is followed, internally, by a single filter() application where the two lambda functions (in their reference form) are combined:

Predicate<Integer> p1 = v -> v % 3 == 0;
Predicate<Integer> p2 = v -> v % 2 == 0;

Predicate<Integer> p3 = v -> p1.test(v) && p2.test(v);

A similar fusion happens with the lambda of the map() operations, with the difference that the output of the first lambda is going to be the input of the second lambda:

Function<Integer, Integer> f1 = v -> v + 1;
Function<Integer, Integer> f2 = v -> v * v;

Function<Integer, Integer> f3 = v -> f2.apply(f1.apply(v));
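How an operator might detect "the same operator was applied before" at assembly-time can be sketched as follows. The `Flow` and `FilterFlow` classes are hypothetical and list-backed purely for illustration; they only show the detection and the parameter combination, not a reactive implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class FilterFusionDemo {
    /** Hypothetical mini sequence type; only assembly-time behavior matters here. */
    static class Flow<T> {
        private final List<T> items;

        Flow(List<T> items) { this.items = items; }

        Flow<T> filter(Predicate<? super T> predicate) {
            return new FilterFlow<T>(this, predicate);
        }

        List<T> toList() { return new ArrayList<>(items); }

        /** Number of operator stages stacked on top of the source. */
        int stages() { return 0; }
    }

    static final class FilterFlow<T> extends Flow<T> {
        final Flow<T> source;
        final Predicate<? super T> predicate;

        FilterFlow(Flow<T> source, Predicate<? super T> predicate) {
            super(new ArrayList<T>());
            this.source = source;
            this.predicate = predicate;
        }

        @Override
        Flow<T> filter(Predicate<? super T> next) {
            // filter() applied on a filter(): go back to the original source
            // and combine the two predicates instead of stacking a new stage.
            Predicate<? super T> first = predicate;
            return new FilterFlow<T>(source, v -> first.test(v) && next.test(v));
        }

        @Override
        List<T> toList() {
            List<T> out = new ArrayList<>();
            for (T v : source.toList()) {
                if (predicate.test(v)) {
                    out.add(v);
                }
            }
            return out;
        }

        @Override
        int stages() { return source.stages() + 1; }
    }

    static Flow<Integer> fused() {
        return new Flow<Integer>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12))
                .filter(v -> v % 3 == 0)
                .filter(v -> v % 2 == 0);
    }

    public static void main(String[] args) {
        System.out.println(fused().stages());   // 1: two filter() calls, one stage
        System.out.println(fused().toList());   // [6, 12]
    }
}
```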

Micro-fusion

Micro-fusion happens when two or more operators share their resources or internal structures, thus bypassing some overhead of the general wired-up structure. Micro-fusion mostly happens at subscription-time.

The original idea of micro-fusion was the recognition that operators that end in an output queue and operators starting with a front-queue could share the same Queue instance, saving on allocation and saving on the drain-loop work-in-progress serialization atomics. Later, the concept has been extended to sources that could pose as Queues and thus avoiding the creation of SpscArrayQueue instances completely.

There are several forms of micro-fusion that can happen in operators.

1) Conditional Subscriber

When filtering an (upstream) source with filter() or distinct(), if that source features a drain-loop with request accounting, there is the likely scenario that filter() will request(1) if the last value has been dropped by the operator. Lots of request(1) calls, which all trigger some atomic increment or CAS loop, add up to noticeable overhead relatively quickly.

The idea behind a conditional subscriber is to have an extra method, boolean onNextIf(T v), that would indicate if it didn't really consume the value. In that case, the usual drain-loop would then skip incrementing its emission counter and keep emitting until the request limit is reached by successful consumptions.

This saves a lot on request management overhead and some operators in RxJava 2.x support it, but there are some drawbacks as well, mostly affecting the library writers themselves:

a) The source and filter may be separated by other operators so those operators have to offer a conditional Subscriber version of themselves to pass along the onNextIf() calls.

b) By returning non-void, the onNextIf() implementation is forced to be synchronous in nature. However, since it just returns a boolean, it can still behave as the regular onNext() method by claiming it consumed the value even though it dropped it; therefore, it has to request(1) manually again.

Since this is an internal affair, conditional Subscribers of operators still have to implement the regular onNext() behavior in case the upstream doesn't support conditional emission and/or is from some other reactive library with different internals.
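The saving can be illustrated with a single-threaded sketch that counts request() calls in both modes. Everything here is an assumption made for the illustration (the `Sub`/`ConditionalSub` interfaces, the `Range` source, plain fields instead of atomics); the real operators use atomic request accounting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

final class ConditionalSubscriberDemo {
    interface Sub { void onNext(int v); }

    /** Extra entry point: returns false if the value was dropped. */
    interface ConditionalSub extends Sub { boolean onNextIf(int v); }

    /** Single-threaded range source with request accounting. */
    static final class Range {
        private final int end;
        private int index;
        private long requested;
        private boolean draining;
        int requestCalls;
        Sub downstream;

        Range(int start, int count) { index = start; end = start + count; }

        void request(long n) {
            requestCalls++;
            requested += n;
            if (draining) return;            // re-entrant call; the running loop continues
            draining = true;
            while (requested > 0 && index < end) {
                int v = index++;
                if (downstream instanceof ConditionalSub) {
                    // only a successful consumption counts against the request
                    if (((ConditionalSub) downstream).onNextIf(v)) requested--;
                } else {
                    requested--;
                    downstream.onNext(v);
                }
            }
            draining = false;
        }
    }

    static class PlainFilter implements Sub {
        final Range upstream;
        final IntPredicate predicate;
        final List<Integer> out;

        PlainFilter(Range upstream, IntPredicate predicate, List<Integer> out) {
            this.upstream = upstream;
            this.predicate = predicate;
            this.out = out;
        }

        @Override public void onNext(int v) {
            if (predicate.test(v)) out.add(v);
            else upstream.request(1);        // replenish every dropped value
        }
    }

    static final class ConditionalFilter extends PlainFilter implements ConditionalSub {
        ConditionalFilter(Range upstream, IntPredicate predicate, List<Integer> out) {
            super(upstream, predicate, out);
        }

        @Override public boolean onNextIf(int v) {
            boolean pass = predicate.test(v);
            if (pass) out.add(v);
            return pass;                     // false: "dropped, don't count it"
        }
    }

    static int requestCalls(boolean conditional, List<Integer> out) {
        Range range = new Range(1, 100);
        IntPredicate multipleOfTen = v -> v % 10 == 0;
        range.downstream = conditional
                ? new ConditionalFilter(range, multipleOfTen, out)
                : new PlainFilter(range, multipleOfTen, out);
        range.request(5);
        return range.requestCalls;
    }

    public static void main(String[] args) {
        System.out.println(requestCalls(false, new ArrayList<>()));  // 46
        System.out.println(requestCalls(true, new ArrayList<>()));   // 1
    }
}
```

Both runs deliver the same five values (10, 20, 30, 40, 50); only the number of request() calls differs.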

2) Synchronous-fusion

We call synchronous micro-fusion the cases when the source to an operator is synchronous in nature, and can pretend to be a Queue itself.

Typical sources of such nature are range(), fromIterable, fromArray, fromStream and fromCallable. You could count just() here as well but usually, it is involved more in macro-fusion cases.

Operators that use internal queues are, for example, observeOn(), flatMap() in its inner sources, publish(), zip(), etc.

The idea is for the source's Subscription to also implement Queue, and during the subscription time, the onSubscribe() can check for it and use it instead of newing up its internal Queue implementation.

This requires a different operation mode (a mode switch) from both upstream and the operator itself, namely, calling request() is forbidden and one has to remember the mode itself in some field variable. In addition, when Queue.poll() returns null, that should indicate no more values will ever come, unlike regular poll()s in operators where null means no values are available now but there could be some in the future.
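A rough sketch of the idea, with a local `Subscription` stand-in instead of the Reactive-Streams interface and only the fused path of the source implemented (the regular request-driven emission is omitted; `requestCalls` exists only so the example can show request() is never invoked):

```java
import java.util.AbstractQueue;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

final class SyncFusionDemo {
    /** Local stand-in for org.reactivestreams.Subscription. */
    interface Subscription {
        void request(long n);
        void cancel();
    }

    /** A range() subscription that also pretends to be a Queue. */
    static final class RangeSubscription extends AbstractQueue<Integer>
            implements Subscription {
        private int index;
        private final int end;
        int requestCalls;

        RangeSubscription(int start, int count) { index = start; end = start + count; }

        @Override public Integer poll() { return index < end ? index++ : null; }
        @Override public Integer peek() { return index < end ? index : null; }
        @Override public int size() { return end - index; }
        @Override public boolean offer(Integer v) { throw new UnsupportedOperationException(); }
        @Override public Iterator<Integer> iterator() { throw new UnsupportedOperationException(); }

        @Override public void request(long n) { requestCalls++; } // regular path omitted
        @Override public void cancel() { index = end; }
    }

    /** Consumer side of an operator that normally owns an internal queue. */
    static final class Drainer {
        Queue<Integer> queue;
        boolean syncFused;                       // the remembered mode
        final List<Integer> received = new ArrayList<>();

        @SuppressWarnings("unchecked")
        void onSubscribe(Subscription s) {
            if (s instanceof Queue) {
                queue = (Queue<Integer>) s;      // reuse upstream instead of a new queue
                syncFused = true;                // from now on, never call request()
                drain();
            } else {
                queue = new ArrayDeque<>();      // classic path: own queue + request()
                s.request(Long.MAX_VALUE);
            }
        }

        void drain() {
            Integer v;
            while ((v = queue.poll()) != null) {
                received.add(v);
            }
            // in fused mode, null means the source has completed
        }
    }

    static List<Integer> run(RangeSubscription source) {
        Drainer d = new Drainer();
        d.onSubscribe(source);
        return d.received;
    }

    public static void main(String[] args) {
        RangeSubscription source = new RangeSubscription(1, 5);
        System.out.println(run(source));           // [1, 2, 3, 4, 5]
        System.out.println(source.requestCalls);   // 0
    }
}
```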

Unfortunately for RxJava 1.x, this fusion works better with the Reactive-Streams architecture because in 1.x a) setting a Producer is optional, b) the lifecycle-related behaviors are too unreliable and c) there are discovery difficulties and too much indirection.

When benchmarked in Rsc, this form of fusion makes a range().observeOn() sequence go from 55M Ops/s to 200M Ops/s in throughput, giving a ~4x overhead reduction in this trivial sequence.

Again, there are downsides of this kind of API "hacking":

a) In short sequences, the mode switch inside the operator may not be worth it.

b) This optimization is library local at the moment so unless there is a standard API like with Reactive-Streams interfaces, library A implementing micro-fusion may not cross-fuse with library B.

c) There are situations where this queue-fusion optimization is invalid, mainly due to thread-boundary violations (or other effects we haven't discovered yet that create invalid fused sequences).

d) This optimization has also some library-spanning effect, because intermediate operators have to support, or at least not interfere with the setup protocol of the mode-switch.

e) This also has the effect that in a Reactive-Streams architecture, an operator can't just pass along the Subscription from upstream to its downstream because if they fuse, the intermediate operator is cut out.


3) Asynchronous-fusion

There are other situations when the source has its own internal, downstream facing queue which is drained by requests, but the timing and count of the items are not known upfront.

In this situation, the source can also implement the Queue interface and the operator can use it instead of a fresh queue, but the protocol has to change, especially if the same operator wants to support synchronous fusion.

Therefore, in Rsc, instead of checking if the Subscription received in onSubscribe() implements Queue, we established a custom interface, QueueSubscription, that extends Subscription and Queue and adds a method called requestFusion().

The method requestFusion() takes an int flag telling the upstream what kind of fusion the current operator wants or supports, and the upstream should respond with the kind of fusion mode it has activated.

For example, flatMap() would request a synchronous fusion from the inner source which could answer with sorry-no, yes-synchronous or instead-asynchronous mode, and flatMap() would act accordingly. Generally, one can "downgrade" from a synchronous mode to asynchronous or none, but one can't "upgrade" to synchronous mode from asynchronous mode requests.
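The negotiation can be pictured with a few bit flags. The constant names and values below (NONE, SYNC, ASYNC, ANY) and the `FuseableSource` interface are assumptions made for this sketch; they are not guaranteed to match the actual Rsc definitions.

```java
final class FusionNegotiation {
    // Assumed flag values for illustration only.
    static final int NONE = 0, SYNC = 1, ASYNC = 2, ANY = SYNC | ASYNC;

    /** Hypothetical, trimmed-down view of the requestFusion() part of the protocol. */
    interface FuseableSource {
        int requestFusion(int requestedMode);
    }

    // A range()-like source: can only be polled synchronously.
    static final FuseableSource SYNC_SOURCE =
            requested -> (requested & SYNC) != 0 ? SYNC : NONE;

    // A source with its own downstream-facing queue filled at unknown times.
    static final FuseableSource ASYNC_SOURCE =
            requested -> (requested & ASYNC) != 0 ? ASYNC : NONE;

    // A source that doesn't support fusion at all.
    static final FuseableSource PLAIN_SOURCE = requested -> NONE;

    /** What the requesting operator has to do after the upstream answered. */
    static String describe(FuseableSource source, int requested) {
        int mode = source.requestFusion(requested);
        switch (mode) {
            case SYNC:
                return "sync: poll until null, never request()";
            case ASYNC:
                return "async: request(), poll when onNext signals";
            default:
                return "none: regular onNext into own queue";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(SYNC_SOURCE, ANY));
        System.out.println(describe(ASYNC_SOURCE, ANY));    // the "downgrade" case
        System.out.println(describe(SYNC_SOURCE, ASYNC));   // no "upgrade" to sync
        System.out.println(describe(PLAIN_SOURCE, ANY));
    }
}
```

The operator stores the returned mode in a field and branches on it in its onNext() and drain logic.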

In asynchronous-fusion mode, downstream has to still issue request() calls, but instead of enqueueing the value twice, the value gets generated into the shared queue and the upstream calls onNext() indicating its availability. The value of this call is irrelevant, we use null as a type-neutral value, and can trigger the usual drain() call directly.

Since fusion happens at subscription time, it is too late to change the Subscriber instance itself; therefore, one needs a mode flag in the operator and a conditional check for the fusion mode. This way, the same class can work with regular and fuseable sources alike.

This is the point when the complexity rises 50% above the complexity of a classical backpressured operator and requires quite an in-detail knowledge of all the operators and their behavior in various situations.

Invalid fusions

Before one goes ahead and fuses every queue in every operation, a problem comes up in the form of invalid fusion.

Operators tend to have some barriers associated with them. These are somewhat analogous to memory barriers and have a similar effect: 1) prevent certain reorderings and 2) prevent certain optimizations altogether.

For example, mapping from String to Integer and then Integer to Double can't be reordered because of the type mismatch. Reordering a filter() with map() may be invalid when the map changes types or by introducing side-effects in map that would have been avoided because filter didn't let the causing value through in the first place.

On one hand, these functional barriers mainly affect the macro-fusion operators and are somewhat easier to detect and understand.

On the other hand, when asynchrony is involved, in the form of a thread-jumping behavior provided by observeOn(), micro-fusion can become invalid.

For example, if you have a sequence of

source.flatMap(u -> range(1, 2).map(v -> heavyComputation(v))
    .observeOn(AndroidSchedulers.mainThread()))
.subscribe(...)

The inner sequence of range-map-observeOn-flatMap would have a single fused queue, and map(), whose behavior has been moved to the output side of the shared queue, would now execute the heavy computation on the main thread.

On a side note, classical observeOn can also drag the emission to its thread due to how backpressure triggers emission, thus in the example above, if you have a longer range(), the range's emission and so the map()'s computation would end up on the main thread anyway. This is why one needs subscribeOn()/observeOn() before map to ensure it runs on the correct thread.

This required a slight change to the protocol of the requestFusion() call by introducing a bit indicating if the caller (chain) acts as an asynchronous boundary, that is, the endpoint of the fused queue would be in another thread. Intermediate operators such as map() intercept this method call and simply respond with no-fusion.
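A sketch of how an intermediate operator could react to such a boundary bit. The flag names and values (including BOUNDARY) and the `Fuseable` interface are hypothetical and chosen only for this illustration.

```java
final class BoundaryDemo {
    // Assumed flag values for illustration only.
    static final int NONE = 0, SYNC = 1, ASYNC = 2, ANY = SYNC | ASYNC, BOUNDARY = 4;

    /** Hypothetical, trimmed-down view of the requestFusion() part of the protocol. */
    interface Fuseable {
        int requestFusion(int mode);
    }

    // A range()-like source has no user function to run, so it ignores the boundary bit.
    static final Fuseable RANGE = mode -> (mode & SYNC) != 0 ? SYNC : NONE;

    /** map() sits between source and consumer and intercepts the negotiation. */
    static Fuseable map(Fuseable upstream) {
        return mode -> {
            if ((mode & BOUNDARY) != 0) {
                // The consumer polls on another thread; fusing would drag
                // the mapper function over to that thread, so refuse.
                return NONE;
            }
            return upstream.requestFusion(mode);
        };
    }

    public static void main(String[] args) {
        System.out.println(map(RANGE).requestFusion(ANY));             // 1 (SYNC)
        System.out.println(map(RANGE).requestFusion(ANY | BOUNDARY));  // 0 (NONE)
        System.out.println(RANGE.requestFusion(ANY | BOUNDARY));       // 1 (SYNC)
    }
}
```

In this picture, observeOn() would be the operator that sets the boundary bit when it calls requestFusion() on its upstream.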

Finally, there might be a subscription-time related barrier as well that prevents reordering/optimization due to subscription side-effects. We are not sure of this yet but here are a few hands-on cases that require further study:

1) Is it valid to turn a range().subscribeOn(s1).observeOn(s2) chain, which I call a strongly-pipelined sequence because of the forced thread-boundary switch by default, into a fused range().observeOn(s2)? The tail-emission pattern is the same, you get events on Scheduler s2, but now we've lost the strong pipelining effect.

2) Subscribing to a Subject may take some time in case there are lots of Subscribers there, thus subscribeOn() may be a valid use to offset the overhead, but generally, there are no other side-effects happening when one subscribes to a PublishSubject. Is it valid to drop/replace subscribeOn() here?


Conclusion

Operator-fusion is a great opportunity, but also a great responsibility, to reduce overhead in reactive dataflows, and sometimes, get pretty close (+50% overhead with Project Reactor 2.5 M1 instead of +200% overhead with RxJava 2.x) to a regular Java Streams sequence's overhead while still supporting asynchronous (parts of) sequences with the same API (and similar internals).

However, adding fusion to every operator overzealously may not be worth it and one should focus on operators doing the heavy lifting in users' code most of the time: flatMap(), observeOn(), zip(), just(), from() etc. In addition, one could say every operator pair is macro-fuseable because a custom operator can be written for it, but then you have a combinatorial explosion of operators that have to interact with the regular operators and with each other.

Of course, on the other side, there are operators that don't look like they could be (micro-) fused but may turn up fuseable after all. But instead of building a huge operator cross-fusion matrix, there might be a possibility to automatically discover which operators can be fused by modelling them and the sequences in some way and applying graph algorithms on the network - a topic for further research.

Anyway, in the next part, I'll dive deeper into how operator-fusion in Rsc has been implemented, but before that, I'd like to describe the in-depth technicalities and differences of the subscribeOn() and observeOn() operators in an intermediate post for two reasons:

1) I think showing how to implement them clears up the confusion around them because I learned about subscribeOn() and observeOn() the same in-depth technical way in the first place (and I was never confused).

2) Knowing their structure and exact behavior helps in understanding the fusion-related changes applied to them later on.

As for where you can play with this fusion thing (as an end-user), check out Project Reactor 2.5, whose team has extensively (unit-) tested the solutions I have described in the post. Of course, since this is ongoing research, the Rsc project itself welcomes feedback or tips on what operator combinations we should optimize for.

Saturday, March 5, 2016

RxJava design retrospect

Introduction


RxJava has now been out for more than 3 years and has lived through several significant version changes. In this blog post, I'll point out design and implementation decisions that I personally think weren't such good ideas.

Don't get me wrong, it doesn't mean that RxJava is bad or I knew all along how to do it "properly". It was a learning process for all of us involved, but the question is, can we learn from those mistakes and do it better in the next major version?


Synchronous unsubscription


In the early days, RxJava mirrored the architecture of Rx.NET which consisted of two important interfaces, IObservable and IObserver, derived through dualizing the IEnumerable and IEnumerator. (This was also true for my own library, Reactive4Java).

If we look at IObservable, we find the subscribe() method that returns an IDisposable. This returned object allows one to dispose or cancel a running sequence. However, it has a critical problem I demonstrate with a minimalistic reactive program:

interface IDisposable {
    void dispose();
}

interface IObserver<T> {
   void onNext(T t);
}
interface IObservable<T> {
    IDisposable subscribe(IObserver<T> observer);
}  


IObservable<Integer> source = o -> {
   for (int i = 0; i < Integer.MAX_VALUE; i++) {
       o.onNext(i);
   }

   return () -> { };
};

IDisposable d = source.subscribe(System.out::println);
d.dispose();

If we run this code, it starts to print a lot of numbers to the console, even though we called dispose on the object returned by the subscribe method. What's wrong?

The problem is that the source observable can return its IDisposable object only after the for-loop finishes, but by then there is nothing left to cancel. The whole setup is synchronous and thus this structure can't be reasonably cancelled.

Although Rx is good at async processing, many steps in a typical pipeline are synchronous and are affected by this synchronous cancellation requirement. Since Rx.NET is at least 3 years older than RxJava, how could this shortcoming still be in today's Rx.NET?

The example code above is just the well known range() operator and if we run similar code in C#, we find that it doesn't print or stops printing almost immediately. The secret is that Rx.NET's range() operator runs on an async scheduler by default, so that the for loop runs on a different thread and the operator can immediately return with a meaningful IDisposable. Therefore, the synchronous cancellation issue is averted, but I wonder, was it a conscious or unconscious decision to sidestep the underlying problem? Who knows.

If you look at the source code of Rx.NET's range, you'll find something more complicated. It uses a recursive scheduling technique to deliver each value to the observer. When I measured it, it could only sustain 1M ops/second on the same machine which could do 250M ops/second with RxJava while delivering 1M elements.

Now RxJava's range() never used a scheduler thus the synchronous cancellation problem was discovered and mitigated by introducing the Subscriber class. An instance can be checked if it still wants events or not. The example above can be rewritten so the for loop checks its subscriber and quits accordingly.


Observable<Integer> source = Observable.create(s -> {
   for (int i = 0; i < Integer.MAX_VALUE && !s.isUnsubscribed(); i++) {
       s.onNext(i);
   }
});

Subscription d = source.subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer v) {
        System.out.println(v);
        unsubscribe();
    }

    // ...
});

The Subscriber acts like a take(1) and unsubscribes itself after the first item. This unsubscribe() call internally sets a volatile boolean flag that is read by isUnsubscribed() and the loop above is stopped. Note, however, that you still can't unsubscribe via the Subscription returned by subscribe() because the lambda with the for loop doesn't exit until its terminal condition is met.

It doesn't seem to solve our initial problem that well, does it? Luckily, this new structure has the property that it can be cancelled before or while the loop is running, the latter done from another thread of course:


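Subscriber<Integer> s = new Subscriber<Integer>() {
    @Override
    public void onNext(Integer v) {
        System.out.println(v);
    }
    // ...
};

// a Scheduler has no schedule() method itself; tasks are scheduled through a Worker
Schedulers.computation().createWorker().schedule(s::unsubscribe, 1, TimeUnit.SECONDS);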

source.subscribe(s);

The fact that you can basically inject the cancellation support upfront, before anything is even subscribed to, allows proper propagation of unsubscription even with the most complicated operators.
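The mechanism can be tried without RxJava on the classpath. The following is a plain-Java sketch in which `Sub` is a hypothetical stand-in for rx.Subscriber reduced to its cancellation flag; it shows both the take(1)-like self-unsubscription and the "cancelled before subscribing" case.

```java
final class CancellableLoopDemo {
    /** Hypothetical stand-in for rx.Subscriber, reduced to the cancellation flag. */
    static abstract class Sub {
        private volatile boolean unsubscribed;

        final void unsubscribe() { unsubscribed = true; }

        final boolean isUnsubscribed() { return unsubscribed; }

        abstract void onNext(int v);
    }

    /** A synchronous range-like source that checks the flag before each emission. */
    static void range(Sub s) {
        for (int i = 0; i < Integer.MAX_VALUE && !s.isUnsubscribed(); i++) {
            s.onNext(i);
        }
    }

    static int deliveredWhenTakingOne() {
        int[] delivered = {0};
        range(new Sub() {
            @Override void onNext(int v) {
                delivered[0]++;
                unsubscribe();           // acts like take(1)
            }
        });
        return delivered[0];
    }

    static int deliveredWhenCancelledUpfront() {
        int[] delivered = {0};
        Sub s = new Sub() {
            @Override void onNext(int v) {
                delivered[0]++;
            }
        };
        s.unsubscribe();                 // cancellation injected before subscribing
        range(s);
        return delivered[0];
    }

    public static void main(String[] args) {
        System.out.println(deliveredWhenTakingOne());         // 1
        System.out.println(deliveredWhenCancelledUpfront());  // 0
    }
}
```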

In addition, there is a deeper implication of this new structure. In the new Observable, the lambda doesn't return anything, but Observable.subscribe() does still return a Subscription which is practically the same Subscriber sent in as the parameter. (Technically, it is a bit more involved process; see Jake Wharton's excellent video talk on the subject).

The insight: you can't be fully reactive if you return something. Returning something implies synchronous behavior and the method has to provide some result, even though it can't at that moment. This is when one is forced to block or sleep until the real logic can produce the relevant object to be returned. I showed this in my post about the OSGi Asynchronous Event Streams initiative.


Resources of the Subscriber

The Subscriber class offers the ability to associate resources with it in the form of Subscription instances. When the operator is cancelled (or terminates), these resources are unsubscribed as well.

This is quite a convenience for operator developers; however, it has its own cost: allocation.

Whenever a Subscriber is instantiated with the default constructor, an inner SubscriptionList instance is always created as well, whether or not the Subscriber is likely to hold resources. In the previous example, range() doesn't need resources thus the SubscriptionList is never really used.

On one hand, there exist many operators that don't manage resources so creating the extra container is wasteful. On the other hand, many operators do use resources and expect this convenience to be present.

In addition, you may recall that Subscriber has a constructor that takes another Subscriber and gives the option to share the underlying SubscriptionList. Certainly, this could help reduce the allocation count, but most operators that use resources themselves can't share the same underlying SubscriptionList as this would allow them to unsubscribe resources downstream (see pitfall #2). Thus, the current Subscriber structure is more of a burden, performance wise, than a win for operator writers.

You may now think, what's wrong with giving convenience tools to operator writers? I agree that operators implemented outside RxJava should get as much help as reasonable, however, I believe internal operators should take the diligence and have efficient implementations to begin with.

I've tried a few times to resolve this problem but given the architecture of 1.x, I have doubts it can be achieved. Fortunately, the Reactive-Streams' architecture and thus RxJava 2.x solves this problem by making the resource management the responsibility of the operators.


Subscriber request

If you look into how Subscriber is implemented, you'll see the protected final request() method. This is a convenience method that makes sure if there is a Producer set via setProducer, the request is forwarded to it or accumulated until one Producer arrives. Basically, this is an inlined producer-arbiter.
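The "forward or accumulate" behavior can be sketched roughly like this. It is a simplified illustration, not the RxJava source: the class name and the nested `Producer` interface are stand-ins, and details of the real Subscriber (for example, what gets requested when nothing was requested before the Producer arrives) are left out.

```java
final class RequestArbiterDemo {
    /** Local stand-in for rx.Producer. */
    interface Producer {
        void request(long n);
    }

    private long pending;        // requests accumulated before a Producer arrives
    private Producer producer;

    void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        Producer p;
        synchronized (this) {
            p = producer;
            if (p == null) {
                pending = addCap(pending, n);   // no Producer yet: remember the amount
                return;
            }
        }
        p.request(n);                           // Producer present: forward directly
    }

    void setProducer(Producer p) {
        long toRequest;
        synchronized (this) {
            producer = p;
            toRequest = pending;
            pending = 0;
        }
        if (toRequest > 0) {
            p.request(toRequest);               // replay what was accumulated
        }
    }

    /** Adds two non-negative longs and caps the sum at Long.MAX_VALUE. */
    static long addCap(long a, long b) {
        long sum = a + b;
        return sum < 0 ? Long.MAX_VALUE : sum;
    }

    public static void main(String[] args) {
        long[] total = {0};
        RequestArbiterDemo arbiter = new RequestArbiterDemo();
        arbiter.request(2);
        arbiter.request(3);
        arbiter.setProducer(n -> total[0] += n);
        System.out.println(total[0]);   // 5
        arbiter.request(4);
        System.out.println(total[0]);   // 9
    }
}
```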

One might think the method's implementation adds significant overhead to request management, but JMH benchmarks confirmed it doesn't really affect the overhead beyond a small +/- 3% difference, which may also be due to noise.

The real problem with this method is that it has the same name as Producer.request, making it impossible to implement Producer when one extends Subscriber at the same time.

This has the unfortunate consequence that one usually needs an extra Producer object along with the main Subscriber if the operator does some request-manipulation.

This has the consequence of extra allocation during subscription time which affects GC the most with short-lived sequences. The second property is that it increases the call-stack depth and may prevent some JIT optimization.

Since Subscriber.request() is also part of the public API, it can't be renamed in 1.x to make room for Producer.request().

Again, the solution will come with 2.x: there, since the Reactive-Streams Subscriber and Subscription are both interfaces, both can be implemented at the same time; plus, a convenience request() method can be moved into a convenience implementation of Subscriber (i.e., AsyncSubscriber) without affecting the operator internals. (This also means it will be discouraged to use convenience Subscribers within operators.)


Lift

Along with backpressure, the method Observable.lift() is considered by many as the best addition to the library. It lets you step into the subscription process and given a Subscriber from downstream, you can return another Subscriber for upstream that does the business logic for that operator.

It became so popular that almost all instance operators of Observable are now using it.

Unfortunately, the convenience has a cost: allocation. For most operators, applying that operator to a sequence incurs 3 object allocations. To show this, let's unroll the application of the map() operator:


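public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    // allocation 1: the Operator
    final OperatorMap<T, R> op = new OperatorMap<T, R>(func);
    // allocation 2: the Observable, allocation 3: the OnSubscribe
    return new Observable<R>(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> child) {
            // wrap the downstream Subscriber and subscribe it to the upstream
            Subscriber<? super T> parent = op.call(child);
            Observable.this.unsafeSubscribe(parent);
        }
    });
}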

We have 1) the Operator instance, 2) the Observable instance and 3) the OnSubscribe instance for each application.

This may not be of concern for direct sequences that use map, but imagine you have these 3 allocations a million times because you happen to flatMap something whose inner Observables have operators applied to them:


Observable.range(1, 1_000_000).flatMap(v ->
    Observable.just(v).observeOn(Schedulers.computation()).map(w -> w * w))
.subscribe(...);


The lift operator is practically an OnSubscribe instance that captures the upstream Observable and calls Operator.call with the downstream Subscriber. Clearly, one could just implement operators directly with OnSubscribe and have the upstream Observable as a parameter; the total instance sizes wouldn't change much but both the allocation count and stack depth get reduced.
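A minimal sketch of such a lift-free operator is shown below. MiniObservable, MiniOnSubscribe and MiniSubscriber are hypothetical, stripped-down stand-ins (no backpressure, no unsubscription), and OnSubscribeMap is an illustrative name rather than an existing RxJava class.

```java
import java.util.function.Function;

// Hypothetical, stripped-down stand-ins for the RxJava 1.x types.
interface MiniSubscriber<T> {
    void onNext(T value);
    void onCompleted();
}

interface MiniOnSubscribe<T> {
    void call(MiniSubscriber<? super T> child);
}

final class MiniObservable<T> {
    final MiniOnSubscribe<T> onSubscribe;

    MiniObservable(MiniOnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    void subscribe(MiniSubscriber<? super T> s) {
        onSubscribe.call(s);
    }

    // map() without lift(): one OnSubscribe plus one Observable, no Operator object.
    <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) {
        return new MiniObservable<R>(new OnSubscribeMap<T, R>(this, mapper));
    }

    static MiniObservable<Integer> range(final int start, final int count) {
        return new MiniObservable<Integer>(child -> {
            for (int i = 0; i < count; i++) {
                child.onNext(start + i);
            }
            child.onCompleted();
        });
    }
}

// A named class: the upstream and the function are plain fields that can be inspected.
final class OnSubscribeMap<T, R> implements MiniOnSubscribe<R> {
    final MiniObservable<T> source;
    final Function<? super T, ? extends R> mapper;

    OnSubscribeMap(MiniObservable<T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    public void call(final MiniSubscriber<? super R> child) {
        // Subscribe to upstream directly; there is no Operator.call() hop in between.
        source.subscribe(new MiniSubscriber<T>() {
            @Override
            public void onNext(T value) {
                child.onNext(mapper.apply(value));
            }

            @Override
            public void onCompleted() {
                child.onCompleted();
            }
        });
    }
}
```

In this model, applying map() allocates two objects instead of three, and the subscription path is one call shorter.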

The current lift structure has another adverse effect: it makes operator-fusion difficult, if not impossible, because 1) it is an anonymous class and one can't discover its upstream Observable and Operator easily, and 2) even if it were made a named class, the two objects are hidden behind an indirection and any discovery process faces more overhead.

Luckily, the shortcomings mentioned so far can be remedied without affecting the public API, but doing so requires diligently writing and reviewing thousands of lines of code changes.

Unfortunately, when I implemented the RxJava 2.0 developer-preview last September, I did not think of this overhead, thus the current 2.x branch still uses lift() extensively.

However, there is light at the end of the tunnel: Project Reactor 2.5 doesn't go down the lift() path and now has lower overhead than RxJava.


Create


Lately, I've been quite outspoken against Observable.create(), and now I think it should be named something scarier so beginners avoid it and look for proper factory methods in Observable that do backpressure and unsubscription properly. I can see it as a tool for demonstrating to one's audience how to enter the reactive world, but I'm convinced it should receive less spotlight in those presentations.

Regardless, the problem with create() is that it encourages creating 2 instances per Observable: 1) the Observable instance itself and 2) the OnSubscribe holding the subscription logic.

The approach of creating an Observable instance with create() was born from the advice "composition over inheritance". From a general design perspective, this sounds okay, but one has to note that in Java, composition means object allocation: outer objects and inner objects, and more inner-inner objects.

To avoid all these allocations, Observable should not hold an instance of OnSubscribe by default (create() would stay as the lambda-factory version), and operators (both source and intermediate) should extend Observable directly. All operator methods would still reside in Observable:


public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return new ObservableMap<T, R>(this, func);
}


Thus, without lift() and create(), map() would allocate a single Observable instance per application.

Such a change, I believe, wouldn't affect the public API since Observable methods are static or final to begin with and operators would still be subclasses of Observable. The change would also help with operator-fusion because each upstream source can now be directly identified and its parameters exposed without indirection.
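The structure could look roughly like the sketch below. BaseObservable is a hypothetical, stripped-down model (a plain Consumer stands in for the Subscriber, and there is no backpressure or unsubscription), so treat it as an illustration of the layout rather than a proposal for the actual signatures.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, stripped-down model: the base class holds no OnSubscribe field.
abstract class BaseObservable<T> {
    // Each source or operator supplies its own subscription logic by overriding this.
    abstract void subscribe(Consumer<? super T> onNext);

    // Operator methods still reside in the base class and stay final.
    final <R> BaseObservable<R> map(Function<? super T, ? extends R> mapper) {
        return new ObservableMap<T, R>(this, mapper);
    }

    // create() survives as the lambda-friendly factory.
    static <T> BaseObservable<T> create(final Consumer<Consumer<? super T>> onSubscribe) {
        return new BaseObservable<T>() {
            @Override
            void subscribe(Consumer<? super T> onNext) {
                onSubscribe.accept(onNext);
            }
        };
    }
}

// The operator is the Observable: a single allocation per application of map().
final class ObservableMap<T, R> extends BaseObservable<R> {
    final BaseObservable<T> source;
    final Function<? super T, ? extends R> mapper;

    ObservableMap(BaseObservable<T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    void subscribe(final Consumer<? super R> onNext) {
        source.subscribe(v -> onNext.accept(mapper.apply(v)));
    }
}
```

With this layout, a fusion-aware operator could check whether its upstream is an ObservableMap and read its source and mapper fields directly.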

Again, Project Reactor 2.5 is ahead of RxJava and doesn't use the create() mechanics. Its operators are implemented extending a base class, Flux, the way suggested above.


Conclusion


Designing and implementing RxJava was, and still is, a learning process with unanticipated effects on complexity and performance.

You may ask: why the fuss about structures and allocations that clearly work in their current form? Two reasons: the Cloud and Android/IoT. For the Cloud, where billions of events happen, any inefficiency or unnecessary overhead is amplified along with the numbers. You may not easily calculate how much the range-flatMap example above costs you on your laptop, but Cloud suppliers will make you pay for each second, gigabyte and gigahertz of using their service. For Android and IoT, the resource constraints of the devices and the expectation of more and more features will eventually require one to budget memory usage, GC and battery life.