rxjava and a synchronous main thread

not too long ago, someone pointed me to this blog post about keeping your main thread synchronous. it referenced Ray Ryan’s excellent talk about the matter. the talk and blog post led me to a set of investigations, which prompted me to write this blog post.

quick note: why keep the main thread synchronous

if you are curious as to the kinds of problems that can occur if the main thread isn’t synchronous (or why it’s not synchronous when you use observeOn), see this post about the main thread for a good explanation.

the summary is that every time you use handler.post(), you post something to be run later, and you don’t have any guarantees as to when it will be run. a common case is when you schedule something to update the ui, but before it actually runs, an onDestroy comes in, so the code ends up updating the ui after the activity has been destroyed.
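to make the ordering problem concrete, here is a minimal sketch in plain java. it does not use android’s Handler at all: PostAfterDestroyDemo, mainQueue, post, and onDestroy are hypothetical stand-ins for the main thread’s queue, and the only point is that a posted task runs after whatever is already ahead of it in the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// hypothetical stand-in for the main thread's message queue; not the android Handler api
public class PostAfterDestroyDemo {
    private final Queue<Runnable> mainQueue = new ArrayDeque<>();
    private final List<String> events = new ArrayList<>();
    private boolean destroyed = false;

    // like handler.post(): the task is only queued, it does not run now
    void post(Runnable task) {
        mainQueue.add(task);
    }

    void onDestroy() {
        destroyed = true;
        events.add("onDestroy");
    }

    static List<String> run() {
        PostAfterDestroyDemo demo = new PostAfterDestroyDemo();
        // the framework has already queued the destroy callback...
        demo.post(demo::onDestroy);
        // ...when a background result arrives and posts a ui update behind it
        demo.post(() -> demo.events.add(
                demo.destroyed ? "ui update after destroy" : "ui update"));
        // the loop drains the queue in order, so the ui update runs second
        while (!demo.mainQueue.isEmpty()) {
            demo.mainQueue.poll().run();
        }
        return demo.events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

running main prints onDestroy first and then "ui update after destroy": the update was posted while the activity was still alive, but it only ran once the activity was gone.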

in rx, specifically, observeOn(AndroidSchedulers.mainThread()) causes a handler.sendMessageDelayed (see LooperScheduler.java), which means code could run at some point after we believe we have unsubscribed, thus causing issues.

basic rules of rxjava threading

in many of the talks about rxjava [1], we find a set of repeated rules about rxjava and threading:

while these are all true, there are a few minor points that were not immediately obvious to me early on about the first three, so i would like to elaborate a bit on those.

rxjava is single threaded by default

as long as you do not use observeOn, subscribeOn, or an operator that runs on a particular scheduler (e.g. timer), the callback will be received on the thread that subscribe was called on.

subscribeOn only affects upstream

one subtle point - consider:

Observable.just(1, 2, 3)
    .subscribeOn(Schedulers.io())
    .subscribe(integer -> {
      Log.d(TAG, "got value on " + Thread.currentThread().getName());
    });

despite subscribeOn only affecting upstream, this will always print the result on an io thread, irrespective of the thread on which we called this code. this is because subscribeOn subscribes to the observable on the thread passed in, which means that onNext will be called on that particular thread.

only the subscribeOn closest to the source matters

consider this code:

Observable.just(1, 2, 3)
    .doOnSubscribe(() -> 
        Log.d(TAG, "subscribe to just on " +
            Thread.currentThread().getName()))
    .subscribeOn(Schedulers.io())
    .filter(integer -> integer % 2 == 0)
    .doOnSubscribe(() ->
        Log.d(TAG, "subscribe to filter on " +
            Thread.currentThread().getName()))
    .subscribeOn(Schedulers.computation())
    .subscribe(integer -> {
      Log.d(TAG, "got value on " + Thread.currentThread().getName());
    });

running this example results in:

D/are: subscribe to filter on RxComputationScheduler-1
D/are: subscribe to just on RxIoScheduler-2
D/are: got value on RxIoScheduler-2

if, however, we replaced doOnSubscribe with doOnNext in the code block above, we’d instead get:

D/are: onNext from just with RxIoScheduler-2
D/are: onNext from just with RxIoScheduler-2
D/are: onNext from filter with RxIoScheduler-2
D/are: got value on RxIoScheduler-2
D/are: onNext from just with RxIoScheduler-2

the caveat here is that the subscribeOn closest to the source is the one that determines which thread onNext will get called on, while each subscription still happens on the thread given to the subscribeOn just downstream of it.

the reason for this is that each subscribeOn subscribes to the upstream observable on that particular thread.

let’s take an example - given:

Observable.just(1,2,3)
          .subscribeOn(Schedulers.io())
          .subscribeOn(Schedulers.computation())
          .subscribe();

let’s break down what happens:

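[diagram: subscribe() reaches subscribeOn(computation), which subscribes to its upstream on a computation thread; there, subscribeOn(io) subscribes to just on an io thread, so just emits 1, 2, 3 on the io thread]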

see the source for subscribeOn for more details.
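the chain of thread hops can also be modeled without rxjava at all. what follows is a toy sketch, not rxjava’s implementation: MiniObservable, just, and subscribeOn here are hypothetical names, and the two single-thread executors named "io" and "computation" stand in for the real schedulers. the assumption baked in is the one described above, that subscribeOn does nothing except subscribe to its upstream from the given thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// toy model only: MiniObservable is hypothetical and is not rxjava's Observable
public class SubscribeOnModel {
    interface MiniObservable {
        void subscribe(Consumer<Integer> onNext);
    }

    // emits synchronously on whichever thread calls subscribe
    static MiniObservable just(int... values) {
        return onNext -> {
            for (int v : values) {
                onNext.accept(v);
            }
        };
    }

    // hops to the executor's thread, then subscribes to upstream from there
    static MiniObservable subscribeOn(MiniObservable upstream,
                                      ExecutorService executor,
                                      List<String> log) {
        return onNext -> executor.execute(() -> {
            log.add("subscribing upstream on " + Thread.currentThread().getName());
            upstream.subscribe(onNext);
        });
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService io =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService computation =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "computation"));
        CountDownLatch done = new CountDownLatch(3);

        // same shape as just(1, 2, 3).subscribeOn(io).subscribeOn(computation)
        MiniObservable chain =
                subscribeOn(subscribeOn(just(1, 2, 3), io, log), computation, log);

        chain.subscribe(v -> {
            log.add("got " + v + " on " + Thread.currentThread().getName());
            done.countDown();
        });

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            computation.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

in this model the outer subscribeOn runs first and only decides where the inner one gets subscribed; the inner one is the last hop before just, so all three values are logged on the "io" thread.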

running things in parallel with rxjava

in order to run things in parallel, we use flatMap, mapping each item to an inner observable that can then subscribeOn whatever scheduler it wants to. concatMap has the same shape, but it subscribes to the inner observables one at a time, so it will always emit items in order, whereas flatMap can emit items out of order.

so what does this do? flatMap is essentially a merge, which “combines multiple Observables into one by merging their emissions” [2]. note that the observable contract stipulates that “Observables must issue notifications to observers serially (not in parallel).” this means that onNext will not be called concurrently, and part of merge’s job is to make sure that onNext is only called by one thread at a time.
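to illustrate what "only called by one thread at a time" means, here is a small sketch in plain java. it swaps in a plain lock and is not how rxjava’s merge is implemented (the real operator is more involved); SerializedOnNextDemo and serialize are hypothetical names. several producer threads push values into one downstream callback, and the wrapper guarantees that no two of them are inside it at once.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// illustration only: a lock-based stand-in, not rxjava's merge
public class SerializedOnNextDemo {
    // wraps a downstream callback so only one thread is inside it at a time
    static <T> Consumer<T> serialize(Consumer<T> downstream) {
        Object lock = new Object();
        return value -> {
            synchronized (lock) {
                downstream.accept(value);
            }
        };
    }

    // returns {items received, most threads ever seen inside onNext at once}
    static int[] run(int producers, int itemsPerProducer) {
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxInside = new AtomicInteger();
        AtomicInteger received = new AtomicInteger();

        Consumer<Integer> onNext = serialize(value -> {
            int now = inside.incrementAndGet();
            maxInside.accumulateAndGet(now, Math::max);
            received.incrementAndGet();
            inside.decrementAndGet();
        });

        // each thread plays the role of one inner observable on its own scheduler
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < itemsPerProducer; i++) {
                    onNext.accept(i);
                }
            });
            threads[p].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {received.get(), maxInside.get()};
    }

    public static void main(String[] args) {
        int[] result = run(4, 1000);
        System.out.println("received " + result[0] + ", max concurrent " + result[1]);
    }
}
```

the values still arrive from different threads and in no fixed order, which is the flatMap behavior described above; what the wrapper rules out is two onNext calls overlapping.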

for more on this, see Thomas Nield’s article about achieving parallelization, and also, see David Karnok’s article about FlatMap.

special thanks to Michael Evans for proofreading this.


  1. both of the aforementioned talks are definitely worth watching if you haven’t already seen them!

  2. quote from rx merge docs
