RxJS: Accessing a previous value further down the pipe chain

 Often we want to access a value from a previous operator in RxJS, but that value is no longer available. Let me illustrate the problem with an example:

  1. We have a users email in our code
  2. From the email we want to get a user object
  3. We then want to find all the products that this user has shown interest for
  4. We then want to see if our user is a premium member, if so we want to return the products with discounted prices
getUser(email).pipe(
mergeMap(user => getProducts(user.interests)),
map(products => {
// The line below will fail, the user is undefined
if (user.isPremiumMember) {
return discountPrices(products);
} else {
return products;
}
})
);

As you can see above, we want to access the user object both in the “mergeMap” and in the following “map”. But the user object is no longer available in the “map”. So what can we do?

Solution 1 and 2 will behave a bit differently from solution 3 and 4. In solution 1 and 2 we will be passing the original value down the chain. In solution 3 and 4 we will be combining the original stream in again, and if the value of this original stream has changed in the meantime, we will get the new value.

Solution 1: Pass the values down the chain with a nested pipe and map

This is probably the easiest to implement, but it feels a bit hacky as we need to nest a second pipe inside of our first pipe:

getUser(email).pipe(
mergeMap(user => {
return getProducts(user.interests).pipe(
map(products => {
return {user: user, products: products};
})
);
}),
map(({user, products}) => {
// Here we can access both user and products
})
);

Solution 2: Nest a combineLatest or forkJoin inside the chain

Here we are using the combineLatest or forkJoin inside the pipe to pass along both the old and the new value. We put the previous value back into an observable with of() :

getUser(email).pipe(
mergeMap(user => {
return combineLatest(
of(user),
getProducts(user.interests)
);
}),
map(([user, products]) => {
// Here we can access both user and products
})
);

Solution 3: Divide the chain into several parts, then use combineLatest or forkJoin

This solution is good as it divides the code into several parts instead of nesting them. As mentioned earlier in the article, in solution 3 and 4 the user property can potentially have changed. For example if the observable returned from “getProducts()” uses some time, and the user stream have received a new value in the meantime.

const user$ = getUser(email);
const products$ = user$.pipe(
mergeMap(user => getProducts(user.interests))
);
combineLatest(user$, products$).pipe(
map(([user, products]) => {
// Here we can access both user and products
})
);

Solution 4, get the latest original stream with getLatestFrom:

Solution 3 and 4 are mostly the same, but they have a small difference in functionality. The “combineLatest” in solution 3 will create an event every time any of the combined observables emit. The “withLatestFrom” in solution 4 will only emit if a value goes through the encapsulating pipe.

const user$ = getUser(email);
const products$ = user$.pipe(
mergeMap(user => getProducts(user.interests)),
withLatestFrom(user$),
map(([products, user]) => {
// Here we can access both user and products
})
);
The obligatory RxJS stream image

Let me know if this was helpful, or if there are other solutions that I have missed.

Comments

Popular posts from this blog

Get Started Building Microservices with ASP.NET Core and Docker in Visual Studio Code