Rx.js Core Features

Mirza Leka
8 min read · Apr 30, 2024


While the future of Rx.js in Angular is uncertain, Rx.js remains a powerful utility for Reactive Programming and one of the most widely used libraries on npm. If you’re not familiar with it, now is the perfect time to peek at what makes Rx.js so awesome.


Observables

At the heart of Rx.js lie the Observables. Observables are Reactive streams that represent values that change over time.

You can create an unlimited number of Observables in different ways for different purposes.

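import { Observable, of } from 'rxjs';

// Explicit form: push three values, then signal completion
const dataStream$ = new Observable<string>(observer => {
  observer.next('A');
  observer.next('B');
  observer.next('C');
  observer.complete();
});

// Alternatively (syntactic sugar), of() builds an equivalent stream
const sameStream$ = of('A', 'B', 'C');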

The Observables are lazy and they only output values upon Subscription.

dataStream$.subscribe(data => {
  console.log(data); // A, B, C
});
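To make the laziness visible, here is a minimal sketch that counts how often the producer function runs. The names lazy$, producerRuns and received are made up for this illustration and are not part of Rx.js.

```typescript
import { Observable } from 'rxjs';

let producerRuns = 0; // hypothetical counter, for illustration only

const lazy$ = new Observable<string>(observer => {
  producerRuns++; // runs once per subscription, never before
  observer.next('A');
  observer.complete();
});

// Creating the Observable did not execute the producer
const runsBeforeSubscribe = producerRuns;

const received: string[] = [];
lazy$.subscribe(value => received.push(value));

console.log(runsBeforeSubscribe); // 0
console.log(producerRuns); // 1
console.log(received); // ['A']
```

Nothing happens when the Observable is declared; the work starts only when subscribe is called.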

What is it that makes Observables so powerful?

Let’s look at an example.
Imagine you’re creating a rectangle with sides a and b.

Since the rectangle has uneven sides, let’s say that the b side must always be twice the a side. The two can never be equal, otherwise we’d have a square.

let a = 5;
let b = 2 * a; // ?

If b is twice a, then b must be 10.

console.log(b); // 10

However, what would happen if we reassigned a to a new value?

let a = 5;
let b = 2 * a; // 10

a = 10;

What is the value of b now?
In a regular (imperative) programming paradigm, b remains 10, because b was computed as twice a back when a was still 5.

console.log(a); // 10
console.log(b); // 10

And now we have a square.
The only way to change it to a rectangle is to update the b logic again. This is where Reactive Programming comes into play.

Instead of a and b being plain values, we’ll say that a is a Subject that will constantly be updated.

// declaration
const aSide$ = new Subject<number>();

// emitting new value (one at a time)
aSide$.next(1);
aSide$.next(2);
aSide$.next(3);

The b side is an Observable that will always be twice a.

const bSide$ = aSide$.pipe(
  map((currentAValue: number) => currentAValue * 2)
);

To print the value of the b side (Observable), we need to subscribe to it:

bSide$.subscribe(currentBValue => {
  console.log('currentBValue :>> ', currentBValue);
});

When you put everything together:

import { Subject } from 'rxjs';
import { map } from 'rxjs/operators';

const aSide$ = new Subject<number>();
const bSide$ = aSide$.pipe(
  map((currentAValue: number) => currentAValue * 2)
);

bSide$.subscribe(currentBValue => {
  console.log('currentBValue :>> ', currentBValue);
});

aSide$.next(1); // update a side, b becomes 2
aSide$.next(2); // update a side, b becomes 4
aSide$.next(3); // update a side, b becomes 6
aSide$.next(4); // update a side, b becomes 8
aSide$.next(5); // update a side, b becomes 10

You should see that b changes accordingly, always staying at twice the latest value of a.

If you’d like to learn more about the Reactive Programming paradigm, I recommend reading my introduction article:


Operators

The operators are utility functions that let you manipulate the Observable stream. These include:

  • Mapping an existing stream into a different form (map, pluck)
  • Filtering stream values based on a condition (filter, skip, skipWhile)
  • Delaying stream (delay, debounce, debounceTime, throttle, timeout)
  • Limiting the stream flow (take, takeWhile, takeUntil)
  • Repeating the execution (repeat) or retrying in case of failure (retry, retryWhen)
  • Handling errors (catchError)
  • Mapping one stream into another (concatMap, mergeMap, switchMap)
  • Combining streams (combineLatest, forkJoin, zip)
  • Producing side effects while the stream is running (tap) or after it has completed (finalize), and so on.
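A few of the operators listed above can be chained as follows. This is only a small sketch on a synchronous source; the arrays log and results are illustrative names used to capture what happens.

```typescript
import { of } from 'rxjs';
import { filter, finalize, map, take, tap } from 'rxjs/operators';

const log: string[] = [];
const results: number[] = [];

of(1, 2, 3, 4, 5, 6)
  .pipe(
    filter(n => n % 2 === 0), // keep even numbers: 2, 4, 6
    map(n => n * 10), // transform: 20, 40, 60
    take(2), // limit the flow: 20, 40, then complete
    tap(n => log.push(`saw ${n}`)), // side effect during the stream
    finalize(() => log.push('done')) // side effect once the stream ends
  )
  .subscribe(n => results.push(n));

console.log(results); // [20, 40]
console.log(log); // ['saw 20', 'saw 40', 'done']
```

The order of operators matters: here tap sits after take, so it only sees the two values that make it through.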

The behavior of each operator is often visualized using the Marble Diagrams:

Example of a Marble diagram

Pipes

The operators are chained inside pipes. A pipe is a function that sits between the source (an Observable, such as a Subject) and the output (the Subscriber) and applies a collection of operators in order.

Visual representation Subject, Pipe & Subscriber

Example of operators grouped into a pipe:

observable$
  .pipe(
    operator1,
    operator2,
  )
  .subscribe()

An Observable can be very simple

of(1, 2, 3, 4, 5)
  .pipe(
    map(x => x * 10), // 10, 20, 30, 40, 50
    filter(x => x > 30) // 40, 50
  )
  .subscribe(console.log); // 40, 50

Or as complex as you’d like.
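When a pipeline grows, one way to keep it readable is to bundle a group of operators into a single reusable operator with the standalone pipe function exported by rxjs. The helper name timesTenAbove30 is hypothetical and only serves this sketch.

```typescript
import { of, pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Hypothetical helper: pipe() composes several operators into one
const timesTenAbove30 = pipe(
  map((x: number) => x * 10),
  filter((x: number) => x > 30)
);

const output: number[] = [];

of(1, 2, 3, 4, 5)
  .pipe(timesTenAbove30) // used like any built-in operator
  .subscribe(x => output.push(x));

console.log(output); // [40, 50]
```

The composed operator behaves the same as listing map and filter directly inside the pipe.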


Schedulers

You can delay the flow of any Observable by using the right operator, such as delay or debounceTime (or by building a timed source with interval):

of(1, 2, 3, 4, 5)
  .pipe()
  .subscribe(console.log); // logs 1 to 5 immediately

of(1, 2, 3, 4, 5)
  .pipe(
    delay(1000) // hold for 1 second
  )
  .subscribe(console.log); // logs 1 to 5 after a 1 second delay

The beauty of the scheduler is that we can change the timing of the control flow without adding additional operators to the flow.

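console.log('A');

// Note: passing a scheduler to of() is deprecated in RxJS 7;
// scheduled(['B'], asyncScheduler) is the replacement
of('B', asyncScheduler)
  .pipe()
  .subscribe(console.log);

console.log('C');

// the order: 'A', 'C', 'B'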

Schedulers are special objects that control the concurrency and timing of an Observable’s flow, that is, when a subscription starts and when notifications are delivered. Rx.js comes with several schedulers:

  • asyncScheduler
    Executes work asynchronously as an Event Loop macrotask.
  • asapScheduler
    Executes work asynchronously as an Event Loop microtask.
  • queueScheduler
    Maintains a queue of tasks and executes operations immediately (synchronously).
  • animationFrameScheduler
    Executes tasks on the next available animation frame using requestAnimationFrame.
  • VirtualTimeScheduler (superclass of TestScheduler)
    Runs on virtual time and is used for testing Observables.
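As a small illustration of the last entry, the sketch below passes a VirtualTimeScheduler to delay so that a one second delay can be resolved without actually waiting. The variable names are illustrative only.

```typescript
import { of, VirtualTimeScheduler } from 'rxjs';
import { delay } from 'rxjs/operators';

const virtualScheduler = new VirtualTimeScheduler();
const delayed: number[] = [];

of(1, 2, 3)
  .pipe(delay(1000, virtualScheduler)) // delay measured in virtual milliseconds
  .subscribe(n => delayed.push(n));

// Nothing has been delivered yet, the work is only queued
const beforeFlush = [...delayed];

// Execute everything that was queued, without real waiting
virtualScheduler.flush();

console.log(beforeFlush); // []
console.log(delayed); // [1, 2, 3]
```

The same operator code runs against real time when the scheduler argument is omitted, which is what makes schedulers useful for tests.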

Comparing schedulers

console.log('A');

const sourceB$ = of('B');

sourceB$
  .pipe(observeOn(asapScheduler))
  .subscribe((value) => console.log(value));

const sourceC$ = of('C');

sourceC$
  .pipe(observeOn(asyncScheduler))
  .subscribe((value) => console.log(value));

const sourceE$ = of('E');

sourceE$
  .pipe(observeOn(queueScheduler))
  .subscribe((value) => console.log(value));

console.log('D');

// output: A, E, D, B, C

To learn more about schedulers in Rx.js, I recommend reading the following blog:

https://jstobigdata.com/rxjs/rxjs-scheduler/


Subjects

The Subjects are a special type of Observable that acts as both an Observer (you can push values into it) and an Observable (consumers can subscribe to it).
They can emit values as well as be subscribed to:

const subject$ = new Subject<string>();

subject$.subscribe(data => {
  console.log('data :>> ', data);
  // data :>> 'Hello World!'
});

subject$.next('Hello World!');

The Subjects are often used for State management.
Three key characteristics of Subjects:

  • Producers
  • Multicasting
  • Hot Observables

Producers

As seen above in the rectangle example, the Subjects can produce new values:

const observable$ = of(1, 2, 3); // fixed values

const subject$ = new Subject();
subject$.next(1);
subject$.next(2);
subject$.next(3);
// can be added indefinitely

Multicasting

The Observables are designed to be unicast, meaning each new subscriber triggers its own independent execution of the Observable. If an Observable calls an API, each subscriber to it will invoke the API for its own purposes:

const postsData$ = this.service.getAllPosts();

const firstSubscriber$ = postsData$;
const secondSubscriber$ = postsData$;

firstSubscriber$.subscribe(console.log);
secondSubscriber$.subscribe(console.log);

In the case of Subjects, the data emitted is shared among the subscribers without the need to invoke the source twice:

const subject$ = new Subject<string>();

const subscriber1$ = subject$;
const subscriber2$ = subject$;

subscriber1$.subscribe(console.log)
subscriber2$.subscribe(console.log)

subject$.next('Hello World!');
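The difference can be measured with a counter. In this sketch, fakeApi$ and apiCalls are hypothetical stand-ins for a real HTTP call; the Subject is placed between the source and the subscribers so that the source runs only once.

```typescript
import { Observable, Subject } from 'rxjs';

let apiCalls = 0; // hypothetical stand-in for a real HTTP request

const fakeApi$ = new Observable<string>(observer => {
  apiCalls++;
  observer.next('posts');
  observer.complete();
});

// Unicast: two subscriptions trigger two executions
fakeApi$.subscribe();
fakeApi$.subscribe();
const unicastCalls = apiCalls;

// Multicast: one execution, shared through a Subject
apiCalls = 0;
const shared$ = new Subject<string>();
const seen: string[] = [];

shared$.subscribe(v => seen.push(`first: ${v}`));
shared$.subscribe(v => seen.push(`second: ${v}`));
fakeApi$.subscribe(shared$); // the Subject acts as the Observer
const multicastCalls = apiCalls;

console.log(unicastCalls); // 2
console.log(multicastCalls); // 1
console.log(seen); // ['first: posts', 'second: posts']
```

Both subscribers of the Subject receive the value even though the source was executed a single time.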

Hot Observables

The third important characteristic is that Subjects are hot: they emit values regardless of whether there are any subscribers.

const author$ = new Subject<string>();

author$.next('My first Blog!');

const lateSubscriber$ = author$;
lateSubscriber$.subscribe(console.log)

author$.next('My second Blog!');

Because the lateSubscriber$ subscribed to the source (author$) after the first blog was already published, it won’t be aware of the first emission (‘My first Blog!’). The lateSubscriber$ will only be notified of subsequent publishes by the source (author$), starting from the moment of subscription.

There are four types of Subjects in Rx.js:

  • Subject (default)
  • Behavior Subject
  • Replay Subject
  • Async Subject

The most commonly used subject for state management is the Behavior Subject as it holds the initial value and the current state.

const subject$ = new BehaviorSubject<string>('Hello');

const currentState = subject$.getValue();

console.log('currentState :>> ', currentState); // 'Hello'

The Behavior Subject updates the existing state for each emission. It’s commonly used in Angular to distribute data among completely separated components.
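A minimal sketch of that idea, without any Angular specifics: the CounterStore class and the headerSees and footerSees arrays are hypothetical names, and in Angular such a class would typically be provided as an injectable service.

```typescript
import { BehaviorSubject } from 'rxjs';

// Hypothetical store shared by otherwise unrelated consumers
class CounterStore {
  private readonly state$ = new BehaviorSubject<number>(0);

  // Read-only view, so only the store itself can call next()
  readonly count$ = this.state$.asObservable();

  increment(): void {
    this.state$.next(this.state$.getValue() + 1);
  }
}

const store = new CounterStore();
const headerSees: number[] = [];
const footerSees: number[] = [];

store.count$.subscribe(n => headerSees.push(n)); // receives 0 right away
store.increment();
store.count$.subscribe(n => footerSees.push(n)); // joins later, starts at 1
store.increment();

console.log(headerSees); // [0, 1, 2]
console.log(footerSees); // [1, 2]
```

Exposing the state through asObservable is a design choice that keeps writes in one place while letting any consumer read the current state.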

With Behavior Subject, each subscriber immediately receives the current value (the initial value if nothing has been emitted yet) and then every value emitted afterwards through the next method.

const subject$ = new BehaviorSubject('A');

const lateSubscriber$ = subject$;
lateSubscriber$.subscribe(console.log)

subject$.next('B');
// Logs: A, B
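The other two variants from the list above, Replay Subject and Async Subject, differ mainly in what a subscriber receives and when. The following sketch uses a buffer size of 2 for the Replay Subject, which is an arbitrary choice for the example.

```typescript
import { AsyncSubject, ReplaySubject } from 'rxjs';

// ReplaySubject: a late subscriber gets the last N values (here N = 2)
const replay$ = new ReplaySubject<string>(2);
replay$.next('A');
replay$.next('B');
replay$.next('C');

const replayed: string[] = [];
replay$.subscribe(v => replayed.push(v)); // replays B and C
replay$.next('D');

// AsyncSubject: emits only the last value, and only on completion
const async$ = new AsyncSubject<string>();
const asyncValues: string[] = [];
async$.subscribe(v => asyncValues.push(v));
async$.next('A');
async$.next('B');
const beforeComplete = [...asyncValues];
async$.complete();

console.log(replayed); // ['B', 'C', 'D']
console.log(beforeComplete); // []
console.log(asyncValues); // ['B']
```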

Marble Diagrams

Testing Observables in Rx.js is commonly done using Marble Diagrams. The idea here is to test asynchronous code in a synchronous manner.

As stated before, each operator can be visualized in a Marble diagram:

This process can be represented in code using ASCII characters:

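const source$ = '--1--2--3--|';
const expected$ = '--a--b--c--|'; // a = 10, b = 20, c = 30

Each symbol has a meaning, and each character occupies exactly one time frame. That is why a multi-digit value such as 10 is written as a single placeholder character and mapped to its real value:

  • Time frame (1ms of virtual time): -
  • Emissions: 1, 2, 3 (and a, b, c standing for 10, 20, 30)
  • Observable completion: |
  • Error (not present in the diagram): #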
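In practice, marble strings like these are handed to the TestScheduler from rxjs/testing, which maps each single-character marker to a real value. The sketch below checks a map operator; the comparison callback is a simplified stand-in for the assertion function of your test framework, and comparisons is a hypothetical counter.

```typescript
import { map } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

let comparisons = 0; // hypothetical counter to show the check ran

// Simplified deep-equality check; a test framework's assertion fits here
const testScheduler = new TestScheduler((actual, expected) => {
  comparisons++;
  if (JSON.stringify(actual) !== JSON.stringify(expected)) {
    throw new Error('Marble diagrams do not match');
  }
});

testScheduler.run(({ cold, expectObservable }) => {
  // Letters are placeholders, the second argument maps them to values
  const source$ = cold('--a--b--c--|', { a: 1, b: 2, c: 3 });
  const result$ = source$.pipe(map(x => x * 10));

  expectObservable(result$).toBe('--x--y--z--|', { x: 10, y: 20, z: 30 });
});
```

The callback inside run executes synchronously on virtual time, so the test finishes immediately.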

So if there is an Observable that emits a new value every second and emits up to five values:

const source$ = interval(1000);

source$
  .pipe(
    take(5)
  )
  .subscribe(console.log);
// 0, 1, 2, 3, 4 (one second delay between each)

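You can visualize it like this (not to scale; the first value arrives after one second, and the parentheses mean that the last value and the completion happen in the same frame):

const source$ = '----0----1----2----3----(4|)';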
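For an exact version of that timeline, the TestScheduler marble syntax supports time progression such as 1s or 999ms. The sketch below assumes the same simplified comparison callback as a stand-in for a real assertion; intervalChecks is a hypothetical counter.

```typescript
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

let intervalChecks = 0; // hypothetical counter to show the check ran

const scheduler = new TestScheduler((actual, expected) => {
  intervalChecks++;
  if (JSON.stringify(actual) !== JSON.stringify(expected)) {
    throw new Error('Marble diagrams do not match');
  }
});

scheduler.run(({ expectObservable }) => {
  const source$ = interval(1000).pipe(take(5));

  // Each emission itself occupies 1ms, hence 999ms between values
  const expected = '1s a 999ms b 999ms c 999ms d 999ms (e|)';

  expectObservable(source$).toBe(expected, { a: 0, b: 1, c: 2, d: 3, e: 4 });
});
```

Five seconds of virtual time pass inside run without any real waiting.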

Also, you can find the diagram for the most commonly used operators on the site below:
