Edgars Nemse

Introduction to Reactive Programming and RxJS

Technical
September 12, 2018

This is the first post in a series of articles to help you get up to speed with the Radix JavaScript library. The first few posts will focus on the more general programming concepts required to understand the architecture of the library, after which I will dive into the different components of the library itself. By the end of the series you will be building dApps on Radix like a pro.

In this post we'll look at reactive programming and the RxJS library. There are Rx libraries for most major languages, including RxJava, which our Java library uses heavily, so the concepts discussed in this post apply beyond JavaScript. I will give you an overview of the key components of RxJS and provide links to learn more. Let's dive in.

Why RxJS?

Modern programming is highly asynchronous. Think about a typical JavaScript-based web application - you're making network requests and receiving responses sometime later, you're opening websockets and streaming in new notifications, you're reacting to events triggered by the user interacting with the application.

Traditionally this has been handled with callback functions and events. However, if you have ever dealt with complex JS applications, you know this can quickly get messy and hard to reason about, because the code doesn't follow normal control flow. Furthermore, there are certain patterns you find yourself re-implementing over and over again.

This is where RxJS comes in. It introduces a powerful paradigm for working with these kinds of asynchronous computations and makes them first-class citizens in your code. We've found the reactive pattern incredibly useful across the whole Radix stack.

Observables and Observers

At the core of RxJS are observables and observers. An observable represents a collection of any number of future values. You can think of it as a function that can return a stream of results indefinitely into the future. An observer is simply a set of 3 callback functions that handle the returned values.

This is best explained with a quick example. First, let's create an observable. This observable will return 1, 2 and 3 as soon as you subscribe to it, and 4 a second later asynchronously.

import { Observable } from 'rxjs'

const observable = new Observable(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

Now we need to create an observer that subscribes to the observable, receives the values and logs them to the console. As I mentioned before, an observer is just an object consisting of 3 callback functions: next, error and complete. next is invoked when a new value is emitted, complete means the observable won't emit any more values, and error means something has gone wrong.

import { Observable } from 'rxjs'

const observable = new Observable(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

console.log('just before subscribe');
observable.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});
console.log('just after subscribe');

If you were to run this code, you would see the following output in the debug console:

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

Subjects

Observables are unicast, meaning every time you call subscribe on an observable you get a new and independent execution of that observable. But sometimes you want to allow multiple observers to listen to the same values, for example if you're broadcasting status updates for some system.
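To make the unicast behaviour concrete, here is a small library-free sketch. Note that createObservable is a hypothetical stand-in written for this illustration, not the RxJS API; it only stores the producer function and runs it again for every subscriber, which is roughly the behaviour described above.

```javascript
// Library-free sketch: `createObservable` is a hypothetical helper, not RxJS.
// It stores the producer function and runs it once per subscriber.
function createObservable(producer) {
  return {
    subscribe(observer) {
      producer(observer); // a fresh, independent execution for this observer
    },
  };
}

let executions = 0;
const numbers = createObservable((observer) => {
  executions += 1; // counts how many times the producer has run
  observer.next(1);
  observer.next(2);
  observer.complete();
});

const received = [];
numbers.subscribe({
  next: (x) => received.push('A got ' + x),
  complete: () => received.push('A done'),
});
numbers.subscribe({
  next: (x) => received.push('B got ' + x),
  complete: () => received.push('B done'),
});

console.log(executions); // 2
console.log(received.join(', '));
// A got 1, A got 2, A done, B got 1, B got 2, B done
```

Each subscriber triggers its own run of the producer, so subscriber B sees the full sequence from the start rather than sharing A's execution.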

This is where Subjects come into play. From an Observer's point of view, they are indistinguishable from a regular Observable - you call subscribe on them in exactly the same way.

Interestingly, they also look just like observers from the outside - to push the next value, you call subject.next(value). Later you call subject.complete() to finish the execution.

An example will demonstrate this better than I can ever explain:

import { Subject } from 'rxjs'

const subject = new Subject()
console.log('Adding subscriber 1')
subject.subscribe({
  next: x => console.log('subscriber 1 got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('subscriber 1 done'),
});

subject.next(1)
subject.next(2)
console.log('Adding subscriber 2')

subject.subscribe({
  next: x => console.log('subscriber 2 got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('subscriber 2 done'),
});

subject.next(3)
subject.complete()

Running this code will produce the following output:

Adding subscriber 1
subscriber 1 got value 1
subscriber 1 got value 2
Adding subscriber 2
subscriber 1 got value 3
subscriber 2 got value 3
subscriber 1 done
subscriber 2 done

RxJS also provides a few useful variations of subjects. A BehaviorSubject stores the latest value and immediately emits it to every new observer, which is useful if you're broadcasting the current state of a system. A ReplaySubject can store and replay multiple previous values.
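The "store the latest value and hand it to every new observer" idea can be sketched without the library. LastValueSubject below is a hypothetical class invented for this illustration, not the RxJS BehaviorSubject; it leaves out error and complete handling and unsubscription.

```javascript
// Hypothetical, library-free stand-in for the idea behind BehaviorSubject.
class LastValueSubject {
  constructor(initialValue) {
    this.current = initialValue; // most recent value seen so far
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
    observer.next(this.current); // new observers get the latest value straight away
  }

  next(value) {
    this.current = value;
    // broadcast to every observer currently subscribed
    this.observers.forEach((observer) => observer.next(value));
  }
}

const status = new LastValueSubject('starting');
const log = [];

status.subscribe({ next: (s) => log.push('first saw ' + s) });
status.next('syncing');
status.subscribe({ next: (s) => log.push('second saw ' + s) });
status.next('ready');

console.log(log.join('\n'));
// first saw starting
// first saw syncing
// second saw syncing
// first saw ready
// second saw ready
```

The second observer subscribes late but still sees 'syncing' immediately, which is why this pattern suits broadcasting the current state of a system.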

Operators

On top of these basic abstractions, RxJS provides a large toolset of operators for manipulating these streams. They allow you to work with streams of data in a composable, functional style. Operators do not modify the existing observable; instead they return a new, modified observable, which means you don't have to worry about accidentally messing up your observables.

Here is an example of how you might chain operators to modify an observable.

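import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// of(1, 2, 3, 4, 5) emits each number separately;
// of([1, 2, 3, 4, 5]) would emit the whole array as a single value
const source = of(1, 2, 3, 4, 5);
source
  .pipe(
    filter((x) => x > 2), // 3, 4, 5
    map((x) => x * 2), // 6, 8, 10
  )
  .subscribe((x) => console.log(x));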

This example takes a stream of numbers, filters out any values less than 3 and then doubles each value.
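To see why operators leave the source untouched, here is a library-free sketch of the pattern. fromArray, filter and map below are hypothetical helpers written for this illustration and are not the RxJS implementations; each operator is just a function that takes a source observable and returns a new one.

```javascript
// Hypothetical helpers, not the RxJS implementation.
function fromArray(values) {
  return {
    subscribe(observer) {
      values.forEach((v) => observer.next(v));
    },
  };
}

// An operator takes a source observable and returns a NEW observable.
const filter = (predicate) => (source) => ({
  subscribe(observer) {
    source.subscribe({
      next: (v) => {
        if (predicate(v)) observer.next(v); // pass through matching values only
      },
    });
  },
});

const map = (project) => (source) => ({
  subscribe(observer) {
    source.subscribe({ next: (v) => observer.next(project(v)) });
  },
});

const source = fromArray([1, 2, 3, 4, 5]);
const doubledOverTwo = map((x) => x * 2)(filter((x) => x > 2)(source));

const fromSource = [];
const fromDerived = [];
source.subscribe({ next: (v) => fromSource.push(v) });
doubledOverTwo.subscribe({ next: (v) => fromDerived.push(v) });

console.log(fromSource); // [ 1, 2, 3, 4, 5 ]
console.log(fromDerived); // [ 6, 8, 10 ]
```

Subscribing to source still yields all five numbers, because the operators wrapped it rather than changing it.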

There are many different operators available, and they broadly fall into creation, filtering, transformation and combination operators.


Creation operators allow you to create observables from different inputs, such as arrays of values or events and promises.

Filtering operators let you filter streams based on the values of the incoming items, but also based on things like time elapsed between events, for example the debounce operator.

Transformation operators include things like map, which applies a transformation to every item going through, as well as ones that combine multiple values, like groupBy and reduce.

Combination operators let you take multiple observable streams and merge them into one, such as merge, concat and zip, but also more advanced ones like buffer, which collects the values of one observable and releases them when a trigger observable emits an event.
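The buffer behaviour described above (collect values, release them when a trigger fires) can be sketched without the library. createSubject and bufferUntil are hypothetical names made up for this illustration; unlike a real RxJS operator, this simplified version starts listening as soon as it is called and never unsubscribes.

```javascript
// Hypothetical minimal subject: broadcasts each value to all its observers.
function createSubject() {
  const observers = [];
  return {
    subscribe: (observer) => observers.push(observer),
    next: (value) => observers.forEach((o) => o.next(value)),
  };
}

// Buffer-like helper: collects values from `source` and releases them
// as one array every time `trigger` emits.
function bufferUntil(source, trigger) {
  const out = createSubject();
  let pending = [];
  source.subscribe({ next: (v) => pending.push(v) });
  trigger.subscribe({
    next: () => {
      out.next(pending);
      pending = []; // start a fresh batch
    },
  });
  return out;
}

const clicks = createSubject();
const flush = createSubject();
const batches = [];
bufferUntil(clicks, flush).subscribe({ next: (batch) => batches.push(batch) });

clicks.next('a');
clicks.next('b');
flush.next();
clicks.next('c');
flush.next();

console.log(JSON.stringify(batches)); // [["a","b"],["c"]]
```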

This site provides a great overview of all available RxJS operators: https://www.learnrxjs.io/

Conclusion

I've only scratched the surface of reactive streams. It's a powerful paradigm that changes the way you think about asynchronous programming. If you want to read further, I can recommend this excellent in-depth tutorial by André Staltz: https://gist.github.com/staltz/868e7e9bc2a7b8c1f754

You can also take a look at the RxJS documentation here: http://reactivex.io/rxjs/manual/overview.html#introduction

Next time I'll return with an introduction to Entity-Component-Systems - a design pattern from game programming for which we have found a surprising use in the Radix architecture.
