RxJava magic (finally) goes away
There is too much “magic” inside of RxJava.
Docs and tutorials are either too shallow or too complicated.
Here is the middle ground that is essential to know before playing with RxJava.
In these tests we will take each RxJava part… apart and explore how exactly it works.
Basic usage pattern
Let’s start from a familiar example. We will take it apart later.
AtomicInteger received = new AtomicInteger();
Observable.just(1)
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            received.set(integer);
        }
    });
assertEquals(1, received.get());
Implementing our own Observable and Subscriber
We will now implement our own just(1) Observable and take a brief look at Observable.subscribe.
AtomicBoolean emitted = new AtomicBoolean();
// Creating Observable the usual way:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onCompleted();
        emitted.set(true);
    }
});
assertFalse(emitted.get());
// Nothing happens at this point! Observable does not emit values by itself,
// we need to subscribe first.
AtomicInteger received = new AtomicInteger();
// First, Observable.unsafeSubscribe is a simplified version of Observable.subscribe,
// so we will use it instead.
// Second, Observable.subscribe(Action1) is just a shortcut for
// Observable.subscribe(Subscriber), here we will use Subscriber directly.
observable
    .unsafeSubscribe(new Subscriber<Integer>() {
        @Override
        public void onCompleted() {}

        @Override
        public void onError(Throwable e) {
            // We will rethrow the exception to not ignore asserts.
            throw new RuntimeException(e);
        }

        @Override
        public void onNext(Integer it) {
            received.set(it);
        }
    });
assertTrue(emitted.get());
assertEquals(1, received.get());
// The value has been emitted and received after the subscription.
Observable.create and Observable.unsafeSubscribe
We will now take apart Observable.create and Observable.unsafeSubscribe.
Observable.OnSubscribe<Integer> onSubscribe = new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onCompleted();
    }
};
// This is what Observable.create(OnSubscribe<T> f) boils down to (simplified),
// so we don't need to "implement" it:
// protected Observable(OnSubscribe<T> f) {
//     this.onSubscribe = f;
// }
AtomicInteger received = new AtomicInteger();
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    @Override
    public void onCompleted() {}

    @Override
    public void onError(Throwable e) {
        throw new RuntimeException(e);
    }

    @Override
    public void onNext(Integer it) {
        received.set(it);
    }
};
// This is what Observable.unsafeSubscribe looks like (simplified):
try {
    onSubscribe.call(subscriber);
} catch (Throwable e) {
    subscriber.onError(e);
}
assertEquals(1, received.get());
// onNext still works!
To sum up: the OnSubscribe function sends data directly to a given Subscriber by calling its methods.
Note: Observable was not used in this test at all.
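To make the point without RxJava on the classpath at all, here is a minimal plain-Java sketch of the same idea. The names MiniSubscriber, MiniOnSubscribe and MiniSubscribeDemo are hypothetical stand-ins invented for this illustration; they are not RxJava types, and the real classes do considerably more.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for rx.Subscriber (illustration only).
interface MiniSubscriber<T> {
    void onNext(T value);
    void onCompleted();
    void onError(Throwable e);
}

// Hypothetical stand-in for rx.Observable.OnSubscribe (illustration only).
interface MiniOnSubscribe<T> {
    void call(MiniSubscriber<? super T> subscriber);
}

class MiniSubscribeDemo {
    // Mirrors the simplified unsafeSubscribe from the test above:
    // call the function and route any failure to onError.
    static <T> void unsafeSubscribe(MiniOnSubscribe<T> onSubscribe,
                                    MiniSubscriber<? super T> subscriber) {
        try {
            onSubscribe.call(subscriber);
        } catch (Throwable e) {
            subscriber.onError(e);
        }
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        // The "observable" is nothing more than this function.
        MiniOnSubscribe<Integer> justOne = s -> {
            s.onNext(1);
            s.onCompleted();
        };
        unsafeSubscribe(justOne, new MiniSubscriber<Integer>() {
            @Override public void onNext(Integer value) { events.add("next:" + value); }
            @Override public void onCompleted() { events.add("completed"); }
            @Override public void onError(Throwable e) { events.add("error:" + e); }
        });
        return events;
    }
}
```

Calling MiniSubscribeDemo.run() records one value followed by the completion event, all on the calling thread.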
What Subscription is
Subscription is a simple interface with just two methods:
void unsubscribe();
boolean isUnsubscribed();
AtomicInteger received = new AtomicInteger();
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    @Override
    public void onCompleted() {}

    @Override
    public void onError(Throwable e) {
        throw new RuntimeException(e);
    }

    @Override
    public void onNext(Integer integer) {
        received.set(integer);
    }
};
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> targetSubscriber) {
        assertEquals(subscriber, targetSubscriber);
        // subscriber and targetSubscriber are the same object.
        assertFalse(subscriber.isUnsubscribed());
        // subscriber is subscribed, this is OK.
        // All subscriptions are created in the subscribed state.
        targetSubscriber.onNext(1);
        targetSubscriber.onCompleted();
    }
});
Subscription subscription = observable.unsafeSubscribe(subscriber);
assertEquals(subscription, subscriber);
// Surprise! Subscription and Subscriber are the same object.
// Subscriber implements Subscription interface.
assertFalse(subscriber.isUnsubscribed());
// ...and it is still subscribed even after the onCompleted() call... wait, WHY???
assertEquals(1, received.get());
// onNext still works!
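One way to see why the subscriber stays subscribed: in this bare-bones path nothing ever calls unsubscribe(). The sketch below models that in plain Java. TinySubscription, TinySubscriber and TinySubscriptionDemo are hypothetical names made up for this illustration, and the boolean flag is an assumption about the simplest possible bookkeeping, not a copy of RxJava's internals.

```java
import java.util.function.Consumer;

// Hypothetical miniature of rx.Subscription (illustration only).
interface TinySubscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

// Hypothetical miniature of rx.Subscriber: it is its own subscription.
abstract class TinySubscriber<T> implements TinySubscription {
    private volatile boolean unsubscribed; // false: created in the subscribed state

    @Override public void unsubscribe() { unsubscribed = true; }
    @Override public boolean isUnsubscribed() { return unsubscribed; }

    abstract void onNext(T value);
    abstract void onCompleted();
}

class TinySubscriptionDemo {
    // Like unsafeSubscribe: run the emitting function,
    // then hand the subscriber back as the subscription.
    static <T> TinySubscription unsafeSubscribe(Consumer<TinySubscriber<T>> onSubscribe,
                                                TinySubscriber<T> subscriber) {
        onSubscribe.accept(subscriber);
        return subscriber;
    }

    static boolean[] run() {
        TinySubscriber<Integer> subscriber = new TinySubscriber<Integer>() {
            @Override void onNext(Integer value) {}
            @Override void onCompleted() {} // nothing here calls unsubscribe()
        };
        TinySubscription subscription = unsafeSubscribe(s -> {
            s.onNext(1);
            s.onCompleted();
        }, subscriber);
        boolean sameObject = subscription == subscriber;
        boolean stillSubscribed = !subscription.isUnsubscribed();
        subscription.unsubscribe(); // only an explicit call flips the flag
        return new boolean[] { sameObject, stillSubscribed, subscriber.isUnsubscribed() };
    }
}
```

All three flags returned by run() are true: the subscription is the subscriber, it is still subscribed after onCompleted(), and it becomes unsubscribed only once somebody asks for it.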
Regular Observable.subscribe and SafeSubscriber
The regular version of Observable.subscribe(...) wraps the Subscriber in a SafeSubscriber, which automatically enforces the lifecycle event order.
For example, SafeSubscriber automatically calls unsubscribe() after onCompleted() has been received.
AtomicInteger received = new AtomicInteger();
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    @Override
    public void onCompleted() {}

    @Override
    public void onError(Throwable e) {
        throw new RuntimeException(e);
    }

    @Override
    public void onNext(Integer integer) {
        received.set(integer);
    }
};
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> targetSubscriber) {
        assertTrue(targetSubscriber instanceof SafeSubscriber);
        // The regular subscribe() gave us a SafeSubscriber instance.
        targetSubscriber.onNext(1);
        targetSubscriber.onCompleted();
    }
});
// This time we will use the regular subscribe():
Subscription subscription = observable.subscribe(subscriber);
assertNotEquals(subscription, subscriber);
// Subscription and Subscriber are NOT the same object now.
assertTrue(subscriber.isUnsubscribed());
// ...SafeSubscriber in action.
assertEquals(1, received.get());
// onNext still works! :)
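The wrapping idea can be sketched in a few lines of plain Java. BasicSubscriber, GuardedSubscriber and GuardedSubscriberDemo are hypothetical names for this illustration; the sketch only assumes the two behaviours described above (events after a terminal event are dropped, and the wrapped subscriber is unsubscribed on completion) and leaves out the error handling of the real SafeSubscriber.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature subscriber with a built-in subscription flag.
abstract class BasicSubscriber<T> {
    private volatile boolean unsubscribed;

    void unsubscribe() { unsubscribed = true; }
    boolean isUnsubscribed() { return unsubscribed; }

    abstract void onNext(T value);
    abstract void onCompleted();
}

// Wrapper in the spirit of SafeSubscriber: once a terminal event is seen,
// later events are dropped and the wrapped subscriber is unsubscribed.
class GuardedSubscriber<T> extends BasicSubscriber<T> {
    private final BasicSubscriber<T> actual;
    private boolean done;

    GuardedSubscriber(BasicSubscriber<T> actual) { this.actual = actual; }

    @Override void onNext(T value) {
        if (!done) actual.onNext(value);
    }

    @Override void onCompleted() {
        if (done) return;
        done = true;
        try {
            actual.onCompleted();
        } finally {
            unsubscribe(); // the automatic part
        }
    }

    @Override void unsubscribe() {
        super.unsubscribe();
        actual.unsubscribe();
    }
}

class GuardedSubscriberDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        BasicSubscriber<Integer> plain = new BasicSubscriber<Integer>() {
            @Override void onNext(Integer value) { events.add("next:" + value); }
            @Override void onCompleted() { events.add("completed"); }
        };
        GuardedSubscriber<Integer> safe = new GuardedSubscriber<>(plain);
        safe.onNext(1);
        safe.onCompleted();
        safe.onNext(2);     // misbehaving source: a value after completion is dropped
        safe.onCompleted(); // a duplicate terminal event is dropped as well
        events.add("unsubscribed:" + plain.isUnsubscribed());
        return events;
    }
}
```

The wrapped subscriber sees exactly one value and one completion, and ends up unsubscribed without having called unsubscribe() itself.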
Chaining operators
This is a simple exploration of chaining operators.
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        // The received subscriber is some internal class inside of OperatorTake.
        assertTrue(subscriber.getClass().getName().contains("OperatorTake"));
        subscriber.onNext(1);
        subscriber.onNext(2);
        subscriber.onNext(3);
        subscriber.onCompleted();
    }
});
List<Integer> received = Collections.synchronizedList(new ArrayList<>());
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(Integer it) {
        received.add(it);
    }
};
Subscription subscription = observable
    .take(2)
    .skip(1)
    .mergeWith(Observable.just(3, 4))
    .unsafeSubscribe(subscriber);
assertEquals(subscription, subscriber);
// subscription is still subscriber - it is just returned by unsafeSubscribe as-is!
assertArrayEquals(new Integer[]{2, 3, 4}, received.toArray());
// It works, surprisingly!
But… how does operator chaining work?
We will now look inside the take() function and see how one observable gets transformed into another after applying the operator.
Observable<Integer> observable = Observable.just(1, 2, 3);
OperatorTake<Integer> operatorTake = new OperatorTake<>(2);
List<Integer> received1 = Collections.synchronizedList(new ArrayList<>());
// Operator is a function that takes one subscriber and returns another:
// public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
observable
    .lift(operatorTake) // this is what happens inside of .take(2)
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            received1.add(integer);
        }
    });
assertArrayEquals(new Integer[]{1, 2}, received1.toArray());
// Works as expected.
List<Integer> received2 = Collections.synchronizedList(new ArrayList<>());
// We will now decompose the lift(Operator) function.
// It just returns a new Observable like this:
new Observable<Integer>(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> childSubscriber) {
        try {
            // This is where the operator is called to create a new subscriber.
            Subscriber<? super Integer> operatorSubscriber =
                operatorTake.call(childSubscriber);
            try {
                operatorSubscriber.onStart();
                observable.subscribe(operatorSubscriber);
            } catch (Throwable e) {
                operatorSubscriber.onError(e);
            }
        } catch (Throwable e) {
            childSubscriber.onError(e);
        }
    }
}) {} // Observable's constructor is protected, so we call
      // it here by creating an anonymous class
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            received2.add(integer);
        }
    });
// The chain of observables is constructed first.
// When subscribe() is called, a chain of subscribers gets created from operators.
assertArrayEquals(new Integer[]{1, 2}, received2.toArray());
// Works!
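The same mechanics fit into a small plain-Java model. Sink, Source, Op and LiftDemo are hypothetical names invented here, and the take and skip below are simplified sketches (take assumes a limit of at least 1, and there is no unsubscription or error channel), not the RxJava operators themselves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical miniatures: a Sink plays the subscriber, a Source plays the observable.
interface Sink<T> { void onNext(T value); void onCompleted(); }
interface Source<T> { void subscribe(Sink<T> sink); }
// An operator maps the downstream sink to the sink handed to the upstream source.
interface Op<R, T> extends Function<Sink<R>, Sink<T>> {}

class LiftDemo {
    // lift() builds a new source; the operator runs only when someone subscribes.
    static <T, R> Source<R> lift(Source<T> upstream, Op<R, T> operator) {
        return child -> upstream.subscribe(operator.apply(child));
    }

    // Simplified take: forwards the first `limit` values, then completes (limit >= 1 assumed).
    static <T> Op<T, T> take(int limit) {
        return child -> new Sink<T>() {
            int seen;
            boolean done;
            @Override public void onNext(T value) {
                if (done) return;
                child.onNext(value);
                if (++seen == limit) { done = true; child.onCompleted(); }
            }
            @Override public void onCompleted() {
                if (!done) { done = true; child.onCompleted(); }
            }
        };
    }

    // Simplified skip: drops the first `count` values.
    static <T> Op<T, T> skip(int count) {
        return child -> new Sink<T>() {
            int skipped;
            @Override public void onNext(T value) {
                if (skipped < count) skipped++; else child.onNext(value);
            }
            @Override public void onCompleted() { child.onCompleted(); }
        };
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        Source<Integer> source = sink -> {
            sink.onNext(1);
            sink.onNext(2);
            sink.onNext(3);
            sink.onCompleted();
        };
        // The chain of sources is built first...
        Source<Integer> taken = lift(source, LiftDemo.<Integer>take(2));
        Source<Integer> chained = lift(taken, LiftDemo.<Integer>skip(1));
        // ...and the chain of sinks is created, back to front, on subscribe.
        chained.subscribe(new Sink<Integer>() {
            @Override public void onNext(Integer value) { events.add("next:" + value); }
            @Override public void onCompleted() { events.add("completed"); }
        });
        return events;
    }
}
```

Only the value 2 survives take(2) followed by skip(1), and the completion raised by take travels down through skip to the final sink.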
Asynchronous subscription
Here we will see how Observable.subscribeOn works.
AtomicReference<String> onSubscribeThreadName = new AtomicReference<>();
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        onSubscribeThreadName.set(Thread.currentThread().getName());
        subscriber.onNext(1);
        subscriber.onCompleted();
    }
});
AtomicInteger received = new AtomicInteger();
AtomicReference<String> subscriberThreadName = new AtomicReference<>();
Subscription subscription = observable
    .subscribeOn(Schedulers.io())
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            subscriberThreadName.set(Thread.currentThread().getName());
            received.set(integer);
        }
    });
while (!subscription.isUnsubscribed()) {
    Thread.sleep(1);
}
assertTrue(onSubscribeThreadName.get().contains("RxCachedThreadScheduler"));
// OnSubscribe is called on some internal Rx thread, as expected.
assertEquals(onSubscribeThreadName.get(), subscriberThreadName.get());
// But values have been received on the same thread where subscription happened!
// This is because we control only subscription with subscribeOn.
// If we want to receive values on a specific thread we need to use observeOn.
assertEquals("main", Thread.currentThread().getName());
// We're still safe here.
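Stripped of everything else, the effect seen in this test can be modelled as "run the emitting function on another thread". The sketch below is plain Java under that assumption; SubscribeOnDemo, its subscribeOn helper and the thread name demo-worker are hypothetical, and a real Scheduler does much more than a bare ExecutorService.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class SubscribeOnDemo {
    // Hypothetical miniature of subscribeOn: the emitting function is simply run on the executor.
    static <T> void subscribeOn(Consumer<Consumer<T>> onSubscribe,
                                ExecutorService executor,
                                Consumer<T> subscriber) {
        executor.execute(() -> onSubscribe.accept(subscriber));
    }

    // Returns { thread that emitted, thread that received }.
    static String[] run() {
        ExecutorService worker =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-worker"));
        AtomicReference<String> emitThread = new AtomicReference<>();
        AtomicReference<String> receiveThread = new AtomicReference<>();
        CountDownLatch received = new CountDownLatch(1);
        try {
            SubscribeOnDemo.<Integer>subscribeOn(
                subscriber -> {
                    emitThread.set(Thread.currentThread().getName());
                    subscriber.accept(1); // a direct call, so it stays on this thread
                },
                worker,
                value -> {
                    receiveThread.set(Thread.currentThread().getName());
                    received.countDown();
                });
            if (!received.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the value");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return new String[] { emitThread.get(), receiveThread.get() };
    }
}
```

Both names come back as demo-worker: moving the subscription to another thread moves the delivery with it, because the source calls the subscriber directly.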
Observing values on a custom thread
This time we will use the Observable.observeOn operator to receive values on a given thread.
We will also create our own simple Scheduler to understand how it works.
AtomicReference<String> onSubscribeThreadName = new AtomicReference<>();
AtomicInteger received = new AtomicInteger();
AtomicReference<String> subscriberThreadName = new AtomicReference<>();
// We will add scheduled actions from the background thread here
// and run them on the current thread.
CopyOnWriteArrayList<Runnable> commandQueue = new CopyOnWriteArrayList<>();
Subscription subscription = Observable
    .just(1)
    .doOnSubscribe(() -> onSubscribeThreadName.set(Thread.currentThread().getName()))
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.from(new Executor() { // Schedulers.from just wraps Executor.
        @Override
        public void execute(Runnable command) {
            commandQueue.add(command);
        }
    }))
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            subscriberThreadName.set(Thread.currentThread().getName());
            received.set(integer);
        }
    });
while (!subscription.isUnsubscribed() || commandQueue.size() > 0) {
    while (commandQueue.size() > 0) {
        commandQueue.remove(0).run();
    }
    Thread.sleep(1);
}
assertTrue(onSubscribeThreadName.get().contains("RxCachedThreadScheduler"));
// OnSubscribe is still called on the given io() thread.
assertEquals("main", subscriberThreadName.get());
// But we observe values on the given observeOn thread (which is "main").
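The core of the trick is a wrapper around the downstream subscriber that posts every value to an Executor instead of delivering it directly. Here is a plain-Java sketch of just that part, kept on a single thread so the order of events is deterministic. ObserveOnDemo and its observeOn helper are hypothetical names; the real operator also queues values, honours backpressure and forwards terminal events.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

class ObserveOnDemo {
    // Hypothetical miniature of observeOn: each value is delivered via the executor.
    static <T> Consumer<T> observeOn(Consumer<T> downstream, Executor executor) {
        return value -> executor.execute(() -> downstream.accept(value));
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Queue<Runnable> commandQueue = new ArrayDeque<>();
        // This executor only records the command, like the Executor in the test above.
        Executor queueing = commandQueue::add;

        Consumer<Integer> wrapped = observeOn(v -> log.add("received:" + v), queueing);
        wrapped.accept(1); // the "source" emits, but nothing is delivered yet
        log.add("queued:" + commandQueue.size());

        // Whoever drains the queue decides where (and when) the value is observed.
        while (!commandQueue.isEmpty()) {
            commandQueue.remove().run();
        }
        return log;
    }
}
```

The log shows the value sitting in the queue first and being received only when the queue is drained, which is why the test above observes it on "main".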
Backpressure
Backpressure is the ability of an Observable to emit no more values than its Subscriber has requested.
Let’s see how it works.
List<Integer> received = Collections.synchronizedList(new ArrayList<>());
Subscriber<Integer> subscriber = new Subscriber<Integer>() { // #1
    @Override
    public void onStart() {
        super.onStart();
        // Start from requesting one value.
        request(1);
        // request() is Subscriber's method that just calls Producer's request().
        // (more about Producer a few lines below)
    }

    @Override
    public void onCompleted() {}

    @Override
    public void onError(Throwable e) {
        throw new RuntimeException(e);
    }

    @Override
    public void onNext(Integer integer) {
        received.add(integer);
        if (integer < 5) // We want to get only 5 items.
            request(1); // #3
    }
};
Observable
    .create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> targetSubscriber) {
            AtomicInteger lastSent = new AtomicInteger();
            targetSubscriber.setProducer(new Producer() {
                @Override
                public void request(long n) {
                    targetSubscriber.onNext(lastSent.incrementAndGet()); // #2
                }
            });
        }
    })
    .unsafeSubscribe(subscriber);
assertArrayEquals(new Integer[]{1, 2, 3, 4, 5}, received.toArray());
A careful reader may notice that Subscriber and OnSubscribe call each other recursively.
What happens if we change (integer < 5) to (integer < 3000)?
A-ha! The call stack grows with every item until it overflows.
In real life we are not going to use backpressure in a single-threaded chain, so don’t be scared by this exception.
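For the curious, the recursion can also be flattened without switching threads. A common approach is to count outstanding demand and let only the outermost request() call emit in a loop; the sketch below shows that idea in plain Java. LoopingProducer and LoopingProducerDemo are hypothetical names, and this is an illustration of the general pattern under those assumptions, not the code RxJava ships.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;

// Hypothetical producer: request(n) may be re-entered from onNext,
// so only the outermost call emits, and it does so in a loop.
class LoopingProducer {
    private final AtomicLong requested = new AtomicLong();
    private final IntConsumer onNext;
    private int lastSent;

    LoopingProducer(IntConsumer onNext) { this.onNext = onNext; }

    void request(long n) {
        // A non-zero previous value means an outer request() call is already emitting;
        // it will pick up the demand we have just added.
        if (requested.getAndAdd(n) != 0) return;
        long emitted;
        do {
            emitted = 0;
            long demand = requested.get();
            while (emitted < demand) {
                onNext.accept(++lastSent); // may call request() again, which returns at once
                emitted++;
            }
        } while (requested.addAndGet(-emitted) != 0); // more demand arrived meanwhile
    }
}

class LoopingProducerDemo {
    static int run(int limit) {
        List<Integer> received = new ArrayList<>();
        LoopingProducer[] producer = new LoopingProducer[1];
        producer[0] = new LoopingProducer(value -> {
            received.add(value);
            if (value < limit) producer[0].request(1); // same shape as the Subscriber above
        });
        producer[0].request(1);
        return received.size();
    }
}
```

With this shape the stack depth stays constant no matter how many items are requested one by one, so LoopingProducerDemo.run(100000) completes on a single thread.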
Asynchronous backpressure
Sending requests and values, back and forth, through different threads. Amazing!
List<Integer> received = Collections.synchronizedList(new ArrayList<>());
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    @Override
    public void onStart() {
        super.onStart();
        request(1);
    }

    @Override
    public void onCompleted() {}

    @Override
    public void onError(Throwable e) {
        throw new RuntimeException(e);
    }

    @Override
    public void onNext(Integer integer) {
        received.add(integer);
        // We can use large backpressure numbers now because
        // each scheduled action's stack starts from the scheduler's thread root.
        if (integer < 5000)
            request(1);
        else
            unsubscribe(); // We need to unsubscribe to exit background thread loop later.
    }
};
Subscription subscription = Observable
    .create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> targetSubscriber) {
            AtomicInteger lastSent = new AtomicInteger();
            targetSubscriber.setProducer(new Producer() {
                @Override
                public void request(long n) {
                    targetSubscriber.onNext(lastSent.incrementAndGet());
                }
            });
        }
    })
    .observeOn(Schedulers.io())
    .subscribe(subscriber);
while (!subscription.isUnsubscribed()) {
    Thread.sleep(1);
}
assertEquals(5000, received.size());
// We can now receive any amount of requested items.