3 things i learned about rxjava 2 subjects

intro

sometime in late december or early january, i decided to write a blog post per month. since it’s january 31st, i figured i should write about something to avoid dropping the ball so early in the year.

i introduced one of my friends to rxjava 2 not too long ago - his initial reaction was, “what? why would i want this?” - a few days later, it turned to, “hey, this is pretty cool!” - a few days after that, i learned several things from him as he ran into issues while migrating parts of his code.

consequently, i wanted to share these things, as they weren’t perfectly obvious to me (though in retrospect, they perhaps should have been).

also, i do realize that many consider subjects as “the mutable state of the reactive world” that should be avoided, but sometimes, they are a pretty good tool to use, especially when an entire code base is not yet reactive.

retry doesn’t actually retry

consider this code:

  @Test
  public void testRetry() {
    TestObserver<Integer> ts = new TestObserver<>();

    final boolean[] val = new boolean[] { true };
    Observable.just(1, 2, 3)
        .map(i -> {
          // fail only the first time that i = 2
          if (i == 2 && val[0]) {
            val[0] = false;
            throw new IllegalArgumentException("throw!");
          }
          return i;
        })
        .retry()
        .subscribe(ts);

    ts.awaitTerminalEvent();
    assertThat(ts.values()).containsAllOf(1, 2, 3);
  }

this test passes, and the values emitted are [1, 1, 2, 3]. i’ll come back to the repetition in a tiny bit. now consider if we make a small change to this code, so that it uses a subject instead:

  @Test
  public void testRetrySubject() {
    TestObserver<Integer> ts = new TestObserver<>();
    Subject<Integer> subject = PublishSubject.create();

    final boolean[] val = new boolean[] { true };
    subject
        .map(i -> {
          if (i == 2 && val[0]) {
            val[0] = false;
            throw new IllegalArgumentException("throw!");
          }
          return i;
        })
        .retry()
        .subscribe(ts);

    subject.onNext(1);
    subject.onNext(2);
    subject.onNext(3);
    subject.onComplete();

    ts.awaitTerminalEvent();
    assertThat(ts.values()).containsAllOf(1, 2, 3);
  }

this test now fails, outputting only [1, 3]. so, why is this the case? the reason is that retry doesn’t actually retry anything - it just resubscribes to the source observable.

so if we think of the flow in this case, we call onNext with 1, which is observed by the subscriber. we then call it with 2, which fails because we throw an exception, and causes us to resubscribe. resubscribing to it doesn’t cause anything to emit. when 3 is passed in, we then observe it.

we can prove this by replacing the PublishSubject with a BehaviorSubject - doing so will result in [1, 2, 3] (because the subject caches the last onNext value it received, which was 2, so it gets replayed upon resubscribing).

note that the fact that retry resubscribes to the source observable is also why data can be repeated (as seen in the first example without subjects) - so when 2 fails the first time around, we re-subscribe, and thus get an onNext of 1, 2, and then 3, thus resulting in the repetition of the 1.

calling subscribeOn on a subject has no practical effect

this one was strange to me at first, but made sense once i realized that the reasoning for this was the same as that of why only the subscribeOn closest to the source matters - in summary, it’s because once onNext is called, downstream will receive the value on the same thread onNext was called on. thus, if you have subject.subscribeOn(Schedulers.io()), but call subject.onNext(value) from the main thread, the downstream will receive onNext on the main thread.

see also what i wrote here in the section about “only the subscribeOn closest to the source matters” (while that article was about rx1, it’s still relevant in rx2).

observeOn’s buffer and rebatchRequests

this comes from my friend’s question on stackoverflow. suppose we have a case where ui events trigger some work, but we only want to do that work if it’s not currently already being done - an example of this is a button that does work - and let’s suppose this takes a good amount of time, during which someone can click the button a few more times. if the work is being done, we don’t want to restart it, but if no work is being done, we can start doing the work.

my friend realized, “aha, this sounds like something that backpressure can solve, let me use BackpressureStrategy.LATEST!” - and so he implemented his solution to look something like this:

  @Test
  public void testWork() {
    TestSubscriber<Integer> ts = new TestSubscriber<>();
    Subject<Integer> subject = PublishSubject.create();

    subject
        .toFlowable(BackpressureStrategy.LATEST)
        .observeOn(Schedulers.io())
        .map(ignored -> {
          System.err.println("before work");
          return ignored;
        })
        .observeOn(Schedulers.computation())
        .map(ignored -> {
          System.err.println("work");
          Thread.sleep(2000);
          return ignored;
        })
        .subscribe(ts);

    for (int i = 0; i < 32; i++) {
      subject.onNext(i);
    }
    subject.onComplete();

    ts.awaitTerminalEvent();
    assertThat(ts.valueCount()).isLessThan(32);
  }

this ended up failing, running the work 32 times - once for each and every emission of the subject. why?

as i learned from David’s answer, this is because observeOn has a buffer. since BackpressureStrategy.LATEST only keeps the latest value “if the downstream can’t keep up,” and since the default buffer size is 128 (unless it is overwritten by a system preference, in which case it must at least be 16), all the onNexts will be placed in a buffer until they can be sent downstream. in other words, backpressure doesn’t take effect here.

one solution i came up with based on this was replacing the first observeOn with observeOn(Schedulers.io(), false, 1) - this observeOn is called by the standard observeOn, with false for delayError, and bufferSize() for the buffer size. doing this results in the work only being done twice instead of 32 times. David said this would work, but would result in “time snapshots” as opposed to the latest event being processed (because as the worker was being processed, item 2 would be in observeOn’s queue, and would be sent downstream after the worker finishes - anything after 2 would be dropped until 2 is sent downstream).

David’s solution that actually gives you the latest was interesting - first, he used delay with 0ms as a means of switching threads without a buffer (i.e. .delay(0, TimeUnit.MILLISECONDS, Schedulers.computation())). then, he calls .rebatchRequests(1).

rebatchRequests was added as an experimental operator in RxJava 1, and this was the first time i had seen it. from my understanding, this is like a valve of sorts - it requests n items from upstream (based on the parameter passed in to it) - once 75% of them have been emitted downstream, it will request another n items from upstream.

it’s easier to understand what this is really doing when we look at how it’s implemented -

    public final Flowable<T> rebatchRequests(int n) {
        return observeOn(ImmediateThinScheduler.INSTANCE, true, n);
    }

hey, cool! it’s calling observeOn with a buffer size of n, which, in this case, is 1. ImmediateThinScheduler is a scheduler that runs things immediately on the current thread.

comments powered by Disqus