Sunday, March 20, 2016

Writing a custom reactive base type

Introduction


From time to time, someone asks for a reactive type of their own. Even though RxJava's Observable has plenty of methods and extension points via lift(), extend() and compose(), one feels that Observable should have an operator xyz(), or that some chains shouldn't allow calling uvw().

The first case, namely adding a new custom method without contributing it to the project, is as old as reactive programming on the JVM. When I first ported Rx.NET to Java, I had to face the same problem because .NET already had very convenient extension method support back in 2010. Java doesn't have this, and the idea was rejected during the development of Java 8 in favor of default methods, with the "justification" that such extension methods can't be overridden. True, they can't, but they can be replaced by another method from another class.

The second case, hiding or removing operators, comes up with custom Observables where certain operations don't make sense. For example, given a ParallelObservable that splits the input sequence into parallel processing pipelines internally, it makes sense to map() or filter() in parallel, but it doesn't make sense to use take() or skip().


Wrapping

Both cases can be solved by writing a custom type that simply wraps the Observable.

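import rx.Observable; // RxJava 1.x

public final class MyObservable<T> {
    // The wrapped Observable that does the actual work
    private final Observable<T> actual;

    // A constructor name must not repeat the type parameter
    public MyObservable(Observable<T> actual) {
        this.actual = actual;
    }
}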

Now we can add operators of our liking:

    // ...
    public static <T> MyObservable<T> create(Observable<T> o) {
        return new MyObservable<T>(o);
    }

    public static <T> MyObservable<T> just(T value) {
        return create(Observable.just(value));
    }

    public final MyObservable<T> goAsync() {
        return create(actual.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread()));
    }

    public final <R> MyObservable<R> map(Func1<? super T, ? extends R> mapper) {
        return create(actual.map(mapper));
    }

    public final void subscribe(Subscriber<? super T> subscriber) {
        actual.subscribe(subscriber);
    }
    // ...

As seen here, we achieved both goals: get rid of the unnecessary operators and introduce our own operator while staying within our custom type.

If you look at the source code of RxJava, you see the same pattern where the actual object is just the OnSubscribe / Publisher type and the Observable enriches them with all sorts of operators.
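The hiding half of this pattern can be sketched without RxJava on the classpath. The following is only an illustration under stated assumptions: it uses java.util.stream.Stream as a stand-in for Observable, and the names ParallelOnly, from() and toList() are made up for the example. The wrapper exposes map() and filter() but has no limit() or skip(), so calling those on it simply doesn't compile.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical restricted wrapper: only map() and filter() are exposed. */
final class ParallelOnly<T> {
    // The wrapped type; a JDK Stream stands in for Observable here
    private final Stream<T> actual;

    private ParallelOnly(Stream<T> actual) {
        this.actual = actual;
    }

    static <T> ParallelOnly<T> from(List<T> items) {
        return new ParallelOnly<>(items.parallelStream());
    }

    <R> ParallelOnly<R> map(Function<? super T, ? extends R> mapper) {
        // Delegate to the wrapped type, then re-wrap to stay in our own type
        return new ParallelOnly<>(actual.map(mapper));
    }

    ParallelOnly<T> filter(Predicate<? super T> predicate) {
        return new ParallelOnly<>(actual.filter(predicate));
    }

    /** Terminal step; a list collection keeps the encounter order. */
    List<T> toList() {
        return actual.collect(Collectors.toList());
    }
}

final class ParallelOnlyDemo {
    public static void main(String[] args) {
        List<Integer> out = ParallelOnly.from(Arrays.asList(1, 2, 3, 4))
                .map(v -> v * 10)
                .filter(v -> v > 10)
                .toList();
        System.out.println(out); // prints [20, 30, 40]
        // ParallelOnly.from(Arrays.asList(1, 2)).limit(1); // would not compile
    }
}
```

Every exposed operator costs one delegating method, which is the price of controlling the surface API.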


Interoperation


The MyObservable looks adequate, but eventually, one has to interoperate with the regular Observable or somebody else's YourObservable. Because these are distinct types, we need a common type through which they can communicate. Naturally, everybody can implement a toObservable() that returns an Observable view, but having to call that method is yet another inconvenience. Instead, every MyObservable and YourObservable can extend a base class or implement an interface with the minimal set of operations that each requires.

In RxJava 1.x, the obvious choice, Observable, isn't a good one: its methods are final, they leak into MyObservable and, worst of all, they all return Observable instead of MyObservable! Unfortunately, 1.x can't help in this regard due to binary compatibility reasons.

Luckily, in 2.x, the root of the reactive types isn't really Observable (Flowable) but Publisher. Every observable is a Publisher and many operators take a Publisher as parameter instead of an Observable. This has the benefit of working with other Publisher-based types out of the box. This can work because the operators of an Observable chain need only a single method from their sources: subscribe(Subscriber<? super T> s);

Therefore, if we target 2.x, MyObservable should implement Publisher, which makes it immediately usable as a source for the operators of any decent reactive library:

public class MyObservable<T> implements Publisher<T> {
    // ...
    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        actual.subscribe(subscriber);
    }
    // ...
}
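To see the interoperation in isolation, here is a minimal sketch that assumes the JDK's java.util.concurrent.Flow interfaces (Java 9+) as a stand-in for the Reactive-Streams ones, so that it runs without any library. MyPublisher, OtherLibrary and collect() are hypothetical names, and the single-item source ignores most of the Reactive-Streams rules (for example, it doesn't signal an error on a non-positive request), so treat it as a demo only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical custom type; usable wherever a Flow.Publisher is accepted. */
final class MyPublisher<T> implements Flow.Publisher<T> {
    private final Flow.Publisher<T> actual;

    MyPublisher(Flow.Publisher<T> actual) {
        this.actual = actual;
    }

    /** Synchronous single-item source, just enough for the demo. */
    static <T> MyPublisher<T> just(T value) {
        return new MyPublisher<>(s -> s.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (n > 0 && !done) {
                    done = true;
                    s.onNext(value);
                    s.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        }));
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // The only method other libraries need from us
        actual.subscribe(subscriber);
    }
}

/** Stands in for somebody else's library: it only knows Flow.Publisher. */
final class OtherLibrary {
    static <T> List<T> collect(Flow.Publisher<T> source) {
        List<T> items = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { items.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return items; // complete here only because the demo source is synchronous
    }
}

final class InteropDemo {
    public static void main(String[] args) {
        // No toObservable()-style conversion call is needed
        System.out.println(OtherLibrary.collect(MyPublisher.just(1))); // prints [1]
    }
}
```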


Extension


Given this MyObservable, one would eventually want other custom reactive types for different use cases, but that becomes tedious as well because the operators have to be duplicated all over. Naturally, one thinks about using MyObservable as the base class for TheirObservable and adding new operators there, but that suffers from the same problem as Observable -> MyObservable would: operators return the wrong type.

I believe the Java 8 Streams API faced a similar problem. If you look at the signatures, Stream<T> extends BaseStream<T, Stream<T>> and BaseStream is declared as BaseStream<T, S extends BaseStream<T, S>>. It looks quite odd that a supertype has a type parameter for its subtype. The reason is to capture the subtype in the method signatures: if you have a MyStream, the methods declared in terms of S now have MyStream as their return type.
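The self-referencing type parameter is easier to follow on a toy example. The sketch below is not the Streams source; Chain, MyChain, step(), extra() and trace() are names invented for the illustration. The base class declares its fluent method in terms of S, so the subtype's own methods stay reachable in the middle of a chain.

```java
/** Hypothetical base: S is the concrete subtype, so fluent methods can return it. */
abstract class Chain<T, S extends Chain<T, S>> {
    protected final StringBuilder log = new StringBuilder();

    /** Each concrete subtype returns itself here. */
    protected abstract S self();

    /** Declared once in the base, yet typed as the subtype at the call site. */
    final S step(String name) {
        log.append(name).append(';');
        return self();
    }
}

/** Concrete subtype that passes itself as S. */
final class MyChain<T> extends Chain<T, MyChain<T>> {
    @Override
    protected MyChain<T> self() {
        return this;
    }

    /** Only exists on MyChain, but stays reachable after step(). */
    MyChain<T> extra() {
        log.append("extra;");
        return this;
    }

    String trace() {
        return log.toString();
    }
}

final class SelfTypeDemo {
    public static void main(String[] args) {
        String trace = new MyChain<String>().step("a").extra().step("b").trace();
        System.out.println(trace); // prints a;extra;b;
    }
}
```

Note that step() keeps the element type T. As far as I can tell, the methods BaseStream declares with S (such as parallel() or sequential()) are also of this kind, while an operator like map() that changes T cannot be expressed through S alone.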

We can achieve a similar structure by declaring MyObservable as follows:

    public class MyObservable<T, S extends MyObservable<T, S>> implements Publisher<T> {
       
        final Publisher<? extends T> actual;
       
        public MyObservable(Publisher<? extends T> actual) {
            this.actual = actual;
        }
       
        @SuppressWarnings("unchecked")
        public <R, U extends MyObservable<R, U>> U wrap(Publisher<? extends R> my) {
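            // S is bounded by MyObservable<T, S>, so it is not a valid type argument for element type R; use U
            return (U)new MyObservable<R, U>(my);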
        }
       
        public final <R, U extends MyObservable<R, U>> U map(Function<? super T, ? extends R> mapper) {
            return wrap(Flowable.fromPublisher(actual).map(mapper));
        }
       
        @Override
        public void subscribe(Subscriber<? super T> s) {
            actual.subscribe(s);
        }
    }

That is quite a bit of generic type juggling. We specify a wrap() method that turns an arbitrary Publisher into a MyObservable, and we call it from map() to ensure the result type is ours. Descendants of MyObservable then override wrap() to provide their own type:

    public class TheirObservable<T> extends MyObservable<T, TheirObservable<T>> {
        public TheirObservable(Publisher<? extends T> actual) {
            super(actual);
        }

        @SuppressWarnings("unchecked")
        @Override
        public <R, U extends MyObservable<R, U>> U wrap(Publisher<? extends R> my) {
            return (U) new TheirObservable<R>(my);
        }
    }

Let's try it:

    public static void main(String[] args) {
        TheirObservable<Integer> their = new TheirObservable<>(Flowable.just(1));
       
        TheirObservable<String> out = their.map(v -> v.toString());

        Flowable.fromPublisher(out).subscribe(System.out::println);
    }


It works as expected; no compilation errors and it prints the number 1 to the console.

Now let's add a take() operator to TheirObservable:

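        @SuppressWarnings({ "rawtypes", "unchecked" })
        public <U extends TheirObservable<T>> U take(long n) {
            Flowable<T> p = Flowable.fromPublisher(actual);
            // use the parameter n rather than a hardcoded 1
            Flowable<T> u = p.take(n);
            // wrap() returns this class's own type, so cast through raw TheirObservable
            return (U)(TheirObservable)wrap(u);
        }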

The method signatures get more complicated and the type system starts to fight back; one needs raw types and casts to make things appear as the expected type. In addition, if one writes their.map(v -> v.toString()).take(1); the compiler won't find take(). The reason is that map() returns some subtype U of MyObservable, and U is only pinned down to TheirObservable by the target of the assignment. In the middle of a fluent chain there is no assignment target, so the compiler infers U from its bound alone, and MyObservable has no take(). To make the types work out, we have to split the fluent calls into individual steps:

        TheirObservable<Integer> their2 = new TheirObservable<>(Flowable.just(1));
        TheirObservable<String> step1 = their2.map(v -> v.toString());
        TheirObservable<String> step2 = step1.take(1);
        Flowable.fromPublisher(step2).subscribe(System.out::println);
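The inference behavior behind this can be reproduced with two plain classes. This is a reduced sketch with hypothetical names (FluentBase, FluentDerived, next(), only()), where next() plays the role of map() and create() plays the role of wrap(). It also shows a risk of the pattern: since the caller picks U and the cast is unchecked, a wrong choice of U compiles and fails only at runtime.

```java
/** Hypothetical base whose operator lets the caller choose the return type. */
class FluentBase {
    /** Overridden by subclasses, in the role of wrap(). */
    FluentBase create() {
        return new FluentBase();
    }

    /** U is chosen at the call site; the cast is unchecked here. */
    @SuppressWarnings("unchecked")
    <U extends FluentBase> U next() {
        return (U) create();
    }
}

class FluentDerived extends FluentBase {
    @Override
    FluentBase create() {
        return new FluentDerived();
    }

    String only() {
        return "derived-only";
    }
}

final class InferenceDemo {
    /** The assignment target pins U to FluentDerived, so only() is visible. */
    static String viaAssignment() {
        FluentDerived step = new FluentDerived().next();
        return step.only();
        // new FluentDerived().next().only(); // does not compile: U is inferred as FluentBase
    }

    /** Picking the wrong U compiles, but the hidden cast fails at runtime. */
    static boolean mismatchFails() {
        try {
            FluentDerived bad = new FluentBase().next();
            return bad == null; // not reached
        } catch (ClassCastException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(viaAssignment()); // prints derived-only
        System.out.println(mismatchFails()); // prints true
    }
}
```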


Finally, let's extend TheirObservable further into AllObservable and add the filter() method:


    public static class AllObservable<T> extends TheirObservable<T> {
        public AllObservable(Publisher<? extends T> actual) {
            super(actual);
        }

        @SuppressWarnings("unchecked")
        @Override
        public <R, U extends MyObservable<R, U>> U wrap(Publisher<? extends R> my) {
            // must stay public: an override cannot reduce visibility
            return (U)new AllObservable<R>(my);
        }
        
        @SuppressWarnings({ "rawtypes", "unchecked" })
        public <U extends AllObservable<T>> U filter(Predicate<? super T> predicate) {
            Flowable<T> p = Flowable.fromPublisher(actual);
            Flowable<T> u = p.filter(predicate);
            return (U)(AllObservable)wrap(u);
        }
    }

then use it:


        AllObservable<Integer> all = new AllObservable<>(Flowable.just(1));
        
        AllObservable<String> step1 = all.map(v -> v.toString());

        AllObservable<String> step2 = step1.take(1);
        
        AllObservable<String> step3 = step2.filter(v -> true);

        Flowable.fromPublisher(step3).subscribe(System.out::println);

Unfortunately, this doesn't compile because map() can't return AllObservable: AllObservable<String> extends MyObservable<String, TheirObservable<String>>, so it doesn't satisfy the bound U extends MyObservable<String, U>. Changing step1's type to TheirObservable<String> resolves the compilation issue. However, step1 is then no longer an AllObservable, so if one swapped filter() and take(), filter() would no longer be available.
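The failing bound can be isolated in three tiny classes. In this sketch (Root, Mid, Leaf and op() are hypothetical names), Root plays MyObservable, Mid plays TheirObservable and Leaf plays AllObservable; the element type is left out because it doesn't affect the bound.

```java
/** Plays the role of MyObservable: S is the self type. */
class Root<S extends Root<S>> {
    /** Like map(): the caller picks U, but U must be its own self type. */
    @SuppressWarnings("unchecked")
    <U extends Root<U>> U op() {
        Object self = this;
        return (U) self;
    }
}

/** Plays TheirObservable: fixes the self type to Mid. */
class Mid extends Root<Mid> { }

/** Plays AllObservable: it is a Root<Mid>, not a Root<Leaf>. */
class Leaf extends Mid { }

final class BoundDemo {
    public static void main(String[] args) {
        Mid viaMid = new Leaf().op();      // compiles: U = Mid satisfies U extends Root<U>
        // Leaf viaLeaf = new Leaf().op(); // does not compile: Leaf is not within the bound
        System.out.println(viaMid instanceof Leaf); // prints true
    }
}
```

The object is a Leaf at runtime all along; it is only the static type that can't get past Mid, which is exactly what happens to AllObservable above.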


Conclusion


Can we fix the situation with AllObservable? I don't know; this is where my understanding of Java's type system and type inference ends.

Will RxJava 2.x have such a structure then? If it were up to me, then no. To support this style, we'd need wrapping all the time even though I want to get rid of all lift() and create() use, and the type signatures of classes and methods would end up far more complicated than before.

Therefore, if one wants to go down this path, the example above shows that RxJava's API doesn't have to change: it can be wrapped to do the work while one specifies the surface API at will. It is a good example of "composition over inheritance".

10 comments:

  1. Yours are interesting articles about the past and future of rxjava and reactive Java programming in general. With your perspective of rxjava 1.x, 2.x, and Project Reactor, what do you see as the way forward for Java reactive? Which framework would you choose for a greenfields project today?
    Cheers

    Replies
    1. We are at a generation change so I'd recommend any Reactive-Streams compliant library. Project Reactor seems to be the closest to a stable release but it contains only a subset of RxJava's operators. I don't know when RxJava 2 will be released; technically, it was ready half a year ago and there are signs Netflix wants some drastic philosophical changes that could delay it further. If I really needed a library today, I'd choose Reactive-Streams-Commons because I can influence it directly and without gating. For you, I suggest you start out with Project Reactor and if the feature set turns out to be inadequate, switch to RxJava 2. I haven't talked about Akka-Streams for two reasons: 1) it's in Scala and digging through the code in Eclipse is a nightmare, 2) it has mandatory async boundaries everywhere and causes performance hits with mostly sync sequences.

  2. Thanks for the info. I know you are using Reactive-Streams-Commons as a repo of incubating features - is it robust enough for real world use?

    Replies
    1. Reactive-Streams-Commons, Rsc, is a pick what you need library. Reactor-Core picks most of it. However, we don't have any release roadmap for it and we only have the snapshots. I wouldn't recommend using it directly. It is mainly for incubating performance enhancements, not a dumping ground for arbitrary operators one can come up with.

  3. Great, thanks but I could not find take() operator code. After "Now let's add a take() operator to TheirObservable", I only see filter method implementation.

  4. We solved this with Kotlin. It integrates smoothly with Java and allows extending existing classes.

  6. Hi David, in the `Extension` chapter, you try to showcase adding a `take` operator, but you provide the `filter` operator code, instead of the `take` operator code.
