At its core, RxJS is all about handling asynchronous events using observable sequences.
Observables are dynamic data sources (similar to streams) that you can observe and react to as they evolve, or when they emit errors.
They provide a powerful mechanism for handling asynchronous processes such as API requests, user input, or recurring tasks.
RxJS pipeable operators are functions that you can use to transform, filter, and manipulate the data within your observables.
Imagine you have a stream of data that represents user actions. You might want to filter out specific actions, map them into different shapes, or combine them with other data sources.
Pipeable operators let you do all this with a clean, readable syntax.
Example: how you should not use RxJS!
In this example we simulate a data stream that outputs five consecutive values, from 1 to 5.
Let us imagine that we want to:
Multiply each value by 2
Do something only when the doubled value is greater than 5
We can then subscribe to the observable and perform operations directly in the body of the function:
index.ts
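A minimal sketch of the approach just described. The variable name `double` and the `> 5` threshold follow the discussion later in this article; the `logged` array is a hypothetical helper added only so the output can be inspected.

```typescript
import { of } from 'rxjs';

// Hypothetical helper: keeps track of what gets printed.
const logged: number[] = [];

// Simulated data stream emitting the values 1 to 5.
of(1, 2, 3, 4, 5).subscribe((val) => {
  // Transformation and condition both live inside subscribe:
  // this is the pattern the article advises against.
  const double = val * 2;
  if (double > 5) {
    logged.push(double);
    console.log(double); // prints 6, 8, 10
  }
});
```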
That's a completely wrong approach.
The result is what we hoped for, but we are not using RxJS in the correct way.
A good RxJS practice is to avoid performing operations on data inside subscribe, so that the data reaches the observer in its final form.
This is where pipeable operators come into play: they let us manipulate the values emitted by the observable before they reach the observer.
In the following examples you will see how we can achieve the same result with Operators:
map: will replace the creation of the double variable
filter: will allow us to filter the data without using the if condition
and more...
This approach is also highly useful in Angular, particularly when we need to display a transformed value emitted by an Observable using the async pipe.
I'll show an example at the end of this article.
But there are many advantages to using a reactive approach:
1. Separation of Concerns
Pipeable Operators: each operator (map, filter and so on) is responsible for a specific task, making the code more modular and easier to understand. For example, the map operator handles the transformation, and the filter operator handles the selection of values.
Imperative Approach: the logic is combined into a single subscribe block, mixing transformation (double = val * 2) and conditional logic (if (double > 5)). This makes the code less clear and harder to maintain.
2. Declarative Nature
Pipeable Operators: RxJS encourages a declarative programming style, where you describe what should happen to the data rather than how to do it. The pipeline expresses the data flow and transformations in a clear and declarative manner.
Imperative Approach: This approach is more procedural and imperative, focusing on step-by-step instructions that can be harder to follow, especially in complex scenarios.
3. Error Handling
Pipeable Operators: RxJS operators allow for more sophisticated error handling strategies, such as using catchError or retry. Handling errors at each stage of the pipeline is clean and straightforward.
Imperative Approach: Error handling in the imperative approach can become messy and less structured, especially as the logic inside subscribe grows.
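As a rough illustration of pipeline-level error handling, the sketch below uses catchError to replace a failure with a fallback value. The thrown error, the fallback message, and the `received` array are all hypothetical; operators are imported from 'rxjs/operators', which assumes RxJS 6 or later.

```typescript
import { of } from 'rxjs';
import { map, catchError } from 'rxjs/operators';

// Hypothetical helper: collects everything the observer receives.
const received: Array<number | string> = [];

of(1, 2, 3)
  .pipe(
    map((val) => {
      // Simulate a failure on the third value.
      if (val === 3) {
        throw new Error('unexpected value');
      }
      return val * 2;
    }),
    // The error is handled inside the pipeline: the failed stream is
    // replaced by a new observable that emits a fallback value.
    catchError((err: Error) => of(`recovered: ${err.message}`)),
  )
  .subscribe((val) => received.push(val));

console.log(received); // [2, 4, 'recovered: unexpected value']
```

The subscribe callback stays free of try/catch logic; the recovery strategy is one more step in the pipeline.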
4. Readability and Maintainability
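Pipeable Operators: the pipeline structure we'll use in the next examples clearly outlines the data flow: the source observable comes first, pipe() lists the operators in the order they run, and subscribe() receives the final values.
In a real example we will use operators such as map, filter, reduce and several others.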
It's easy to read, and someone familiar with RxJS can immediately understand what the code is doing.
Imperative Approach: the logic is embedded within the subscribe block, making it harder to quickly grasp what’s happening, especially as the code grows in complexity.
But let’s see how to use them in practice.
I recommend running the snippets, opening the console to see what happens, and playing a little with the code.
In this scenario we use the filter operator right after the map operator. filter emits only the values that satisfy a condition, much like the filter array method.
This is the first time we combine multiple operators.
index.ts
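A minimal sketch of the map and filter pipeline discussed here, assuming the same five-value source as before. The `output` array is a hypothetical helper for inspecting the result.

```typescript
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

// Hypothetical helper: collects the values that reach the observer.
const output: number[] = [];

of(1, 2, 3, 4, 5)
  .pipe(
    map((val) => val * 2),    // 2, 4, 6, 8, 10
    filter((val) => val > 5), // 6, 8, 10
  )
  .subscribe((val) => {
    // The observer receives the data in its final form.
    output.push(val);
    console.log(val); // prints 6, 8, 10
  });
```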
1. After the values are mapped (multiplied by 2), the filter operator removes any values that are less than or equal to 5. As a result, only the values 6, 8, 10 are output.
2. The addition of the filter operator in the current example adds a layer of data selection after the transformation. This means that while the first step (map) modifies all values, the second step (filter) narrows down the output to only those values that meet the condition.
Marble Diagram
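Here's a textual marble diagram that represents the of(1, 2, 3, 4, 5).pipe(map, filter) sequence (each dash is a slice of time and | marks completion):

    source:        1---2---3---4---5---|
    map(x * 2):    2---4---6---8---10--|
    filter(x > 5): --------6---8---10--|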
Operators Chain
When you chain pipeable operators in RxJS, you’re essentially creating a sequence of operations that will be applied to the data emitted by an observable, one step at a time from top to bottom.
The data flows through the operators in the order they are listed in the pipe().
Each operator processes the data and passes the result to the next operator in the chain.
Each operator returns a new observable with the transformed data, rather than modifying the source observable.
TIP for experts: because the source is never mutated, the same observable can be reused in several pipelines and each pipeline behaves predictably.
Since each operator returns an observable, you can easily compose multiple operators together to handle complex scenarios.
This composability is a powerful feature of RxJS, allowing you to build flexible and reusable data pipelines.
We'll analyse these scenarios in the next lessons, when we discover other operators.
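To make the points about ordering and non-mutation concrete, here is a small sketch. All variable names are illustrative and not taken from the original listings.

```typescript
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

const source$ = of(1, 2, 3, 4, 5);

const mapThenFilter: number[] = [];
const filterThenMap: number[] = [];
const untouched: number[] = [];

// map runs first, so filter sees the doubled values.
source$
  .pipe(map((v) => v * 2), filter((v) => v > 5))
  .subscribe((v) => mapThenFilter.push(v));

// filter runs first, so it sees the raw values 1..5 and lets nothing through.
source$
  .pipe(filter((v) => v > 5), map((v) => v * 2))
  .subscribe((v) => filterThenMap.push(v));

// The source itself was not modified by either pipeline.
source$.subscribe((v) => untouched.push(v));

console.log(mapThenFilter); // [6, 8, 10]
console.log(filterThenMap); // []
console.log(untouched);     // [1, 2, 3, 4, 5]
```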
The toArray() operator is a utility operator that collects all the emitted values from an observable into a single array.
It waits until the source observable completes, then emits this array as a single value.
This matters because toArray() is of no use on an observable that never completes, such as one created with interval. In that case the operator would keep collecting values indefinitely without ever emitting the array.
index.ts
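A minimal sketch of toArray(), assuming it is appended to the map and filter pipeline from the previous example (the original listing is not available, so this composition is an assumption). The `emissions` array is a hypothetical helper.

```typescript
import { of } from 'rxjs';
import { map, filter, toArray } from 'rxjs/operators';

// Hypothetical helper: every emission of the pipeline is stored here.
const emissions: number[][] = [];

of(1, 2, 3, 4, 5)
  .pipe(
    map((val) => val * 2),
    filter((val) => val > 5),
    toArray(), // waits for completion, then emits one array
  )
  .subscribe((values) => {
    emissions.push(values);
    console.log(values); // prints [6, 8, 10]
  });
```

The observer is called once with the whole array, instead of three times with individual values.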
The toArray() operator is particularly useful when you want to collect all emitted values into an array and handle them as a single entity, rather than processing each value individually.
Marble Diagram
Here's a textual marble diagram that represents the sequence:
The reduce() operator accumulates the values emitted by the observable into a single value, based on a provided accumulator function and an initial seed value.
It emits this final accumulated value only once the source observable completes.
index.ts
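A minimal sketch of the pipeline explained in the steps that follow, using the accumulator `(acc, val) => acc + val` and the seed `0` named there. The `totals` array is a hypothetical helper.

```typescript
import { of } from 'rxjs';
import { map, filter, reduce } from 'rxjs/operators';

// Hypothetical helper: stores what the observer receives.
const totals: number[] = [];

of(1, 2, 3, 4, 5)
  .pipe(
    map((val) => val * 2),               // 2, 4, 6, 8, 10
    filter((val) => val > 5),            // 6, 8, 10
    reduce((acc, val) => acc + val, 0),  // 0 + 6 + 8 + 10
  )
  .subscribe((total) => {
    totals.push(total);
    console.log(total); // prints 24
  });
```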
How it works
The filter operator allows only values greater than 5 to pass through. The resulting sequence is 6, 8, 10
The reduce operator now comes into play. It takes two arguments:
Accumulator Function (acc, val) => acc + val: This function specifies how to accumulate the values.
It takes the current accumulated value (acc) and the current value from the observable (val), and adds them together.
Initial Value 0: This is the starting value for the accumulation.
Consider a scenario where we invoke a REST API returning an array of objects.
How can we manipulate each element when the Observable emits only one value, a single array?
To illustrate, we start with a simple case and then build complexity.
In the following example, the Observable emits a single value, which is an array of numbers. Suppose you want to transform each element in the array and combine the results into a single value, such as the sum of all elements.
While you can use the reduce operator for this, you might encounter unexpected behavior:
index.ts
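A sketch that reproduces the behavior analysed in the steps that follow. One assumption is needed: the source is typed as `Observable<any>` here, because with precise types the TypeScript compiler rejects `number[] * 2` before the code ever runs. The `results` array is a hypothetical helper.

```typescript
import { Observable, of } from 'rxjs';
import { map, filter, reduce } from 'rxjs/operators';

// Typed as any on purpose, to mimic untyped JavaScript (assumption).
const source$: Observable<any> = of([1, 2, 3, 4, 5]);

// Hypothetical helper: stores what the observer receives.
const results: number[] = [];

source$
  .pipe(
    map((val) => val * 2),              // [1, 2, 3, 4, 5] * 2 is NaN
    filter((val) => val > 5),           // NaN > 5 is false: nothing passes
    reduce((acc, val) => acc + val, 0), // nothing to add: the seed is emitted
  )
  .subscribe((total) => {
    results.push(total);
    console.log(total); // prints 0
  });
```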
How it works
The map operator attempts to multiply the emitted value by 2.
Since the emitted value is the array itself, the operation [1, 2, 3, 4, 5] * 2 does not behave as intended and returns NaN (Not a Number).
The filter operator then checks if val is greater than 5.
Since val is NaN, and the comparison NaN > 5 is always false, the filter operator does not pass any value through to the next step.
The reduce operator is supposed to accumulate values, but since the filter step blocked every value, reduce has nothing to accumulate. When the source completes, it emits its initial accumulator value, 0.
Debug with Tap
How did I find out that the map emits a NaN value?
I often use the tap operator to log the values that pass through the observable stream at a specific point in the pipeline.
The tap operator is useful because it allows you to "peek" at the values as they flow through the stream without modifying them, which can help you understand and debug what is happening at each step. In fact, the callback passed to tap does not have to return a value, whereas the callbacks passed to operators such as map and filter do.
Run the following script to see how the tap operator displays the NaN in the console.
So by moving the tap under each operator, it’s easy to see what is happening to our data.
index.ts
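A sketch of this debugging technique, applied to the same array-emitting source. The log labels and the `seenAfterMap` array are hypothetical, and the source is typed as `Observable<any>` so that the faulty multiplication compiles (assumption).

```typescript
import { Observable, of } from 'rxjs';
import { map, filter, reduce, tap } from 'rxjs/operators';

const source$: Observable<any> = of([1, 2, 3, 4, 5]);

// Hypothetical helper: records what tap observes right after map.
const seenAfterMap: number[] = [];

source$
  .pipe(
    map((val) => val * 2),
    // tap only observes: its callback returns nothing and the value is unchanged.
    tap((val) => {
      seenAfterMap.push(val);
      console.log('after map:', val); // logs NaN
    }),
    filter((val) => val > 5),
    // This tap never logs, which shows that filter blocked the value.
    tap((val) => console.log('after filter:', val)),
    reduce((acc, val) => acc + val, 0),
  )
  .subscribe((total) => console.log('total:', total)); // logs 0
```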
mergeMap: emit a value for each element of the array
When you pass an array through operators like map and filter, these operators try to apply their functions to the entire array, leading to unexpected results (like NaN in a multiplication or failing conditions in filters just as we saw in the previous example).
mergeMap is an RxJS operator that is often used to handle observable sequences (we'll talk about it later in the book), but it also has a useful trick: it can flatten a single emitted array into individual values.
So, in the next example we apply mergeMap to emit a value for each element of the array, and the map operator receives five values instead of one (the original array):
index.ts
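A minimal sketch of this fix, assuming the same map, filter and reduce steps as before. Returning the array from the mergeMap callback is enough, since RxJS treats an array as a valid input to flatten. The `totals` array is a hypothetical helper.

```typescript
import { of } from 'rxjs';
import { mergeMap, map, filter, reduce } from 'rxjs/operators';

// Hypothetical helper: stores what the observer receives.
const totals: number[] = [];

of([1, 2, 3, 4, 5])
  .pipe(
    mergeMap((arr) => arr),             // one array becomes 1, 2, 3, 4, 5
    map((val) => val * 2),              // 2, 4, 6, 8, 10
    filter((val) => val > 5),           // 6, 8, 10
    reduce((acc, val) => acc + val, 0), // 24
  )
  .subscribe((total) => {
    totals.push(total);
    console.log(total); // prints 24
  });
```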
Another example with an array of objects
This code calculates the total cost of all products in an array using RxJS. The array could be the result of an HTTP call, but it makes no difference whether the value is emitted by the of operator or by the ajax operator: the mechanism is the same.
I will show you an example with Angular and HttpClient in the next paragraphs.
index.ts
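A sketch of the calculation described in the lines that follow. The `Product` shape, the product names and the costs are invented for illustration; only the `cost` field is implied by the text.

```typescript
import { of } from 'rxjs';
import { mergeMap, reduce } from 'rxjs/operators';

// Hypothetical data model and sample data.
interface Product {
  name: string;
  cost: number;
}

const products: Product[] = [
  { name: 'Keyboard', cost: 50 },
  { name: 'Mouse', cost: 25 },
  { name: 'Monitor', cost: 200 },
];

// Hypothetical helper: stores what the observer receives.
const totals: number[] = [];

// of() stands in for an HTTP call that returns the whole array at once.
of(products)
  .pipe(
    mergeMap((list) => list),                             // emit each product
    reduce((total, product) => total + product.cost, 0),  // sum the costs
  )
  .subscribe((total) => {
    totals.push(total);
    console.log(total); // prints 275
  });
```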
The mergeMap operator flattens the array emitted by the observable.
Instead of working with the entire array as one unit, it breaks it down and emits each product object individually as separate emissions.
The reduce operator accumulates the total cost of all the products.
It starts with an initial accumulator value of 0 and adds the cost of each emitted product to this accumulator.