Introduction
In this final blog post about Schedulers, I'll talk a bit about services which don't expose any ExecutorService facilities, such as when one needs to interact with GUI event loops.
Working with non-Executor-based services
Some scheduling frameworks, such as Java's own AWT event loop, don't offer Future-like scheduling capabilities, only a single method to execute a task on them. Other frameworks may offer a pair of add/remove methods for tasks.
Let's assume we have the following API available to perform task-scheduling and cancellation on some GUI event loop:
    public interface GuiEventLoop {
        void run(Runnable task);
        void cancel(Runnable task);
    }

    public static final class EDTEventLoop implements GuiEventLoop {
        @Override
        public void run(Runnable task) {
            SwingUtilities.invokeLater(task);
        }

        @Override
        public void cancel(Runnable task) {
            // not supported
        }
    }
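The EDTEventLoop can only add tasks, so its cancel() is a no-op. To make the "pair of add/remove methods" case more concrete, here is a minimal sketch of a loop that can actually remove a queued task. QueueEventLoop and its helper method are hypothetical names invented for this illustration; they are not part of Swing, AWT or RxJava, and a real GUI toolkit will likely behave differently in the details.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Same shape as the GuiEventLoop interface in the post.
interface GuiEventLoop {
    void run(Runnable task);
    void cancel(Runnable task);
}

// Hypothetical event loop: one daemon thread drains a FIFO queue and
// cancel() removes a task that has not started running yet.
final class QueueEventLoop implements GuiEventLoop {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    QueueEventLoop() {
        Thread t = new Thread(this::drain, "QueueEventLoop");
        t.setDaemon(true);
        t.start();
    }

    private void drain() {
        try {
            while (true) {
                queue.take().run();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void run(Runnable task) {
        queue.offer(task);
    }

    @Override
    public void cancel(Runnable task) {
        // Lambdas compare by identity, so the caller has to pass the
        // very same Runnable instance it handed to run().
        queue.remove(task);
    }

    // Usage example: returns true if a task cancelled while queued never ran.
    static boolean cancelledTaskIsSkipped() {
        QueueEventLoop loop = new QueueEventLoop();
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);
        AtomicBoolean ran = new AtomicBoolean();

        // Block the loop so the next task cannot start before we cancel it.
        loop.run(() -> {
            try {
                gate.await();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        Runnable victim = () -> ran.set(true);
        loop.run(victim);
        loop.cancel(victim);
        gate.countDown();
        loop.run(done::countDown);
        try {
            return done.await(5, TimeUnit.SECONDS) && !ran.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Note that cancellation works by instance here, which is the reason the worker below has to keep a reference to the exact Runnable it submitted.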
Naturally, one can convert the SwingUtilities.invokeLater call directly into an Executor:
Executor exec = SwingUtilities::invokeLater;
and use the ExecutorScheduler from part 3, but that would usually add unnecessary overhead. I'd also like to show how to deal with the case where task removal and cancellation are available through a specific method of the GUI framework.
Since GUI event loops are single-threaded, we don't need to worry about serialization and trampolining in our Worker implementation, and we can start out with a simpler skeleton for our GuiScheduler:
    public final class GuiScheduler extends Scheduler {

        final GuiEventLoop eventLoop;

        public GuiScheduler(GuiEventLoop el) {
            this.eventLoop = el;
        }

        @Override
        public Worker createWorker() {
            return new GuiWorker();
        }

        final class GuiWorker extends Worker {
            final CompositeSubscription tracking =
                    new CompositeSubscription();

            @Override
            public void unsubscribe() {
                tracking.unsubscribe();
            }

            @Override
            public boolean isUnsubscribed() {
                return tracking.isUnsubscribed();
            }

            @Override
            public Subscription schedule(Action0 action) {
                // implement
            }

            @Override
            public Subscription schedule(
                    Action0 action, long delayTime, TimeUnit unit) {
                // implement
            }
        }
    }
There isn't anything special yet: we are going to delegate to the same GuiEventLoop instance and we track tasks scheduled by individual GuiWorker instances separately. Since we expect the GuiEventLoop to be single-threaded, there is no need for a queue-drain loop here, and therefore no need for the worker to implement Runnable. Let's see the non-delayed schedule() implementation first:
    @Override
    public Subscription schedule(Action0 action) {
        if (isUnsubscribed()) {                              // (1)
            return Subscriptions.unsubscribed();
        }

        ScheduledAction sa = new ScheduledAction(action);
        tracking.add(sa);
        sa.add(Subscriptions.create(
                () -> tracking.remove(sa)));                 // (2)

        Runnable r = () -> {                                 // (3)
            if (!sa.isUnsubscribed()) {
                sa.run();
            }
        };

        eventLoop.run(r);                                    // (4)

        sa.add(Subscriptions.create(
                () -> eventLoop.cancel(r)));                 // (5)

        return sa;
    }
It contains quite a few familiar steps:
- In case the worker has been unsubscribed, we just return an unsubscribed Subscription instance.
- We wrap the action with our ScheduledAction and hook up the tracking and removal logic.
- In this example, we want to skip the body eagerly if the ScheduledAction has been unsubscribed. Because the eventLoop API expects the same Runnable instance for cancellation, and ScheduledAction.run() doesn't check isUnsubscribed() itself, we wrap this small piece of logic into a Runnable.
- We submit this wrapper runnable to the eventLoop API,
- then add an unsubscription action which will remove it as necessary. Note that if we did this the other way around and the worker itself was unsubscribed just before (4), we would immediately call cancel with an r that is not yet in the event loop, then schedule r regardless, and end up with a larger retention window than in the current case.
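The ordering of steps (4) and (5) can be tried out without RxJava on the classpath. The sketch below is only an approximation under stated assumptions: TaskHandle is a hypothetical stand-in for ScheduledAction, and it assumes the container behavior where a hook added after cancellation is run immediately. OrderingSketch and its demo() method are likewise made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Same shape as the GuiEventLoop interface in the post.
interface GuiEventLoop {
    void run(Runnable task);
    void cancel(Runnable task);
}

// Hypothetical stand-in for ScheduledAction: it collects cleanup hooks and
// runs a hook right away if it is added after cancel() was called.
final class TaskHandle {
    private final List<Runnable> hooks = new ArrayList<>();
    private boolean cancelled;

    synchronized boolean isCancelled() {
        return cancelled;
    }

    void add(Runnable hook) {
        synchronized (this) {
            if (!cancelled) {
                hooks.add(hook);
                return;
            }
        }
        hook.run(); // already cancelled: clean up immediately
    }

    void cancel() {
        List<Runnable> toRun;
        synchronized (this) {
            if (cancelled) {
                return;
            }
            cancelled = true;
            toRun = new ArrayList<>(hooks);
            hooks.clear();
        }
        toRun.forEach(Runnable::run);
    }
}

final class OrderingSketch {
    static TaskHandle schedule(GuiEventLoop loop, Runnable action) {
        TaskHandle handle = new TaskHandle();
        // The wrapper skips the body if the handle was cancelled in time.
        Runnable r = () -> {
            if (!handle.isCancelled()) {
                action.run();
            }
        };
        loop.run(r);                       // step (4): enqueue first
        handle.add(() -> loop.cancel(r));  // step (5): then register removal
        return handle;
    }

    // Usage example: records the calls the event loop sees, in order.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        GuiEventLoop recording = new GuiEventLoop() {
            @Override
            public void run(Runnable task) {
                log.add("run");
            }

            @Override
            public void cancel(Runnable task) {
                log.add("cancel");
            }
        };
        schedule(recording, () -> { }).cancel();
        return log;
    }
}
```

With this ordering the event loop always sees run before cancel for the same Runnable, so a cancel call never targets a task that has not been enqueued yet.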
The delayed schedule() overload looks like this:

    @Override
    public Subscription schedule(
            Action0 action, long delayTime, TimeUnit unit) {

        if (delayTime <= 0) {                                // (1)
            return schedule(action);
        }
        if (isUnsubscribed()) {                              // (2)
            return Subscriptions.unsubscribed();
        }

        ScheduledAction sa = new ScheduledAction(action);
        tracking.add(sa);
        sa.add(Subscriptions.create(
                () -> tracking.remove(sa)));                 // (3)

        Future<?> f = CustomWorker.genericScheduler          // (4)
        .schedule(() -> {
            Runnable r = () -> {
                if (!sa.isUnsubscribed()) {
                    sa.run();
                }
            };
            eventLoop.run(r);
            sa.add(Subscriptions.create(
                    () -> eventLoop.cancel(r)));             // (5)
        }, delayTime, unit);

        sa.add(Subscriptions.create(
                () -> f.cancel(false)));

        return sa;
    }

The delayed variant has largely the same structure as the non-delayed one:
- We treat any non-positive delay as a regular schedule() call.
- We return if the worker itself was already unsubscribed.
- We perform the usual wrapping and hookups.
- We take our genericScheduler and schedule a delayed action that will then relay the real action to the eventLoop,
- which happens the same way as with the regular schedule() call: we wrap the action into a runnable that checks for its unsubscribed state, submit it to the event loop and add a cancel action.
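The timer-then-relay idea from steps (4) and (5) can be sketched with plain java.util.concurrent types. This is a simplified illustration, not the GuiWorker code: DelayedRelay, TIMER and demo() are hypothetical names, TIMER merely stands in for the genericScheduler from the earlier parts, and the event loop is modelled as a plain Executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class DelayedRelay {
    // Hypothetical shared timer, standing in for the post's genericScheduler.
    static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "DelayedRelay-timer");
                t.setDaemon(true);
                return t;
            });

    // Waits on the timer thread, then hands the task over to the event loop.
    // Cancelling the returned future only helps while the delay is pending.
    static ScheduledFuture<?> scheduleOn(
            Executor eventLoop, Runnable task, long delay, TimeUnit unit) {
        return TIMER.schedule(() -> eventLoop.execute(task), delay, unit);
    }

    // Usage example: returns the name of the thread the task ran on.
    static String demo() {
        ExecutorService loop = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "fake-event-loop"));
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        scheduleOn(loop,
                () -> ranOn.complete(Thread.currentThread().getName()),
                50, TimeUnit.MILLISECONDS);
        try {
            return ranOn.get(5, TimeUnit.SECONDS);
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            loop.shutdown();
        }
    }
}
```

The task body runs on the event-loop thread rather than on the timer thread. Passing SwingUtilities::invokeLater as the Executor should give the same relay onto the EDT. The sketch leaves out the second cancellation hook that the worker adds once the task has reached the event loop.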
Finally, let's use it:
    Scheduler s = new GuiScheduler(new EDTEventLoop());

    Observable<Integer> source = Observable.just(1)
            .delay(500, TimeUnit.MILLISECONDS, s)
            .doOnNext(v -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
                System.out.println(Thread.currentThread());
            });

    TestSubscriber<Integer> ts1 = new TestSubscriber<>();
    TestSubscriber<Integer> ts2 = new TestSubscriber<>();
    TestSubscriber<Integer> ts3 = new TestSubscriber<>();

    source.subscribe(ts1);
    source.subscribe(ts2);
    source.subscribe(ts3);

    ts1.awaitTerminalEvent();
    ts1.assertNoErrors();
    ts1.assertValue(1);

    ts2.awaitTerminalEvent();
    ts2.assertNoErrors();
    ts2.assertValue(1);

    ts3.awaitTerminalEvent();
    ts3.assertNoErrors();
    ts3.assertValue(1);
Which should print:
Thread[AWT-EventQueue-0,6,main]
Thread[AWT-EventQueue-0,6,main]
Thread[AWT-EventQueue-0,6,main]
Conclusion
In this final blog post about Schedulers, I've shown how one can wrap an event-loop based framework's API and convert it to RxJava's Scheduler API.
Generally though, there are many subtleties possible with the Scheduler API or with the APIs we'd like to wrap with it. Such 'individual' cases are hard to generalize upfront in blog posts, so if you have some interesting or difficult API to wrap, try your luck in the rx-java topic on StackOverflow or in our RxJava Google group, or contact me more directly in the comments or on Twitter (@akarnokd).
Reactive-streams seems to have become more widely known lately, but since it doesn't offer much beyond a set of interoperation interfaces (i.e., no flatMap etc.), many have started to write their own one-time Publishers over it and got confused about how its Subscription model should behave. Since RxJava 2.0 will support the reactive-streams API natively anyway, our knowledge about Producers will luckily come in handy when writing reactive-streams-like Subscriptions. In the next series of blog posts, I'll talk about the reactive-streams API and how one can convert an RxJava Producer into such a Subscription.