Reactive Programming with Rx.js

Reactive programming is an approach to building apps that is getting a lot of attention today. In this article you'll learn how to write code that reacts to changes automatically and how to process asynchronous data as streams.

Reactive programming may be an unfamiliar paradigm, but it is nothing really new. Spreadsheets such as Microsoft Excel have worked this way for decades: change one cell and every formula that depends on it updates automatically.

In recent years, Reactive programming has found its way into modern apps through things like state management in web frameworks and video game engine tools.

The focus of this article is the JavaScript implementation of ReactiveX (Rx.js), which is a library for composing asynchronous and event-based apps using Reactive programming.

But before we talk about Rx.js, we need to cover a few concepts, namely Declarative, Functional, and Reactive programming, the Observer pattern, and Observables, and see how they all blend into one.




Imperative vs Declarative programming

The first step towards writing Reactive code is to think differently about writing code. You may have heard these programming terms before:

  • Imperative programming
  • Declarative programming

Imperative programming spells out the detailed steps needed to perform an operation. Declarative programming states only what needs to be done.

Let's look at a couple of examples of the two. In all of these examples two snippets of code do the exact same thing, but in a different way:

# 1

Imperative approach

const isPositiveNumber = currentNumber > 0 ? true : false;
                    

Declarative approach

const isPositiveNumber = currentNumber > 0;
                    


# 2

Imperative approach

function calculateSum(a, b) {
  const firstNumber = a;
  const secondNumber = b;
  const sum = firstNumber + secondNumber;
  return sum;
}                  

Declarative approach

function calculateSum(a, b) {
  return a + b;
}              


# 3

Imperative approach

const allNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
const evenNumbers = [];

// walk the array by index and test each element, not the loop counter
for (let i = 0; i < allNumbers.length; i++) {
  if (allNumbers[i] % 2 === 0) {
    evenNumbers.push(allNumbers[i]);
  }
}

// [ 2, 4, 6, 8, 10 ]
                    

Declarative approach

const allNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
const evenNumbers = allNumbers.filter(n => n % 2 === 0);

// [ 2, 4, 6, 8, 10 ]                     
                    


It is clear that Declarative code is not only shorter but also requires a different way of thinking about the problem. It focuses on what needs to be done rather than how.

Declarative programming in the ReactiveX world closely resembles JavaScript's higher-order array functions (or C# LINQ), where each operation is chained to the previous one.


import { from, map } from 'rxjs'; // RxJS 7.2+; older versions export map from 'rxjs/operators'

from([1, 2, 3, 4, 5])
  .pipe(
    map(x => x * 10)
  )
  .subscribe((data) => console.log(data));
// 10, 20, 30, 40, 50
                    

Reactive programming in ReactiveX is based on Observables, which we'll elaborate on later.





The Control Flow

The Declarative syntax allows us to focus on things one at a time in the right order:

  • Create
  • Transform
  • Execute
  • Complete

There are only a few variables to remember as they appear in order (in the same line/pipeline). We can look up everything that needs to happen just by reading the code from top to bottom,

Producer() // Observable source
.pipe(
  operation1(),
  operation2(),
  operation3(),
)
.subscribe()                
                    

as opposed to having the logic scattered up and down through the file (Imperative approach).

Here, the state is transferred from one operation to the next, so you can easily trace a value back to its original form.

The pipeline is also easy to modify.
You can transform, schedule, or filter values within the pipe just by adding operators:


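import { interval, filter, map, take } from 'rxjs'; // RxJS 7.2+

interval(1000) // emits 0, 1, 2, ... one value per second
  .pipe(
    // take() comes first so the stream completes after 5 values;
    // placed after filter() it would wait forever for a fifth match
    take(5), // 0, 1, 2, 3, 4
    map(value => value * 1000), // 0, 1000, 2000, 3000, 4000
    filter(value => value < 3000) // keep only values below 3000
  )
  .subscribe((data) => console.log(data)); // 0, 1000, 2000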
                    

Rx.js is not only Declarative but also Reactive. It reacts to changes. To explain what this means we'll look into the Observer pattern next.




The Observer Pattern


In software design and engineering, the Observer pattern is a software design pattern in which an object, named the Subject, maintains a list of its dependents, called Observers, and notifies them automatically of any state changes, usually by calling one of their methods.
-Wikipedia


In simple terms, you have one object, function, or component that is producing events and a list of subscribers to those events that are invoked (automatically) whenever a new event arrives.


Example of an Observer pattern

Let's think about how YouTube works. You as a YoutubeUser can search and watch videos, comment, and like, but the interesting part is that you can (1) subscribe to YouTube channels and you'll be (2) notified whenever a new video goes live.


class YoutubeUser {

  private currentUser: string;

  get username() {
    return this.currentUser;
  }

  constructor(user: string) {
    this.currentUser = user;
  }

  searchVideos() {}
  watchVideos() {}

 // (1)
  subscribeToChannel(youtubeChannel: YoutubeChannel) {
    youtubeChannel.addSubscriber(this.currentUser);
  }

// (2)
  getVideoNotification(videoName: string, youtuber: string) {
    console.log(`[${this.currentUser} Notification]:
    ${youtuber} has uploaded a new video: ${videoName}`);
  }
}        
                    

On the YoutubeChannel side of things, a channel has a list of (1) subscribers, which is (2) updated when a YoutubeUser subscribes to the channel. Each subscriber on the list gets notified when a new video is (3) uploaded.


class YoutubeChannel {

  private youtuber!: YoutubeUser;

  // (1)
  channelSubscribers: YoutubeUser[] = [];

  get channelName() {
    return this.youtuber.username;
  }

  constructor(channelName: string) {
    this.youtuber = new YoutubeUser(channelName);
  }

 // (2)
  addSubscriber(username: string) {
    this.channelSubscribers.push(new YoutubeUser(username));
  }

 // (3)
  uploadVideo(videoName: string) {
    this.channelSubscribers
      .forEach((subscriber: YoutubeUser) => subscriber
        .getVideoNotification(videoName, this.youtuber.username))
    // Each subscriber is a YoutubeUser instance from the channelSubscribers array.
    // Whenever uploadVideo() is called, its getVideoNotification() method
    // is invoked and prints the notification.
  }

}
                      

Let's create some Youtube Channels:


const ninjaTurtlesChannel = new YoutubeChannel('Master Splinter');
const dragonBallChannel = new YoutubeChannel('Master Roshi');
                      

And let's create users that will subscribe to these channels:


const youtubeUserLeo = new YoutubeUser('Leonardo');
const youtubeUserRaf = new YoutubeUser('Raphael');
const youtubeUserGoku = new YoutubeUser('Goku');
const youtubeUserKrillin = new YoutubeUser('Krillin');

// Youtube Users are added to the subscribers list of each channel
youtubeUserLeo.subscribeToChannel(ninjaTurtlesChannel);
youtubeUserRaf.subscribeToChannel(ninjaTurtlesChannel);
youtubeUserGoku.subscribeToChannel(dragonBallChannel);
                      

And what happens now? Well, nothing. Now we wait until one of the channels we subscribed to uploads a new video:


ninjaTurtlesChannel.uploadVideo('Ninjutsu lessons');
dragonBallChannel.uploadVideo('Teaching Kamehameha');
                      

When a new video goes up, subscribers get immediately notified:


[LOG]: "[Leonardo Notification]:
    Master Splinter has uploaded a new video: Ninjutsu lessons" 
[LOG]: "[Raphael Notification]:
    Master Splinter has uploaded a new video: Ninjutsu lessons" 
[LOG]: "[Goku Notification]:
    Master Roshi has uploaded a new video: Teaching Kamehameha"
                  

If another video goes live from Master Splinter, his subscribers (Ninja Turtles) will be notified again:


ninjaTurtlesChannel.uploadVideo('Kung Fu lessons');

[LOG]: "[Leonardo Notification]:
    Master Splinter has uploaded a new video: Kung Fu lessons" 
[LOG]: "[Raphael Notification]:
    Master Splinter has uploaded a new video: Kung Fu lessons"
                  



What does any of this have to do with Reactive Programming?
It's simple math:

Declarative Programming + Observer Pattern + Huge List of Operators = ReactiveX

In the Reactive programming context:

  • YoutubeChannel is a Subject: it keeps a list of subscribers and produces values (videos)
  • YoutubeUser is an Observer: it observes changes (new videos)
  • The stream of data passed between the two is represented by an Observable



ReactiveX is an API for building asynchronous applications with Observable streams.

Its purpose is to bring the power of Reactive programming into your apps by providing implementations for major programming languages:


  • Rx.js (JavaScript)
  • RxJava (Java)
  • Rx.NET (C# .NET)
  • RxPY (Python)
  • RxCpp (C++), etc.


ReactiveX uses the Observer pattern to track changes and abstracts that logic using Declarative principles. This combination of Declarative programming and the Observer pattern is what makes working with asynchronous data easy.





The Observables

At the heart of ReactiveX lie the Observables.

The data that is passed from the Producer (Subject) to the Consumer/Receiver (Observer) is wrapped in an Observable. The Observable represents a sequence of events that arrive over time, potentially without end.

An Observable where the producer is a radio chatter


That said, you can unsubscribe from an Observable at any time. To get at the data inside an Observable (or, loosely speaking, to go from Observable<T> to a plain T), you call subscribe() at the end of the chain. The subscribe() function accepts a callback that receives each emitted value as its first parameter, and it returns a Subscription that you can use to unsubscribe.


of('Hello World') // creates Observable<string>
.pipe(
  // in between you can transform Observable<string>
)
// here you get the raw data
.subscribe((data: string) => {
  console.log(data); // 'Hello World'
})
                  


The Observable pattern is closely related to the Observer pattern. In both, observers register with a source to receive updates. An Observable extends the idea: besides emitting values, it can signal completion or an error, and its output can be composed with operators before it reaches the observer.


Reactive Programming is programming with data streams called Observables.


Characteristics of Observables

In order to effectively work with Observables, we need to fully understand how they behave:


# Observables are Lazy

What this means is that without the subscribe() call at the end, the Observable is never activated: it will not make an API call or send data to consumers.



function getGreeting() {
  return of('Hello World');
}

function sayGreeting() {
 getGreeting()
 // nothing will happen until you subscribe 
}
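
To see why nothing happens without subscribe(), it can help to picture an Observable as a stored function. The sketch below is a hand-rolled stand-in, not the Rx.js implementation; `LazySource` and `producerRuns` are hypothetical names used only for illustration.

```typescript
// A deliberately tiny stand-in for an Observable (NOT the Rx.js implementation).
// It only stores the producer function; nothing runs until subscribe() is called.
type Listener<T> = (value: T) => void;

class LazySource<T> {
  private readonly producer: (emit: Listener<T>) => void;

  constructor(producer: (emit: Listener<T>) => void) {
    this.producer = producer; // stored, not executed
  }

  subscribe(listener: Listener<T>): void {
    this.producer(listener); // the producer runs here, once per subscriber
  }
}

let producerRuns = 0; // counts how many times the producer was executed

const greeting = new LazySource<string>((emit) => {
  producerRuns++;
  emit('Hello World');
});

console.log(producerRuns); // 0, creating the source did not run the producer

greeting.subscribe((value) => console.log(value)); // 'Hello World'
console.log(producerRuns); // 1, subscribing triggered the producer
```

Under these assumptions, the work is tied to the subscribe() call rather than to the creation of the source, which is the behavior the section describes.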
                  


# Synchronous and Asynchronous

An Observable can work in both a synchronous and an asynchronous manner, depending on the producer. An Observable created from a primitive value (like a string) is synchronous, while an Observable produced by an API call or a timer is asynchronous. In addition, a synchronous Observable can be made asynchronous using Schedulers.

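# Observable can process a stream of data

An Observable can emit zero, one, or many values over time, and each value passes through the same pipeline.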


# Observables can be invoked multiple times

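Each call to subscribe() invokes the Observable's producer again, because an Observable is essentially a function that runs with the callbacks the subscriber provides.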

# Observable can be unsubscribed from

This can be achieved in a number of ways, either using a subscription:

const subscription = of('Hello').subscribe(); // a Subscription, not an Observable, so no $ suffix
subscription.unsubscribe();
                  

Or declaratively, with an operator that completes the stream after a given number of emissions or when a condition is met:

of('Hello')
  .pipe(
    take(1) // or takeUntil, takeWhile
  )
  .subscribe();
                  



If we compare this to a JavaScript Promise, we can immediately see that Promises:

  • Behave in an Eager manner
    The executor function runs as soon as the Promise is created.

  • Are only Asynchronous

  • Can deliver only a single value

  • Can settle only once
    A Promise is in one of three states: pending, fulfilled, or rejected. Once settled, it cannot change again.

  • Cannot be canceled
    Unless you're using an AbortController with the Fetch API.

And you can see how much more ground an Observable covers.
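
The eager behavior of Promises is easy to check without any library. The following is a minimal sketch using only the built-in Promise; the names `log`, `promise`, and `deferred` are illustrative.

```typescript
const log: string[] = []; // records the order in which things happen

// Eager: the executor runs as soon as the Promise is constructed.
const promise = new Promise<string>((resolve) => {
  log.push('promise executor ran');
  resolve('first');
  resolve('second'); // ignored, a Promise settles only once
});

// Lazy: wrapping the work in a function defers it until the function is called,
// which is roughly how an Observable behaves until subscribe().
const deferred = (): Promise<string> =>
  new Promise<string>((resolve) => {
    log.push('deferred executor ran');
    resolve('lazy');
  });

console.log(log); // [ 'promise executor ran' ], deferred() has not been called yet

deferred(); // only now does the second executor run
console.log(log); // [ 'promise executor ran', 'deferred executor ran' ]

promise.then((value) => console.log(value)); // logs 'first' asynchronously
```

Even with no then() attached, the first executor has already run by the time the next statement executes, while the deferred version waits to be invoked.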







Working with Observables

Let's give Observables a second look.

There are two types of Observables:
  • Unicast (Cold)
  • Multicast (Hot)

The difference between the two is how they share data with existing and late subscribers. With Cold Observables, the producer is invoked anew whenever a new subscriber joins the club (subscribes to the Observable), so every subscriber receives its own full sequence of values.

With Hot Observables, the producer is shared. Early subscribers get the data, while late ones get nothing until the Observable emits again, at which point both early and late subscribers receive the same (latest) value.
This can vary depending on the type of producer (Subject).

Observables are Cold by default but can be converted to Hot using Subjects or a set of operators (e.g. share()).
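
The difference between Cold and Hot can be sketched without Rx.js. The code below is a simplified illustration, not how the library implements it; `coldNumbers` and `HotNumbers` are hypothetical names.

```typescript
type Listener = (value: number) => void;

// Cold: every subscriber triggers its own run of the producer and sees all values.
function coldNumbers(listener: Listener): void {
  [1, 2, 3].forEach((n) => listener(n));
}

// Hot: one shared producer; subscribers only see values emitted after they joined.
class HotNumbers {
  private listeners: Listener[] = [];

  subscribe(listener: Listener): void {
    this.listeners.push(listener);
  }

  emit(value: number): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

const earlyCold: number[] = [];
const lateCold: number[] = [];
coldNumbers((n) => earlyCold.push(n));
coldNumbers((n) => lateCold.push(n)); // the "late" subscriber still gets 1, 2, 3

const hot = new HotNumbers();
const earlyHot: number[] = [];
const lateHot: number[] = [];
hot.subscribe((n) => earlyHot.push(n));
hot.emit(1);
hot.subscribe((n) => lateHot.push(n)); // joins after 1 was emitted
hot.emit(2);

console.log(earlyCold, lateCold); // [ 1, 2, 3 ] [ 1, 2, 3 ]
console.log(earlyHot, lateHot); // [ 1, 2 ] [ 2 ]
```

In this sketch the late Hot subscriber misses the value 1 because the shared producer had already emitted it, whereas each Cold subscriber gets a fresh run.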



The Observable can be in one of three states:

  • Next (Active) state (-a-a-a-a-)
  • Error state (-a-a-#)
  • Completed state (-a-a-|)

While in the next state, the Observable can keep emitting data, which is why we described it earlier as a stream that may never end.
However, if an error is not handled (for example with catchError), the Observable stops emitting data. There is also a completed state, in which the Observable is considered successfully finished and can no longer emit data.
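
The rule that nothing is delivered after an error or completion can be sketched as a small wrapper. This is an illustration of the contract only, under the assumption that a stream is just a set of callbacks; `SimpleObserver` and `guard` are hypothetical names, not Rx.js APIs.

```typescript
interface SimpleObserver {
  next: (value: string) => void;
  error: (err: string) => void;
  complete: () => void;
}

// Wraps an observer so that, once error() or complete() has been called,
// every later notification is ignored (the two terminal states).
function guard(target: SimpleObserver): SimpleObserver {
  let stopped = false;
  return {
    next: (value) => { if (!stopped) target.next(value); },
    error: (err) => { if (!stopped) { stopped = true; target.error(err); } },
    complete: () => { if (!stopped) { stopped = true; target.complete(); } },
  };
}

const events: string[] = [];
const observer = guard({
  next: (value) => events.push(`next:${value}`),
  error: (err) => events.push(`error:${err}`),
  complete: () => events.push('complete'),
});

observer.next('a'); // -a
observer.next('a'); // -a-a
observer.complete(); // -a-a-|
observer.next('ignored'); // dropped, the stream already completed
observer.error('late'); // dropped as well

console.log(events); // [ 'next:a', 'next:a', 'complete' ]
```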

In web frameworks, it's recommended to unsubscribe from any active Observables in components that are no longer being used, to avoid memory leaks.




Naming Conventions

The name of the variable that holds the Observable usually ends with a $ sign, e.g. data$.




Create Transform Execute Complete

To create a new Observable, we call the Observable constructor with a callback function. The callback receives a Subscriber (an Observer) that we use to emit values, errors, and completion.


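import { Observable, Subscriber } from 'rxjs';

const somethingWentWrong = false; // hypothetical flag, for illustration only

const data$ = new Observable((observer: Subscriber<string>) => {
  observer.next('Hello'); // emits a value to the subscriber

  if (somethingWentWrong) {
    observer.error('Something went wrong'); // ends the stream with an error
    return;
  }

  observer.complete(); // ends the stream successfully
});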
                  

There are shorthand alternatives to create Observables like of() or from().




Between the source Observable and the subscribe() call you can add a pipe() call that holds operators that transform, filter, complete (unsubscribe), schedule, or combine Observables.

of([...])
.pipe(
  // transformations
)
.subscribe()
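
One way to build intuition for pipe() is to treat each operator as a plain function and apply the functions left to right. The sketch below works on arrays rather than on Observables and is not the Rx.js implementation; `pipe`, `mapValues`, and `filterValues` are hypothetical helpers that only mimic the shape of the real operators.

```typescript
// An "operator" here is just a function from one list of values to another.
type Operator = (values: number[]) => number[];

// Hypothetical helpers that mimic the shape of map() and filter() operators.
const mapValues = (fn: (n: number) => number): Operator =>
  (values) => values.map(fn);
const filterValues = (keep: (n: number) => boolean): Operator =>
  (values) => values.filter(keep);

// pipe() applies the operators left to right, feeding each result into the next.
function pipe(source: number[], ...operators: Operator[]): number[] {
  return operators.reduce((current, operator) => operator(current), source);
}

const result = pipe(
  [1, 2, 3, 4, 5],
  mapValues((n) => n * 10), // 10, 20, 30, 40, 50
  filterValues((n) => n > 20) // 30, 40, 50
);

console.log(result); // [ 30, 40, 50 ]
```

Because each step only sees the output of the previous one, operators can be added, removed, or reordered without touching the rest of the chain.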
                  



The subscribe function also accepts an Observer object with next, error, and complete callbacks, one for each kind of notification.


data$.subscribe({
  next: (data: string) => console.log(data), // Success
  error: (error: unknown) => console.log(error), // Fail
  complete: () => console.log('Observable completed') // Completion
});
                  

As before, there is a shorthand version of the subscribe function that takes only the next callback.


// Success path
data$.subscribe((data: string) => console.log(data));
                  



Synchronous vs Asynchronous Rx approach

In your typical JavaScript application, there is a distinct difference in how you handle synchronous or asynchronous data.

Rx.js, however, uses the same API for both. Looking at the code below, it's hard to tell whether it is asynchronous or not.


of('Hello')
  .pipe(
    take(1)
  )
  .subscribe(); 
                  

This is synchronous, while the example below is asynchronous:


timer(1000)
  .pipe(
    take(1)
  )
  .subscribe();
                  

Even values that are synchronous by nature (like a string) can be delivered asynchronously using schedulers, delays, or by calling an API in the next step based on the string provided by the producer (source value).


of('Hello', asapScheduler) // async: asapScheduler is the scheduler instance exported by 'rxjs'
  .pipe(
     take(1)
   )
   .subscribe();
                  

The bottom line: when working with Rx.js Observable streams, you process synchronous and asynchronous data the same way.
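
The practical difference between the two kinds of delivery is ordering. The sketch below uses no Rx.js at all: `emitSync` and `emitAsap` are hypothetical helpers, and a resolved Promise stands in for an asap-style scheduler on the assumption that such a scheduler queues its work as a microtask.

```typescript
const order: string[] = [];

// Synchronous delivery: the callback runs before the next statement.
function emitSync(value: string, listener: (v: string) => void): void {
  listener(value);
}

// Deferred delivery: the callback is queued as a microtask and runs only after
// the current synchronous code has finished (a stand-in for an asap-style scheduler).
function emitAsap(value: string, listener: (v: string) => void): void {
  Promise.resolve().then(() => listener(value));
}

emitSync('sync value', (v) => order.push(v));
emitAsap('asap value', (v) => order.push(v));
order.push('end of script');

// Right here `order` holds 'sync value' and 'end of script' only;
// 'asap value' is appended once the queued microtask runs.
Promise.resolve().then(() => console.log(order));
// [ 'sync value', 'end of script', 'asap value' ]
```

The subscriber code is identical in both cases; only the moment of delivery changes.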






Pros and Cons of ReactiveX

Now let's look at the Pros and Cons and when you'd want to use Rx.js:

Benefits:
  • Plethora of operators
    With 100+ operators at your disposal, you can shape the output however you like.
  • Declarative programming
  • Observables
  • Wide support of languages and frameworks
    Rx.js is heavily used in Angular, but it can also be integrated with other frameworks.
  • Better APIs
    With Rx, you can cancel, retry, share cached results among consumers, delay one API call while another is still in progress, handle errors, deal with race conditions, etc.
  • The same API for processing sync and async operations
  • State management
    Rx provides several kinds of Subjects that can hold state and share it among components.
  • Testing
    The Rx.js TestScheduler lets us test asynchronous code as if it were synchronous, using Marble Diagrams and virtual time, so simulated delays do not slow the tests down.

Drawbacks:
  • Hard to grasp
    Reactive programming is hard to get used to at first because developers are not used to approaching things in this manner.
  • Abundance of operators
    The large number of operators is also a drawback, because it takes a long time to learn the operators, concepts, and tools, what each does, and how to use them effectively.





Setting up Rx.js in your project

There are a number of ways to include Rx.js in your existing project:

  • Web app: Paste a CDN link into your project root file (index.html)
  • Node.js backend app: Install Rx.js into your project using npm i rxjs
  • Web app with frameworks: Install Rx.js into your project using npm i rxjs or yarn add rxjs
  • Angular & Nest.js: These frameworks support Rx.js and have it installed out of the box

I recently published a blog post explaining how to use NPM modules in web applications without frameworks, in which I set up Rx.js in a vanilla JS/TS web app (without a CDN). If you're interested, here is a full guide and a repository.



Wrapping Up

To summarize, Reactive programming is a paradigm in which code reacts to changes as they happen, typically using asynchronous logic.
The logic behind it lies in the Observer pattern. Producers of events are called Subjects; they send data wrapped in an Observable to Observers that act upon it. The code itself is written in a Declarative manner where each step is an operator that can manipulate the data.

All in all, Reactive programming, Observables, and ReactiveX are vast topics. Hopefully, this article gave you a sense of the possibilities of this programming style.

Bye for now!



