
RxJS

RxJS is a reactive extensions library for JavaScript.

I didn't know about this tool until recently, when I saw it used in one of the projects I am working on with Networked. Fireship made a tutorial video about it on YouTube 6 years ago and I didn't even know it existed. It would have literally saved me a ton of time handling browser events across components!

In the words of the RxJS docs:

Think of RxJS as Lodash for events.

It allows handling async events as collections. The essential concepts in RxJS which solve async event management are Observable, Observer, Subscription, Operators, Subject, and Schedulers.


Quick start

To start, create an HTML file and a JavaScript file and, as per convention, name them index.html and script.js. Then load the RxJS script from a CDN with a tag like the one below, followed by our script.js, in the HTML.

<script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>

Now we can access rxjs in our script file, so we can move ahead.

Note: I’m trying this out in pure vanilla JS, but you can also try it in any JavaScript frontend library or framework, and you will have an easier time implementing it if you are already familiar with one.

Generally, we add a click listener to a button like this:

element.addEventListener('click', () => console.log('Clicked!'));

With RxJS, it looks something like this:

const { fromEvent } = rxjs;
const clicks = fromEvent(element, 'click');
clicks.subscribe((e) => console.log(e));

Now let's add a counter to the button. The conventional way in vanilla JS would look like this:

let count = 0;
element.addEventListener('click', () => {
  count += 1;
  console.log(`Clicked ${count} times`);
});

Now with RxJS we can write:

const { fromEvent, scan } = rxjs;
fromEvent(document, 'click')
  .pipe(scan((count) => count + 1, 0))
  .subscribe((count) => console.log(`Clicked ${count} times`));

pipe is a method that takes any number of RxJS operators and applies them in order. The code above looks a bit complex if you don't know the reduce function of arrays. Think of scan as reduce: it starts from the 0 passed as the second parameter of scan, and on each event it adds one to the count.

Now you can go and start implementing. Below are just examples for reference.


Examples

Ways to create Observables

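With the Observable constructor
// print is an assumed helper used in the examples below; swap in your own output
const print = (value) => console.log(value);

function observableFunction(observer) {
  // next emits the event with a value
  observer.next('hello');
  observer.next('world');
}

const { Observable } = rxjs;
// Observable.create is deprecated in RxJS 7; the constructor does the same job
const observe = new Observable(observableFunction);
// subscribe listens to the events
observe.subscribe((value) => print(value));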

Generally you don't have to create observables manually like this; this was just to showcase.

From promise
const proxyApiCall = new Promise((res, rej) => {
  setTimeout(() => res('ok'), 2000);
});

const { from } = rxjs;
const observe = from(proxyApiCall);
observe.subscribe((v) => print(v));

from is used to create an observable from a promise.

Timer like timeout
const { timer } = rxjs;
const observe = timer(2000);
observe.subscribe(() => print('timer ran out'));

With the timer function, the observable fires once when the timer completes.

Interval
const { interval } = rxjs;
const observe = interval(2000);
observe.subscribe(() => print('IT ran out'));

With interval, the observable fires at every defined interval.

of
const { of } = rxjs;
const observe = of('number', 20, ['dev', true], { name: 'devdutt' });
observe.subscribe((val) => print(val));

With the of function we can create an observable from a fixed list of values. The example shows that the values can be of any data type.


Hot vs Cold Observables

Cold observables generate their data from within. They don't produce any value until something subscribes, and each subscription runs the producer again. Subscribers may expect to all get the same data, but depending on what the observable does, they might not.

// COLD
const { Observable } = rxjs;
const cold = new Observable((obs) => {
  obs.next(Math.random());
});
// subscriber listens to the event
cold.subscribe((value) => print(`Subscriber 1: ${value}`)); // 0.98...
cold.subscribe((value) => print(`Subscriber 2: ${value}`)); // 0.87...

Hot observables send the same data to every subscriber. One way is to produce the value outside the observable, as in the code that follows; another is to share a single execution and call the connect function after every subscription has been made.

// HOT
const { Observable } = rxjs;
const num = Math.random();
const hot = new Observable((obs) => {
  obs.next(num);
});
// subscriber listens to the event
hot.subscribe((value) => print(`Subscriber 1: ${value}`)); // 0.98...
hot.subscribe((value) => print(`Subscriber 2: ${value}`)); // 0.98...

The above behaves like a hot observable, as every subscriber gets the same data.


Completion

const { finalize, timer } = rxjs;
timer(2000)
  .pipe(finalize(() => print('All done!')))
  .subscribe();

This observable prints All done! when it completes, at the end of its lifecycle.

const { finalize, interval } = rxjs;
const observer = interval(1000).pipe(finalize(() => print('All done!')));
const listen = observer.subscribe();

// we are manually forcing an unsubscribe so the
// observable ends and finalize gets called
setTimeout(() => {
  listen.unsubscribe();
}, 4000);

Operators

map
const { of, pipe, map } = rxjs;
const observer = of(1, 2, 3, 4, 5).pipe(map((val) => 10 * val));
observer.subscribe(print); // 10, 20, 30, 40, 50

Map does exactly what its name suggests: it transforms each value of the observable and passes the new value on to the subscribers.

tap
const { of, pipe, tap } = rxjs;
const observer = of(1, 2, 3, 4, 5).pipe(tap(print));
observer.subscribe(() => {});

Tap lets us run side effects on the observable. It lets us tap into the values of the observable without changing them.

filter
const { of, pipe, filter } = rxjs;
const observer = of(1, 2, 3, 4, 5).pipe(filter((val) => val & 1));
observer.subscribe(print); // 1, 3, 5 (val & 1 is truthy for odd numbers)

Filter lets through only the values that satisfy a condition, so the subscriber only gets the filtered values.

first, last
const { of, pipe, first, last } = rxjs;
const observer = of(1, 2, 3, 4, 5).pipe(first());
const observer2 = of(1, 2, 3, 4, 5).pipe(last());
observer.subscribe(print); // 1
observer2.subscribe(print); // 5

first and last return the first and last value from the data stream.

debounceTime (last event)
const { fromEvent, pipe, debounceTime } = rxjs;
const observer = fromEvent(document, 'click').pipe(debounceTime(500));
observer.subscribe((e) => print('clicked'));

throttleTime (first event)
const { fromEvent, pipe, throttleTime } = rxjs;
const observer = fromEvent(document, 'click').pipe(throttleTime(500));
observer.subscribe((e) => print('ok'));

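I don't think I need to explain throttling and debouncing in depth: debounceTime waits for 500 ms of silence and then emits the last event, while throttleTime emits the first event and ignores the rest for 500 ms.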

scan
const { fromEvent, pipe, scan } = rxjs;
const observer = fromEvent(document, 'click').pipe(
  scan((count) => count + 1, 0)
);
observer.subscribe((v) => print(`clicked ${v} times`));

scan is just like the reduce function for arrays, with the difference that scan emits the accumulated value for every item in the data stream instead of a single final result.

switchMap

switchMap maps each value to another observable and sends that observable's data to the subscriber. When a new value arrives, it switches to the new inner observable and drops the previous one.

takeUntil

takeUntil is used to automatically complete (and so unsubscribe) as soon as another observable, the notifier, emits a value.

takeWhile

takeWhile is used to automatically complete (and so unsubscribe) as soon as a value fails to match the condition.

zip, forkJoin, catchError, retry

These are other very useful operators provided by RxJS (catch is named catchError in current versions), but I will not cover them in detail here!