Rx.js Core Features
While the future of Rx,js in Angular is uncertain, Rx.js will remain a powerful utility for Reactive Programming and one of the widely used libraries on NPM. If you’re not familiar with it, now is the perfect time to peek at what makes Rx.js so awesome.
Observables
At the heart of Rx.js lie the Observables. Observables are Reactive streams that represent values that change over time.
You can create an unlimited number of Observables in different ways for different purposes.
const dataStream$ = new Observable(observer => {
observer.next('A');
observer.next('B');
observer.next('C');
observer.complete();
});
// Alternatively (syntactic suugar)
const dataStream$ = of('A', 'B', 'C');
The Observables are lazy and they only output values upon Subscription.
dataStream$.subscribe(data => {
console.log(data); // A, B, C
})
What is it that makes Observables so powerful?
Let’s look at an example.
Imagine you’re creating a rectangle with sides a
and b
.
Since the rectangle has uneven sides, let’s say that b
side must always be twice as a
side and they can never be equal, otherwise, we’d have a square.
let a = 5;
let b = 2 * 5; // ?
If b
is twice as a
, then b
must be 10
.
console.log(b); // 10
However, what would happen if we’d reassign a
to a new value:
let a = 5;
let b = 2 * 5; // 10
a = 10;
What is the value of b
now?
Well in a regular programming paradigm b
will remain 10
, because b
was set to be twice as a
when a
was only five.
console.log(a); // 10
console.log(b); // 10
And now we have a square.
The only way to change it to a rectangle is to update the b
logic again. This is where Reactive Programming comes into play.
Instead of a
and b
being plain values, we’ll say that a is a Subject that will constantly be updated.
// declaration
const aSide$ = new Subject<number>();
// emitting new value (one at a time)
aSide$.next(1);
aSide$.next(2);
aSide$.next(3);
The b
side in an Observable that will always be twice as a
.
const bSide$ = aSide$.pipe(
map((currentAValue: number) => currentAValue * 2)
)
To print the value of the b
side (Observable), we need to subscribe to it:
bSide$.subscribe(currentBValue => {
console.log('currentBValue :>> ', currentBValue);
})
When you put everything together
const aSide$ = new Subject<number>();
const bSide$ = aSide$.pipe(
map((currentAValue: number) => currentAValue * 2)
)
bSide$.subscribe(currentBValue => {
console.log('currentBValue :>> ', currentBValue);
})
aSide$.next(1); // update a side
aSide$.next(2); // update a side
aSide$.next(3); // update a side
aSide$.next(4); // update a side
aSide$.next(5); // update a side
, you should see that b is indeed changing accordingly:
If you’d like to learn more about the Reactive Programming paradigm, I recommend reading my introduction article:
Operators
The operators are utility functions that let you manipulate the Observable stream. These include:
- Mapping an existing stream into a different form (
map
,pluck
) - Filtering stream values based on a condition (
filter
,skip
,skipWhile
) - Delaying stream (
delay
,debounce
,debounceTime
,throttle
,timeout
) - Limiting the stream flow (
take
,takeWhile
,takeUntil
) - Repeating the execution (
repeat
) or retrying in case of failure (retry
,retryWhen
) - Handling errors (
catchError
) - Mapping one stream into another (
concatMap
,mergeMap
,switchMap
) - Combining streams (
combineLatest
,forkJoin
,zip
) - Producing side-effects during (
tap
) or after the stream has been completed (finalize
) and so on.
The behavior of each operator is often visualized using the Marble Diagrams:
Pipes
The operators are wrapped by the pipes. A pipe is a function between the source (Subject) and the output (Subscriber) that holds a collection of operators.
Example of operators grouped into a pipe:
observable$
.pipe(
operator1,
operator2,
…
)
.subscribe()
An Observable can be very simple
of(1, 2, 3, 4, 5)
.pipe(
map(x => x * 10) // 10, 20, 30, 40, 50
filter(x => x > 30) // 40, 50
)
.subscribe(console.log) // 40, 50
Or as complex as you’d like.
Schedulers
You can delay the flow of any Observable by using the right operator, such as interval
, delay
, debounceTime
and similar:
of(1, 2, 3, 4, 5)
.pipe( )
.subscribe(console.log) // executes console logs immediately
of(1, 2, 3, 4, 5)
.pipe(
delay(1000) // hold for 1 second
)
.subscribe(console.log) // prints console log 5 times
The beauty of the scheduler is that we can change the timing of the control flow without adding additional operators to the flow.
console.log('A');
of('B', asyncScheduler)
.pipe( )
.subscribe(console.log)
console.log('C');
// the order: 'A', 'C', 'B'
Schedulers are special functions that provide a way to control the concurrency and timing of Observable flow. Rx.js comes with several schedulers:
- Async Scheduler
This scheduler executes asynchronously on the Event Loop macro task. - Asap Scheduler
The scheduler executes asynchronously on the Event Loop microtask. - QueueScheduler
Maintains a queue of tasks and executes operations immediately (synchronously). - AnimationFrameScheduler
This scheduler executes tasks on the next available animation frame usingrequestAnimationFrame
. - VirtualTimeScheduler (SuperClass of Test Scheduler)
The scheduler used for testing Observables.
Comparing schedulers
console.log('A');
const sourceB$ = of('B');
sourceB$
.pipe(observeOn(asapScheduler))
.subscribe((value) => console.log(value));
const sourceC$ = of('C');
sourceC$
.pipe(observeOn(asyncScheduler))
.subscribe((value) => console.log(value));
const sourceE$ = of('E');
sourceE$
.pipe(observeOn(queueScheduler))
.subscribe((value) => console.log(value));
console.log('D');
// output: A, E, D, B, C
To learn more about schedulers in Rx.js, I recommend reading the following blogs:
Subjects
The Subjects are a special type of Observables that serve as both the producers (Observers) and the consumers (Observables).
They emit values as well as be subscribed to:
const subject$ = new Subject<string>();
subject$.subscribe(data => {
console.log('data :>> ', data);
// data :>> 'Hello World!'
});
subject$.next('Hello World!');
The Subjects are often used for State management.
Three key characteristics of Subjects:
- Producers
- Multicasting
- Hot Observables
Producers
As seen above in the rectangle example, the Subjects can produce new values:
const observable$ = of(1, 2, 3); // fixed values
const subject$ = new Subject();
subject$.next(1);
subject$.next(2);
subject$.next(3);
// can be added indefinitely
Multicasting
The Observables are designed to be unicast, meaning the new Observable is created for each new subscriber. This means if an Observable is calling an API, each subscriber to it will invoke the API for it’s purposes:
const postsData$ = this.service.getAllPosts();
const firstSubscriber$ = postsData$;
const secondSubscriber$ = postsData$;
firstSubscriber$.subscribe(console.log);
secondSubscriber$.subscribe(console.log);
In the case of Subjects, the data emitted is shared among the subscribers without the need to invoke the source twice:
const subject$ = new Subject<string>();
const subscriber1$ = subject$;
const subscriber2$ = subject$;
subscriber1$.subscribe(console.log)
subscriber2$.subscribe(console.log)
subject$.next('Hello World!');
Hot Observables
The second important characteristic is that Subjects can emit values regardless of if there are any subscribers.
const author$ = new Subject<string>();
author$.next('My first Blog!');
const lateSubscriber$ = author$;
lateSubscriber$.subscribe(console.log)
author$.next('My second Blog!');
Because the lateSubscriber$
subscribed to the source (author$
) after the first blog was already published, it won’t be aware of the first emission (‘My First Blog’
). The lateSubscriber$
will only be notified on every subsequent publish by the source (author$
) starting from the moment of subscription.
There are four types of Subjects in Rx.js:
- Subject (default)
- Behavior Subject
- Replay Subject
- Async Subject
The most commonly used subject for state management is the Behavior Subject as it holds the initial value and the current state.
const subject$ = new BehaviorSubject<string>('Hello');
const currentState = subject$.getValue();
console.log('currentState :>> ', currentState); // 'Hello'
The Behavior Subject updates the existing state for each emission. It’s commonly used in Angular to distribute data among completely separated components.
With Behavior Subject, each subscriber starts with the initial value and then receives the latest value after calling the next method (emission).
const subject$ = new BehaviorSubject('A');
const lateSubscriber$ = subject$;
lateSubscriber$.subscribe(console.log)
subject$.next('B');
// Logs: A, B
Marble Diagrams
Testing Observables in Rx.js is done using the Marble Diagrams. The idea here is to test asynchronous code in synchronous manner.
As stated before each operator can be visualized in a Marble diagram:
This process can be represented in code using the ASCII characters:
const source$ = '--1--2--3--|';
const expected$ = '--10--20--30--|';
Each symbol has a meaning:
- Time frame (1ms):
-
- Emissions
1, 2, 3
- Observable completion
|
- Error (not present in the diagram)
#
So if there is an Observable that emits a new value every second and emits up to five values:
const source$ = interval(1000);
source$.pipe(
take(5)
)
.subscribe(console.log)
// 0, 1, 2, 3, 4 (one second delay between each)
You can visualize it like this:
const source$: '0----1----2----3----4|;
Also, you can find the diagram for the most commonly used operators on the site below:
Wrapping up
This was just a small slice of everything that Rx.js has to offer. If you’d like to learn more, be sure to check the official documentation. Also, follow me on my social channels to stay up to date with my new content.
Bye for now 👋