Introduction
Threading and lifecycle are among the top concerns when developing applications for the Android platform. The UI has to be interacted with on a dedicated thread (the main thread), but in order to keep the UI responsive to user input and rendering, blocking or CPU-intensive calculations should be kept off it.
In addition, unlike in a desktop Swing application, views can get destroyed and recreated in a way that is outside of a given application's control. This means background tasks must be stopped and listeners removed to prevent leaking references to now logically dead objects.
RxJava and RxAndroid can help with threading concerns and there are other libraries that tap into the various lifecycle events; in general, this means someone will call dispose() on a particular flow or clear() on a CompositeDisposable to mass-cancel multiples of them.
Having a rich set of transformative and coordinating operators along with support for normal values, errors and finite sequences may be overwhelming compared to a classical Listener-based API. Google's LiveData is one such classical Listener-style API, but unlike Swing's ActionListener, for example, there are explicit requirements that interaction with the LiveData object itself happens on the main thread and that signals are dispatched to its Observers on the main thread.
LiveData API
Unfortunately, I wasn't able to locate a public repository for the LiveData sources and had to rely on the sources downloaded from Google's Maven repository:
compile "android.arch.lifecycle:reactivestreams:1+"
There is an interoperation library associated with LiveData that allows presenting and consuming events from any Reactive-Streams Publisher. This will transitively import the actual LiveData library. Note that LiveData is currently considered beta and may change arbitrarily before release. That said, I don't think the core structure and premise will actually change.
The main consumer type is the android.arch.lifecycle.Observer with its single onChanged(T) method. Given an arbitrary LiveData instance, one can use the observe(LifecycleOwner, Observer) or observeForever(Observer) methods to attach the listener to a source. The onChanged(T) method will be called on the main thread. Cancelling an observation requires one to call removeObserver(), for which one has to remember both the original LiveData object and the Observer instance itself. All the methods return void. The LifecycleOwner is watched for destroy events, at which point the Observers that were registered with that LifecycleOwner are removed from the LiveData instance. This is similar to RxLifecycle's bindToLifecycle() approach composed onto a flow.
The LiveData abstract class doesn't feature any transformative or coordinating operators itself, but the library features a utility class, Transformations, currently with the map and switchMap operations. Both will execute the transformations and emissions on the main thread, but otherwise they match the behavior of the same RxJava operators.
In theory, one could interpret LiveData as a cold source, but its usage patterns point to a hot source, just like a BehaviorSubject in RxJava; LiveData remembers the last value and emits it first to new consumers. To signal to consumers, LiveData has protected methods setValue and postValue. setValue requires the value to be set on the main thread and postValue will schedule a task to the main thread to call setValue there. To manually invoke these methods, a MutableLiveData object is available where the two methods have been made public.
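To make the "remembers the last value and emits it first to new consumers" behavior concrete, here is a minimal plain-Java sketch. LastValueHolder and its methods are hypothetical stand-ins rather than the LiveData API, and the sketch deliberately ignores threads, lifecycles and the active/inactive state of observers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a LiveData-like holder; not the real LiveData API.
final class LastValueHolder<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();
    private T last;            // most recent value, if any
    private boolean hasValue;  // distinguishes "no value yet" from a null value

    // New observers first receive the remembered value, if there is one.
    void observe(Consumer<T> observer) {
        observers.add(observer);
        if (hasValue) {
            observer.accept(last);
        }
    }

    void removeObserver(Consumer<T> observer) {
        observers.remove(observer);
    }

    // Remembers the value and dispatches it to the current observers.
    void setValue(T value) {
        last = value;
        hasValue = true;
        // Iterate over a copy so observers may remove themselves while notified.
        for (Consumer<T> o : new ArrayList<>(observers)) {
            o.accept(value);
        }
    }
}

final class LastValueDemo {
    public static void main(String[] args) {
        LastValueHolder<String> holder = new LastValueHolder<>();
        holder.setValue("a");

        List<String> seen = new ArrayList<>();
        holder.observe(seen::add);   // the late observer still gets "a"
        holder.setValue("b");

        System.out.println(seen);    // prints [a, b]
    }
}
```

The late observer receiving "a" before "b" is the BehaviorSubject-like part; everything else is a plain listener list.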
Possible shortcomings
The main shortcoming I see with LiveData is the ubiquitous "return to main thread" mentality. For example, querying data from a LiveData-enabled datasource and preprocessing it should happen on a background thread as well, before the aggregated data is sent back to the main thread for display. Unfortunately, the datasource will notify the preprocessing Observer on the main thread for each resulting item, and that Observer then has to schedule a task on a background thread for each of those items. The main thread is involved too early in such flows, which wastes the main thread's time.
A second shortcoming could be the lack of terminal event notifications included in the Observer API. Of course, one can define a union class with value, error and complete subtypes, just like RxJava's Notification object, to work around this problem. Needless to say, such a wrapper will cost allocation and indirection.
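As an illustration of such a workaround, the following is a minimal sketch of an event container that a single-method Observer could receive. The Event class and its factory methods are hypothetical names chosen for this example; it is only loosely modeled on the idea behind RxJava's Notification and is not its API.

```java
// Hypothetical event container; one allocation per signal, as noted above.
final class Event<T> {
    enum Kind { VALUE, ERROR, COMPLETE }

    final Kind kind;
    final T value;          // meaningful only for VALUE
    final Throwable error;  // meaningful only for ERROR

    private Event(Kind kind, T value, Throwable error) {
        this.kind = kind;
        this.value = value;
        this.error = error;
    }

    static <T> Event<T> value(T v) {
        return new Event<>(Kind.VALUE, v, null);
    }

    static <T> Event<T> error(Throwable e) {
        return new Event<>(Kind.ERROR, null, e);
    }

    static <T> Event<T> complete() {
        return new Event<>(Kind.COMPLETE, null, null);
    }
}

final class EventDemo {
    // What the body of a single onChanged(Event<String>) could look like.
    static String describe(Event<String> e) {
        switch (e.kind) {
            case VALUE:
                return "value: " + e.value;
            case ERROR:
                return "error: " + e.error.getMessage();
            default:
                return "complete";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(Event.value("a")));
        System.out.println(describe(Event.<String>error(new RuntimeException("boom"))));
        System.out.println(describe(Event.<String>complete()));
    }
}
```

Every consumer now has to branch on the kind, which is the indirection cost mentioned above.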
Lastly, addListener/removeListener pairs compose relatively poorly and require the operator to remember both the original LiveData and the Observer that was used when the link was established. An onSubscribe(Disposable)-style cancellation support worked out much nicer in RxJava 2. Of course, having a similar method in Observer would break its lambda-friendly Single Abstract Method structure.
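One way to soften this without touching the Observer interface is to return a handle that remembers the pair on the caller's behalf. The sketch below uses a hypothetical ListenerSource interface as a stand-in for any add/remove-style API; it is not LiveData, and on Android the removal would additionally have to be moved to the main thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical listener-style source; stands in for any add/remove pair API.
interface ListenerSource<T> {
    void addObserver(Consumer<T> observer);
    void removeObserver(Consumer<T> observer);
}

final class Observations {
    private Observations() { }

    // Registers the observer and returns a handle that remembers both the
    // source and the observer; running the handle more than once is a no-op.
    static <T> Runnable observe(ListenerSource<T> source, Consumer<T> observer) {
        source.addObserver(observer);
        AtomicBoolean once = new AtomicBoolean();
        return () -> {
            if (once.compareAndSet(false, true)) {
                source.removeObserver(observer);
            }
        };
    }
}

// Trivial in-memory source used only for the demonstration.
final class SimpleSource<T> implements ListenerSource<T> {
    final List<Consumer<T>> observers = new ArrayList<>();

    @Override
    public void addObserver(Consumer<T> observer) {
        observers.add(observer);
    }

    @Override
    public void removeObserver(Consumer<T> observer) {
        observers.remove(observer);
    }

    void emit(T value) {
        for (Consumer<T> o : new ArrayList<>(observers)) {
            o.accept(value);
        }
    }
}

final class ObservationsDemo {
    public static void main(String[] args) {
        SimpleSource<Integer> source = new SimpleSource<>();
        List<Integer> seen = new ArrayList<>();

        Runnable cancel = Observations.observe(source, seen::add);
        source.emit(1);
        cancel.run();
        source.emit(2);              // no longer delivered

        System.out.println(seen);    // prints [1]
    }
}
```

Such handles can be collected in a list and run together, which is roughly what a CompositeDisposable does for Disposables.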
Reactive-Streams interoperation
Consuming a LiveData as Publisher
RxJava has infrastructure support to work with such classical Listener-style sources. Specifically, the JavaDoc of the create() operator on Observable and Flowable shows an example of such usage. Consequently, talking to LiveData is a straightforward routine:

    LiveData<String> source = new MutableLiveData<String>();

    Flowable<String> f = Flowable.<String>create(emitter -> {
        Observer<String> o = v -> {
            emitter.onNext(v);
        };

        source.observeForever(o);

        // removeObserver() has to be called on the main thread
        emitter.setCancellable(() ->
            AndroidSchedulers.mainThread()
                .scheduleDirect(() ->
                    source.removeObserver(o)
                )
        );
    }, BackpressureStrategy.LATEST);
The Reactive-Streams bridge in LiveDataReactiveStreams, of course, has no access to Flowable and has to work out the Subscription management manually. Unfortunately, the toPublisher() implementation is wrong. If there was a public repository, I could contribute a correct one (without using RxJava of course).
Let's see the implementation piece by piece to demonstrate how not to write Publishers. (The file is marked as Apache 2.0 open source, should be fine to quote it here, right?)
    public static <T> Publisher<T> toPublisher(
            final LifecycleOwner lifecycle, final LiveData<T> liveData) {

        return new Publisher<T>() {
            boolean mObserving;
            boolean mCanceled;
            long mRequested;
            @Nullable
            T mLatest;

            @Override
            public void subscribe(final Subscriber<T> subscriber) {
                // continued below
            }
        };
    }
The first red flag comes from the mutable, non thread-safe state of the anonymous Publisher. Hot sources may have mutable fields but those are protected with proper synchronization primitives (see PublishProcessor for example). However, these fields should be part of the state of the Subscription that is sent to a Subscriber as Publishers are expected to be consumed by any number of Subscribers and interaction with one Subscriber should not affect the interactions with the others.
Of course, a Publisher can choose to only service a single Subscriber (as with UnicastProcessor), but for that, there is no field to let subscribe() know there is already someone consuming the Publisher.
Next, the Observer instance is defined with a backpressure strategy of keeping the latest if the downstream is not ready.
    final Observer<T> observer = new Observer<T>() {
        @Override
        public void onChanged(@Nullable T t) {
            if (mCanceled) {
                return;
            }
            if (mRequested > 0) {
                mLatest = null;
                subscriber.onNext(t);
                if (mRequested != Long.MAX_VALUE) {
                    mRequested--;
                }
            } else {
                mLatest = t;
            }
        }
    };

Apart from the wrong shared state of mCanceled, mRequested and mLatest, the code relies on the fact that both the onChanged invocation and the request() call happen on the main thread (see below). Having a per-Subscriber atomic mRequested instead would save a trip to the main thread for an increment.
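For reference, a per-Subscriber atomic request counter could look like the following sketch. RequestCounter is a hypothetical name; the capped addition follows the same idea as the "Prevent overflowage" line in the quoted code, only done with a compare-and-set loop so that it can be called from any thread. It assumes the caller has already validated that n is positive.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical per-Subscriber demand tracker; safe to call from any thread.
final class RequestCounter {
    private final AtomicLong requested = new AtomicLong();

    // Adds n (assumed positive) to the outstanding demand and caps the sum
    // at Long.MAX_VALUE, which is treated as "unbounded".
    // Returns the demand before the addition.
    long add(long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            long next = current + n;
            if (next < 0L) {
                next = Long.MAX_VALUE; // overflow detected
            }
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    // Consumes one unit of demand if available; unbounded demand is
    // never decremented.
    boolean tryConsume() {
        for (;;) {
            long current = requested.get();
            if (current == 0L) {
                return false;
            }
            if (current == Long.MAX_VALUE) {
                return true;
            }
            if (requested.compareAndSet(current, current - 1)) {
                return true;
            }
        }
    }

    long get() {
        return requested.get();
    }
}

final class RequestCounterDemo {
    public static void main(String[] args) {
        RequestCounter rc = new RequestCounter();
        rc.add(2);
        System.out.println(rc.tryConsume());  // prints true
        System.out.println(rc.tryConsume());  // prints true
        System.out.println(rc.tryConsume());  // prints false
    }
}
```

With such a counter, request() only needs the main thread for starting the observation or replaying a pending latest value, not for the bookkeeping itself.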
The next segment deals with the request() call from the downstream.
    subscriber.onSubscribe(new Subscription() {
        @Override
        public void request(final long n) {
            if (n < 0 || mCanceled) {
                return;
            }
            ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
                @Override
                public void run() {
                    if (mCanceled) {
                        return;
                    }
                    // Prevent overflowage.
                    mRequested = mRequested + n >= mRequested
                            ? mRequested + n : Long.MAX_VALUE;
                    if (!mObserving) {
                        mObserving = true;
                        liveData.observe(lifecycle, observer);
                    } else if (mLatest != null) {
                        observer.onChanged(mLatest);
                        mLatest = null;
                    }
                }
            });
        }

The first (smaller) problem is that non-positive requests should be rewarded with an onError(IllegalArgumentException), properly serialized with any onNext calls. Here, negative requests are silently ignored. In addition, request() may be called from any thread and such a call may not see mCanceled properly. The mObserving flag adds an odd delay to start observing the actual LiveData object. (Since most sequences will issue a request() almost immediately, I'm not sure what benefit this can give; however, there were voices pretty keen on such behavior recently who couldn't understand that to control the first invocation of request(), one would have to consume a Publisher directly with a Subscriber and not have any intermediate operators between the two.)
Lastly, let's see cancel():
        @Override
        public void cancel() {
            if (mCanceled) {
                return;
            }
            ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
                @Override
                public void run() {
                    if (mCanceled) {
                        return;
                    }
                    if (mObserving) {
                        liveData.removeObserver(observer);
                        mObserving = false;
                    }
                    mLatest = null;
                    mCanceled = true;
                }
            });
        }

Since removeObserver has to be called from the main thread (which by itself could be inconvenient), a task is scheduled which will also update mCanceled to true. Since cancel() can be invoked from any thread, the value of the non-volatile mCanceled may not be visible to it. In addition, since mCanceled is set to true on the main thread, there could be a significant delay between a cancel() and its effects, still letting onChanged() calls through. Setting a volatile mCanceled to true right in the cancel() method would make the cancellation take effect much earlier.
(In fact, this was a problem with our early unsubscribeOn() where the Reactive-Streams TCK would complain about still receiving too many events after it has cancelled the flow. The solution was to set a volatile cancelled flag that would be read by onNext and drop events until the upstream.cancel() got executed asynchronously.)
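The "flag first, cleanup later" idea can be sketched in plain Java as follows. EagerCancelRelay is a hypothetical class: the Executor stands in for the main-thread executor and the Runnable for the actual removeObserver call. An AtomicBoolean is used instead of a bare volatile field so that the cleanup is scheduled only once even if cancel() is called concurrently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical relay between a listener-style source and a downstream consumer.
final class EagerCancelRelay<T> {
    private final Consumer<T> downstream;
    private final Executor mainThread;      // stand-in for the main-thread executor
    private final Runnable removeObserver;  // the actual (possibly delayed) cleanup
    private final AtomicBoolean cancelled = new AtomicBoolean();

    EagerCancelRelay(Consumer<T> downstream, Executor mainThread,
            Runnable removeObserver) {
        this.downstream = downstream;
        this.mainThread = mainThread;
        this.removeObserver = removeObserver;
    }

    // Called by the source; drops values as soon as cancel() has been called,
    // even if the observer has not been removed yet.
    void onChanged(T value) {
        if (!cancelled.get()) {
            downstream.accept(value);
        }
    }

    // Callable from any thread: the flag flips immediately, the removal
    // is only scheduled and may run much later.
    void cancel() {
        if (cancelled.compareAndSet(false, true)) {
            mainThread.execute(removeObserver);
        }
    }
}

final class EagerCancelDemo {
    public static void main(String[] args) {
        Queue<Runnable> mainQueue = new ArrayDeque<>(); // tasks run manually below
        List<Integer> seen = new ArrayList<>();

        EagerCancelRelay<Integer> relay = new EagerCancelRelay<Integer>(
                seen::add, mainQueue::add,
                () -> System.out.println("observer removed"));

        relay.onChanged(1);
        relay.cancel();
        relay.onChanged(2);          // dropped, although cleanup has not run yet
        System.out.println(seen);    // prints [1]

        mainQueue.poll().run();      // prints observer removed
    }
}
```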
It is relatively simple to fix the mistakes here: combine Observer and Subscription into one class and move the mXXX fields into this class.
Exposing a Publisher as LiveData
The unfortunate lack of terminal events in LiveData forces us to deal with only onNext calls. There is no way to tell an Observer that no more events will come (unless agreeing upon an event container type) and then get rid of them en-masse (there is no clearObservers()).
With RxJava, this could be done relatively easily:
    MutableLiveData<String> mld2 = new MutableLiveData<>();

    Disposable d = Flowable.range(1, 5).map(Object::toString)
        .subscribe(mld2::postValue);

Of course, one has to manage the Disposable as usual and possibly decide when to subscribe() to a cold source with respect to the available Observers of the (Mutable)LiveData. A refCount-like behavior is possible, but for that, one has to implement a custom LiveData object.
Apparently, there exist events on the LiveData class, onActive and onInactive, that will be called when the first Observer arrives and the last Observer leaves respectively. LiveDataReactiveStreams.fromPublisher() uses them in its implementation:
    private static class PublisherLiveData<T> extends LiveData<T> {
        private WeakReference<Subscription> mSubscriptionRef;
        private final Publisher mPublisher;
        private final Object mLock = new Object();

        PublisherLiveData(@NonNull final Publisher publisher) {
            mPublisher = publisher;
        }

        // the rest is below
    }

The mSubscriptionRef holds the current active connection's Subscription and the mLock protects it so that a concurrent call to onSubscribe may not clash with the deactivation of the LiveData object. As we'll see shortly, this is not enough because the race can leave the upstream's Subscription from an old activation not cancelled and thus still posting to the LiveData object.
The onActive event should subscribe to the source and relay its onNext events:

    @Override
    protected void onActive() {
        super.onActive();

        mPublisher.subscribe(new Subscriber<T>() {
            @Override
            public void onSubscribe(Subscription s) {
                // Don't worry about backpressure. If the stream is too noisy then
                // backpressure can be handled upstream.
                synchronized (mLock) {
                    s.request(Long.MAX_VALUE);
                    mSubscriptionRef = new WeakReference<>(s);
                }
            }

            @Override
            public void onNext(final T t) {
                postValue(t);
            }

            @Override
            public void onError(Throwable t) {
                synchronized (mLock) {
                    mSubscriptionRef = null;
                }
                // Errors should be handled upstream, so propagate as a crash.
                throw new RuntimeException(t);
            }

            @Override
            public void onComplete() {
                synchronized (mLock) {
                    mSubscriptionRef = null;
                }
            }
        });
    }
Here, apart from the mSubscriptionRef management, the only small mistake is the call to request() from within the lock itself. A bigger mistake is that onError should not throw.
The onInactive should cancel any existing Subscription if the Subscription representing that connection has been received:
    @Override
    protected void onInactive() {
        super.onInactive();
        synchronized (mLock) {
            WeakReference<Subscription> subscriptionRef = mSubscriptionRef;
            if (subscriptionRef != null) {
                Subscription subscription = subscriptionRef.get();
                if (subscription != null) {
                    subscription.cancel();
                }
                mSubscriptionRef = null;
            }
        }
    }
Unfortunately, the call to onSubscribe is not guaranteed to happen on the same thread the Publisher.subscribe() is invoked. Even though RxJava's Flowable almost always does so, other Reactive-Streams implementations are free to delay the call to onSubscribe and issue it from any thread. (We deal with such situations in Flowable via the deferred requesting/cancellation mechanism.)
Consider the following scenario. We know onActive and onInactive execute on the main thread, thus they are serialized with respect to each other. Let's assume a Publisher will signal onSubscribe in a delayed manner when subscribed to. The main thread executes onActive, which calls subscribe() on the Publisher. Then the main thread executes onInactive(). Since the Publisher hasn't called onSubscribe yet, there is no mSubscriptionRef and thus there is nothing to cancel. The onSubscribe() is then invoked by the Publisher and it starts streaming events (to nobody, practically). Now the main thread executes onActive again, triggering yet another subscription to the Publisher. After onSubscribe() is called again, the previous Subscription is overwritten and there are now twice as many onNext events posted to the LiveData (these could be duplicates in case the Publisher is hot/multicasting, or arbitrary items from various stages of a cold Publisher). Since the previous Subscription is lost, it is not possible to stop the first subscription other than letting it terminate at its own pace.
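This scenario can be replayed deterministically without Android. The sketch below uses the JDK's java.util.concurrent.Flow interfaces as a stand-in for the Reactive-Streams ones; LatePublisher, TrackedSubscription and NaiveBridge are hypothetical helpers, and NaiveBridge mirrors only the activation logic of the quoted class (no WeakReference, no LiveData).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Publisher that delays onSubscribe until explicitly told to deliver it.
final class LatePublisher implements Flow.Publisher<Integer> {
    final List<Flow.Subscriber<? super Integer>> pending = new ArrayList<>();
    final List<TrackedSubscription> issued = new ArrayList<>();

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        pending.add(subscriber);
    }

    // Delivers the delayed onSubscribe to the oldest waiting Subscriber.
    void deliverOnSubscribe() {
        TrackedSubscription ts = new TrackedSubscription();
        issued.add(ts);
        pending.remove(0).onSubscribe(ts);
    }
}

final class TrackedSubscription implements Flow.Subscription {
    volatile boolean cancelled;

    @Override
    public void request(long n) { }

    @Override
    public void cancel() {
        cancelled = true;
    }
}

// Keeps only "the current Subscription", like the quoted implementation.
final class NaiveBridge {
    private final Flow.Publisher<Integer> publisher;
    private final Object lock = new Object();
    private Flow.Subscription current;

    NaiveBridge(Flow.Publisher<Integer> publisher) {
        this.publisher = publisher;
    }

    void onActive() {
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                synchronized (lock) {
                    current = s;   // overwrites whatever was there before
                }
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) { }

            @Override
            public void onError(Throwable t) { }

            @Override
            public void onComplete() { }
        });
    }

    void onInactive() {
        synchronized (lock) {
            if (current != null) {
                current.cancel();
                current = null;
            }
        }
    }
}

final class RaceDemo {
    public static void main(String[] args) {
        LatePublisher publisher = new LatePublisher();
        NaiveBridge bridge = new NaiveBridge(publisher);

        bridge.onActive();               // subscription #1, onSubscribe pending
        bridge.onInactive();             // nothing to cancel yet
        publisher.deliverOnSubscribe();  // late onSubscribe for #1
        bridge.onActive();               // subscription #2
        publisher.deliverOnSubscribe();  // onSubscribe for #2 overwrites #1
        bridge.onInactive();             // cancels only #2

        System.out.println(publisher.issued.get(0).cancelled); // prints false
        System.out.println(publisher.issued.get(1).cancelled); // prints true
    }
}
```

The first Subscription is never cancelled, so a real Publisher behind it would keep emitting.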
One possible solution is to host the Subscriber in an AtomicReference inside the custom LiveData, plus have that Subscriber store the incoming Subscription in an AtomicReference too. Therefore, an onInactive can get rid of the previous Subscriber and at the same time, instruct it to cancel the Subscription incoming through onSubscribe without requesting from it:
    static final class PublisherLiveData<T> extends LiveData<T> {
        final Publisher<T> mPublisher;

        final AtomicReference<SubscriberLiveData> mSubscriber;

        PublisherLiveData(@NonNull final Publisher<T> publisher) {
            mPublisher = publisher;
            mSubscriber = new AtomicReference<>();
        }

        @Override
        protected void onActive() {
            super.onActive();
            SubscriberLiveData s = new SubscriberLiveData();
            mSubscriber.set(s);
            mPublisher.subscribe(s);
        }

        @Override
        protected void onInactive() {
            super.onInactive();
            SubscriberLiveData s = mSubscriber.getAndSet(null);
            if (s != null) {
                s.cancel();
            }
        }

        final class SubscriberLiveData
                extends AtomicReference<Subscription>
                implements Subscriber<T>, Subscription {

            @Override
            public void onSubscribe(Subscription s) {
                if (compareAndSet(null, s)) {
                    s.request(Long.MAX_VALUE);
                } else {
                    s.cancel();
                }
            }

            @Override
            public void onNext(T item) {
                postValue(item);
            }

            @Override
            public void onError(Throwable ex) {
                lazySet(this);
                mSubscriber.compareAndSet(this, null);
                ex.printStackTrace();
            }

            @Override
            public void onComplete() {
                lazySet(this);
                mSubscriber.compareAndSet(this, null);
            }

            @Override
            public void cancel() {
                Subscription s = getAndSet(this);
                if (s != null && s != this) {
                    s.cancel();
                }
            }

            @Override
            public void request(long n) {
                // never called
            }
        }
    }

Although onActive and onInactive would update mSubscriber from the main thread, atomics are preferable since we want to set the current Subscriber to null once the Publisher has terminated, thus playing nice with the GC. (For comparison, the refCount in RxJava is more involved because connection and disconnection may be triggered from any thread at any time.)
Conclusion
The LiveData class can be considered a classical (gen 0) reactive-push component which has practically one purpose on Android: notify its Observers of data (changes) on the main thread. If the typical thread crossing in the app is mainly from a background thread to the main thread, LiveData may be just enough to suit one's needs. It's one step above Agera after all since LiveData actually transmits data to be acted upon, not just "something happened" signals. However, if possibly failing and finite sources are present, which also have to be coordinated, transformed and kept on background thread(s) as long as possible, I believe RxJava is a better choice.
Nevertheless, there are standard interoperation bridges provided with LiveData which would allow it to work with or pose as a Reactive-Streams Publisher, but unfortunately, the implementation of this bridge has concurrency flaws that could be easily fixed by a PR, if there was an open repository for the library (because you'd want to post unit tests as well).