Rx.js: The Big Picture
Reactive programming is a new approach to developing apps and is getting a lot of hype today. Learn how to level up your code to react to changes automatically and new ways to approach problems and process asynchronous data as streams.
Reactive programming may be an unfamiliar paradigm, but actually, it is nothing really new. Microsoft Excel was built on it long before it was popular.
In recent years, Reactive programming found its way into modern apps with things like state management in web frameworks and video game engine tools.
The focus of this article is the JavaScript implementation of ReactiveX (Rx.js), which is a library for composing asynchronous and event-based apps using Reactive programming.
But before we talk about Rx.js, we must learn a few things like Declarative, Functional, Reactive programming, Observer pattern, Observables, and how they all blend into one.
Imperative vs Declarative programming
The first step towards writing Reactive code is to think differently about writing code. You may have heard these programming terms before:
- Imperative programming
- Declarative programming
Imperative programming is based on detailed steps that need to be done in order to perform an operation. Declarative is based only on what needs to be done.
Let’s look at a couple of examples of the two. In all of these examples two snippets of code do the exact same thing, but in a different way:
# 1
Imperative approach
const isPositiveNumber = currentNumber > 0 ? true : false;
Declarative approach
const isPositiveNumber = currentNumber > 0;
# 2
Imperative approach
function calculateSum(a, b) {
const firstNumber = a;
const secondNumber = b;
const sum = firstNumber + secondNumber;
return sum;
}
Declarative approach
function calculateSum(a, b) {
return a + b;
}
# 3
Imperative approach
const evenNumbers = [];
const allNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
for(let i = 1; i <= allNumbers.length; i++) {
if (i % 2 === 0) {
evenNumbers.push(i);
}
}
// [ 2, 4, 6, 8, 10 ]
Declarative approach
const allNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
const evenNumbers = allNumbers.filter(n => n % 2 === 0);
// [ 2, 4, 6, 8, 10 ]
It is clear that Declarative programming is not only shorter but also requires a different approach to the situation at hand. It’s direct on What needs to be done rather than How.
The Declarative programming in the ReactiveX world closely resembles JavaScript higher-order functions (or C# LINQ) where each operation is a method chained to the previous.
from([1, 2, 3, 4, 5])
.pipe(
map(x => x * 10)
)
.subscribe((data) => console.log(data))
// 10, 20, 30, 40, 50
ReactiveX Reactive programming is based on Observables which we’ll elaborate more later on.
The Control Flow
The Declarative syntax allows us to focus on things one at a time in the right order:
- Create
- Transform
- Execute
- Complete
There are only a few variables to remember as they appear in order (in the same line/pipeline). We can look up everything that needs to happen just by reading the code from top to bottom,
Producer() // Observable source
.pipe(
operation1(),
operation2(),
operation3(),
)
.subscribe()
as opposed to having the logic scattered up and down through the file (Imperative approach).
Here, the state is been transferred from one operation to another, meaning you can easily trace it back to the original form.
Modifications are also welcome.
You can easily transform, schedule, or filter values within the pipe just by adding operators:
interval(1000)
.pipe(
map(value => value * 1000), // 0, 1, 2, 3, 4, * 1000
filter(value => value < 3000), // return not greater than 3000
take(5) // execute exactly 5 times
)
.subscribe((data) => console.log(data)) // 0, 1000, 2000
Rx.js is not only Declarative but also Reactive. It reacts to changes. To explain what this means we’ll look into the Observer pattern next.
The Observer Pattern
In software design and engineering, the Observer pattern is a software design pattern in which an object, named the Subject, maintains a list of its dependents, called Observers, and notifies them automatically of any state changes, usually by calling one of their methods.
— Wikipedia
In simple terms, you have one object, function, or component that is producing events and a list of subscribers to those events that are invoked (automatically) whenever a new event arrives.
Example of an Observer pattern
Let’s try to think about how Youtube works. You as a YoutubeUser can search and watch videos, comment, and like, but the interesting part is that you can (1) subscribe to some Youtube channels and you’ll be (2) notified whenever a new video goes live.
class YoutubeUser {
private currentUser;
get username() {
return this.currentUser;
}
constructor(user: string) {
this.currentUser = user;
}
searchVideos() {}
watchVideos() {}
// (1)
subscribeToChannel(youtubeChannel: YoutubeChannel) {
youtubeChannel.addSubscriber(this.currentUser);
}
// (2)
getVideoNotification(videoName: string, youtuber: string) {
console.log(`[${this.currentUser} Notification]:
${youtuber} has uploaded a new video: ${videoName}`);
}
}
On the YoutubeChannel side of things, a channel has a list of
(1) subscribers, which is been (2) updated when a YoutubeUser subscribes to the channel.
Each channel subscriber (from the list) gets notified when a new video is
(3) uploaded.
class YoutubeChannel {
private youtuber!: YoutubeUser;
// (1)
channelSubscribers: YoutubeUser[] = [];
get channelName() {
return this.youtuber.username;
}
constructor(channelName: string) {
this.youtuber = new YoutubeUser(channelName);
}
// (2)
addSubscriber(channelName: string) {
this.channelSubscribers.push(new YoutubeUser(channelName))
}
// (3)
uploadVideo(videoName: string) {
this.channelSubscribers
.forEach((subscriber: YoutubeUser) => subscriber
.getVideoNotification(videoName, this.youtuber.username))
// each subscriber is an instance of YoutubeUser from
// channelSubscribers array
// each calls it's getVideoNotification() method that
// will print notification whenever uploadVideo() is called
}
}
Let’s create some Youtube Channels.
const ninjaTurtlesChannel = new YoutubeChannel('Master Splinter');
const dragonBallChannel = new YoutubeChannel('Master Roshi');
And let’s create users that will subscribe to these channels.
const youtubeUserLeo = new YoutubeUser('Leonardo')
const youtubeUserRaf = new YoutubeUser('Raphael')
const youtubeUserGoku = new YoutubeUser('Goku');
const youtubeUserKrillin = new YoutubeUser('Krillin');
// Youtube Users are added to the subscribers list of each channel
youtubeUserLeo.subscribeToChannel(ninjaTurtlesChannel);
youtubeUserRaf.subscribeToChannel(ninjaTurtlesChannel);
youtubeUserGoku.subscribeToChannel(dragonBallChannel);
And what happens now? Well, nothing. Now we wait for one of the channels we subscribed to upload a new video.
ninjaTurtlesChannel.uploadVideo('Ninjutsu lessons');
dragonBallChannel.uploadVideo('Teaching Kamehameha');
When a new video goes up, subscribers get immediately notified.
[LOG]: "[Leonardo Notification]:
Master Splinter has uploaded a new video: Ninjutsu lessons"
[LOG]: "[Raphael Notification]:
Master Splinter has uploaded a new video: Ninjutsu lessons"
[LOG]: "[Goku Notification]:
Master Roshi has uploaded a new video: Teaching Kamehameha"
If another video goes live from Master Splinter, his subscribers (Ninja Turtles) will be notified again.
ninjaTurtlesChannel.uploadVideo('Kung Fu lessons');
[LOG]: "[Leonardo Notification]:
Master Splinter has uploaded a new video: Kung Fu lessons"
[LOG]: "[Raphael Notification]:
Master Splinter has uploaded a new video: Kung Fu lessons"
What does any of this have to do with Reactive Programming?
It’s simple math.
Declarative Programming + Observer Pattern + Huge List of Operators = ReactiveX
In the Reactive programming context, the:
- YoutubeChannel is a Subject — that keeps a list of subscribers and produces values (videos)
- YoutubeUser is an Observer — that observes changes (new videos)
- Data passed between the two is wrapped in the Observable
ReactiveX is a library used for building asynchronous applications with Observable streams.
Its purpose is to bring the power of Reactive programming into your apps, by providing solutions for major programming languages:
- Rx.js (JavaScript)
- RxJava (Java)
- Rx.NET (C# .NET)
- RxPy (Python)
- RxCpp (C++), etc.
The ReactiveX uses the Observer pattern to track changes and abstracts that logic using Declarative principles. This combination of Declarative programming and the Observer pattern is what makes working with asynchronous data easy.
The Observables
At the heart of ReactiveX lie the Observables.
The data that is passed from Producer (Subject) to the Consumer/Receiver (Observer) is wrapped into an Observable. The Observable represents a sequence of events that occur in a never-ending interval.
That said you can unsubscribe from an Observable at any time.
To extract data from an Observable (or should I say convert Observable<T>
to plain <T>
), you put a call to subscribe()
function at the end. The subscribe()
function returns a callback function that holds raw data as its first parameter.
of('Hello World') // creates Observable<string>
.pipe(
// in between you can transform Observable<string>
)
// here you get the raw data
.subscribe((data: string) => {
console.log(data); // 'Hello World'
})
The Observable pattern is closely related to the Observer pattern. The key difference is that in the Observer pattern, observers register with a Subject to receive updates, while in the Observable pattern, observers subscribe to an Observable to receive emitted values or events.
Reactive Programming is programming with data streams called Observables.
Characteristics of Observables
In order to effectively work with Observables, we need to fully understand how they behave:
# Observables are Lazy
What this means is that without the subscribe()
call at the end, the Observable will never be activated, it will never create an API call or send data to consumers.
function getGreeting() {
return of('Hello World');
}
function sayGreeting() {
getGreeting()
// nothing will happen until you subscribe
}
# Synchronous and Asynchronous
The Observable can work both in a synchronous and asynchronous manner depending on the producer. An observable created from a primitive value (like a string) is no doubt synchronous, while an Observable produced by an API or a timer is asynchronous.
In addition to that, Observables can also be converted from one type to another using Schedulers.
# Observable can process a stream of data
# Observables can be invoked multiple times
Because observables work on callbacks.
# Observable can be unsubscribed from
This can be achieved in a number of ways, either using a subscription:
const subscription$ = of('Hello').subscribe();
subscription$.unsubscribe();
Or reactively after an Observable is called several times:
of('Hello')
.pipe(
take(1) // or takeUntil, takeWhile
)
.subscribe();
If we compare this to a JavaScript Promise, we can immediately see that Promises:
- Behave in an Eager manner
Activated when invoked. - Are only Asynchronous
- Can Process a single data at the time
- Can be called only once at a time
Promises can be in one of three states, pending, resolved, or rejected. - Cannot be canceled
Unless you’re using Abort Controller with Fetch API.
And you can see how much ground an Observable covers.
Working with Observables
Let’s give Observables a second look.
There are two types of Observables:
- Unicast (Cold)
- Multicast (Hot)
The difference between the two is how they share data with existing and late subscribers. In the former, the Observable is invoked whenever a new subscriber joins the club (subscribes to the Observable).
With Hot Observables early subscribers get the data, while late ones do not get anything until Observable emits again, in which case both early and late get the same (latest) data.
This of course can vary depending on the type of producer (Subject).
The Observables are Cold by nature but can be converted to Hot using Subjects or a set of operators (e.g. share()
).
The Observable can be in one of three states:
- Next (Active) state (
-a-a-a-a-
) - Error state (
-a-a-#
) - Completed state (
-a-a-|
)
While in the next state, the Observable can continuously emit data, hence why we referred to it previously as a never-ending stream.
However, in case of an unhandled error, the Observable will stop emitting data. There is also a completed state in which Observable is considered as successfully finished and can no longer emit data.
In web frameworks, it’s recommended to unsubscribe from any active observables in components that are no longer been used to avoid memory leaks.
Naming Conventions
The name of the variable that holds the Observable usually ends with a $ sign, e.g. data$
.
Create Transform Execute Complete
To create a new Observable we use the Observable instance that returns an Observer as a parameter of a callback function.
const data$ = new Observable((observer: Subscriber<string>) => {
observer.next('Hello'); // emits strings as specified above
if (somethingWentWrong) {
observer.error('Something went wrong');
}
observer.complete();
})
There are shorthand alternatives to create Observables like of()
or from()
.
In between the source Observable and the subscribe() function you can create a pipe function that holds operators that will transform, filter, destroy (unsubscribe), schedule, or combine Observables.
of([...])
.pipe(
// transformations
)
.subscribe()
The subscribe function also returns an Observer object that can track changes.
data$.subscribe({
next: (data: string) => console.log(data), // Success
error: (error: unknown) => console.log(error), // Fail
complete: () => console.log('Observable completed') // Completion
});
Like before there is a shorthand version for subscribe function.
// Success path
data$.subscribe((data: string) => console.log(data));
Synchronous vs Asynchronous Rx approach
In your typical JavaScript application, there is a distinct difference in how you handle synchronous or asynchronous data.
Rx.js however, uses the same API to do both. Looking at the code below it’s hard to tell if the code is asynchronous or not.
of('Hello')
.pipe(
take(1)
)
.subscribe();
This is synchronous, while the example below is asynchronous:
timer(1000)
.pipe(
take(1)
)
.subscribe();
Even events that are synchronous by nature (like a string text) can be converted to asynchronous using schedulers, delays, or by calling an API in the next step based on a string provided in the producer (source value).
of('Hello', AsapScheduler) // async operation
.pipe(
take(1)
)
.subscribe();
The bottom line, when working with Rx.js Observable streams there is no difference in processing data that is synchronous compared to one that is not.
Pros and Cons of ReactiveX
Now let’s look at the Pros and Cons and when you'd want to use Rx.js:
Benefits:
- Plethora of operators
With 100+ operators at your disposal, you shape the output however you like - Declarative programming
- Observables
- Wide support of languages and frameworks
Rx.js is heavily used in Angular, but it also can be integrated with other frameworks. - Better APIs
With Rx, you can cancel, retry, cache among consumers, delay one API until the existing one is going, handle errors, deal with race conditions, etc. - The same API for processing sync and async operations
- State management
Rx supports a variety of Subjects that provide an immutable state that can be shared among components. - Testing
Rx.js TestScheduler allows us to test asynchronous code as if it was synchronous, using Marble Diagrams, and simulate delays resulting in better performant tests.
Drawbacks:
- Hard to grasp
Reactive programming is hard to get used to at first because developers are not used to approaching things in this manner - Abundance of operators
This is also a negative thing because it will take a long time to learn these operators, concepts, and tools, what each does, and how to use them effectively
Setting up Rx.js in your project
There are a number of ways to include Rx.js in your existing project:
- Web app: Paste a CDN link into your project root file (index.html)
- Node.js backend app — Install Rx.js into your project using npm
npm i rxjs
- Web app with frameworks— Install Rx.js into your project using
npm i rxjs
oryarn add rxjs
- Angular & Nest.js — These frameworks support and have Rx.js installed out of the box
I recently published a blog explaining how to use NPM modules in web applications without frameworks and here I set up Rx.js into a vanilla JS/TS web app (without CDN). If you’re interested here is a full guide and a repository.
Wrapping Up
To summarize, Reactive programming describes a paradigm that relies on asynchronous programming logic to trigger changes in real-time.
The logic behind it lies in the Observer pattern. Producers of the events are called Subjects who sent data wrapped in an Observable to Observers that act upon it. The code itself is written in a Declarative manner where each step is an operator that can manipulate the data.
All in all, Reactive programming, Observables, and ReactiveX are vast topics. Hopefully, in this article, you were able to acknowledge the possibilities of this programming style.
Bye for now 👋
Read More
- Why You Should Consider Reactive Programming [Good Guy Daniel]
- What is Reactive Programming? [Tech Target]
- JavaScript Observables in 5 Minutes [Stack Chief]
- Reactive Programming in JavaScript with Rx.js [DZone]
- Rx.js in Action [Live Book]
- Rx,js Glossary [Rx.js Dev]