The basics
Converting to observables
// From one or multiple values
Rx.Observable.of('foo', 'bar');
// From array of values
Rx.Observable.from([1,2,3]);
// From an event
Rx.Observable.fromEvent(document.querySelector('button'), 'click');
// From a Promise
Rx.Observable.fromPromise(fetch('/users'));
// From a callback (last argument is a callback)
// fs.exists = (path, cb(exists))
var exists = Rx.Observable.bindCallback(fs.exists);
exists('file.txt').subscribe(exists => console.log('Does file exist?', exists));
// From a callback (last argument is a callback)
// fs.rename = (pathA, pathB, cb(err, result))
var rename = Rx.Observable.bindNodeCallback(fs.rename);
rename('file.txt', 'else.txt').subscribe(() => console.log('Renamed!'));
Creating observables
Externally produce new events.
var myObservable = new Rx.Subject();
myObservable.subscribe(value => console.log(value));
myObservable.next('foo');
Internally produce new events.
var myObservable = Rx.Observable.create(observer => {
observer.next('foo');
setTimeout(() => observer.next('bar'), 1000);
});
myObservable.subscribe(value => console.log(value));
Which one you choose depends on the scenario. The normal Observable is great when you want to wrap functionality that produces values over time. An example would be a websocket connection. With Subject you can trigger new events from anywhere really and you can connect existing observables to it.
Controlling the flow
// typing "hello world"
var input = Rx.Observable.fromEvent(document.querySelector('input'), 'input');
// Filter out target values less than 3 characters long
input.filter(event => event.target.value.length > 2)
.map(event => event.target.value)
.subscribe(value => console.log(value)); // "hel"
// Delay the events
input.delay(200)
.map(event => event.target.value)
.subscribe(value => console.log(value)); // "h" -200ms-> "e" -200ms-> "l" ...
// Only let through an event every 200 ms
input.throttleTime(200)
.map(event => event.target.value)
.subscribe(value => console.log(value)); // "h" -200ms-> "w"
// Let through latest event after 200 ms
input.debounceTime(200)
.map(event => event.target.value)
.subscribe(value => console.log(value)); // "o" -200ms-> "d"
// Stop the stream of events after 3 events
input.take(3)
.map(event => event.target.value)
.subscribe(value => console.log(value)); // "hel"
// Passes through events until other observable triggers an event
var stopStream = Rx.Observable.fromEvent(document.querySelector('button'), 'click');
input.takeUntil(stopStream)
.map(event => event.target.value)
.subscribe(value => console.log(value)); // "hello" (click)
Producing values
// typing "hello world"
var input = Rx.Observable.fromEvent(document.querySelector('input'), 'input');
// Pass on a new value
input.map(event => event.target.value)
.subscribe(value => console.log(value)); // "h"
// Pass on a new value by plucking it
input.pluck('target', 'value')
.subscribe(value => console.log(value)); // "h"
// Pass the two previous values
input.pluck('target', 'value').pairwise()
.subscribe(value => console.log(value)); // ["h", "e"]
// Only pass unique values through
input.pluck('target', 'value').distinct()
.subscribe(value => console.log(value)); // "helo wrd"
// Do not pass repeating values through
input.pluck('target', 'value').distinctUntilChanged()
.subscribe(value => console.log(value)); // "helo world"
Creating applications
RxJS is a great tool to keep your code less error prone. It does that by using pure and stateless functions. But applications are stateful, so how do we bridge the stateless world of RxJS with the stateful world of our applications?
Let us create a simple state store of the value 0
. On each click we want to increase that count in our state store.
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
// scan (reduce) to a stream of counts
.scan(count => count + 1, 0)
// Set the count on an element each time it changes
.subscribe(count => document.querySelector('#count').innerHTML = count);
So producing state is within the world of RxJS, but changing the DOM is a side effect which happens at "the end of the line".
State stores
Applications use state stores to hold state. These are called different things in different frameworks, like store, reducer and model, but at the core they are all just a plain object. What we also need to handle is that multiple observables can update a single state store.
var increaseButton = document.querySelector('#increase');
var increase = Rx.Observable.fromEvent(increaseButton, 'click')
// We map to a function that will change our state
.map(() => state => Object.assign({}, state, {count: state.count + 1}));
What we do here is mapping a click event to a state changing function. So instead of mapping to a value, we map to a function. A function will change the state of our state store. So now let us see how we actually make the change.
var increaseButton = document.querySelector('#increase');
var increase = Rx.Observable.fromEvent(increaseButton, 'click')
.map(() => state => Object.assign({}, state, {count: state.count + 1}));
// We create an object with our initial state. Whenever a new state change function
// is received we call it and pass the state. The new state is returned and
// ready to be changed again on the next click
var state = increase.scan((state, changeFn) => changeFn(state), {count: 0});
We can now add a couple of more observables which will also change the same state store.
var increaseButton = document.querySelector('#increase');
var increase = Rx.Observable.fromEvent(increaseButton, 'click')
// Again we map to a function the will increase the count
.map(() => state => Object.assign({}, state, {count: state.count + 1}));
var decreaseButton = document.querySelector('#decrease');
var decrease = Rx.Observable.fromEvent(decreaseButton, 'click')
// We also map to a function that will decrease the count
.map(() => state => Object.assign({}, state, {count: state.count - 1}));
var inputElement = document.querySelector('#input');
var input = Rx.Observable.fromEvent(inputElement, 'keypress')
// Let us also map the keypress events to produce an inputValue state
.map(event => state => Object.assign({}, state, {inputValue: event.target.value}));
// We merge the three state change producing observables
var state = Rx.Observable.merge(
increase,
decrease,
input
).scan((state, changeFn) => changeFn(state), {
count: 0,
inputValue: ''
});
// We subscribe to state changes and update the DOM
state.subscribe((state) => {
document.querySelector('#count').innerHTML = state.count;
document.querySelector('#hello').innerHTML = 'Hello ' + state.inputValue;
});
// To optimize our rendering we can check what state
// has actually changed
var prevState = {};
state.subscribe((state) => {
if (state.count !== prevState.count) {
document.querySelector('#count').innerHTML = state.count;
}
if (state.inputValue !== prevState.inputValue) {
document.querySelector('#hello').innerHTML = 'Hello ' + state.inputValue;
}
prevState = state;
});
We can take the state store approach and use it with many different frameworks and libraries.
Immutable JS
You can also create a global state store for your application using Immutable JS. Immutable JS is a great way to create immutable state stores that allows you to optimize rendering by doing shallow checks on changed values.
import Immutable from 'immutable';
import someObservable from './someObservable';
import someOtherObservable from './someOtherObservable';
var initialState = {
foo: 'bar'
};
var state = Observable.merge(
someObservable,
someOtherObservable
).scan((state, changeFn) => changeFn(state), Immutable.fromJS(initialState));
export default state;
Now you can import your state in whatever UI layer you are using.
import state from './state';
state.subscribe(state => {
document.querySelector('#text').innerHTML = state.get('foo');
});
React
Lets look at an example where we subscribe to an observable when the component mounts and unsubscribes when it unmounts.
import messages from './someObservable';
class MyComponent extends ObservableComponent {
constructor(props) {
super(props);
this.state = {messages: []};
}
componentDidMount() {
this.messages = messages
// Accumulate our messages in an array
.scan((messages, message) => [message].concat(messages), [])
// And render whenever we get a new message
.subscribe(messages => this.setState({messages: messages}));
}
componentWillUnmount() {
this.messages.unsubscribe();
}
render() {
return (
<div>
<ul>
{this.state.messages.map(message => <li>{message.text}</li>)}
</ul>
</div>
);
}
}
export default MyComponent;
There are many other ways to use observables with React as well. Take a look at these:
- rxjs-react-component. It will allow you to expose observables that maps to state changes. Also use observables for lifecycle hooks
External References
Can't get enough RxJS? Check out these other great resources!
Tutorials
- RxJS @ Egghead.io
- The introduction to Reactive Programming you've been missing
- 2 minute introduction to Rx
- Learn RxJS - @jhusain
- Rx Workshop
- Reactive Programming and MVC
- RxJS Training - @andrestaltz
Books
- RxJS in Action
- RxJS
- Intro to Rx
- Programming Reactive Extensions and LINQ
- Reactive Programming with RxJS
Videos
- Practical Rx with Matthew Podwysocki, Bart de Smet and Jafar Husain
- Netflix and RxJS
- Hello RxJS - Channel 9
- MIX 2011
- RxJS Today and Tomorrow - Channel 9
- Reactive Extensions Videos on Channel 9
- Asynchronous JavaScript at Netflix - Netflix JavaScript Talks - Jafar Husain
- Asynchronous JavaScript at Netflix - MountainWest JavaScript 2014 - Jafar Husain
- Asynchronous JavaScript at Netflix - HTML5DevConf - Jafar Husain
- Adding Even More Fun to Functional Programming With RXJS - Ryan Anklam
- Reactive Angular - Devoxx France 2014 - Martin Gontovnikas
- Reactive Game Programming for the Discerning Hipster - JSConf 2014 - Bodil Stokke
Presentations
- Don't Cross the Streams - Cascadia.js 2012 slides/demos | video
- Curing Your Asynchronous Blues - Strange Loop 2013 slides/demos | video
- Streaming and event-based programming using FRP and RxJS - FutureJS 2014 slides/demos | video
- Tyrannosaurus Rx - James Hughes
- Taming Asynchronous Workflows with Functional Reactive Programming - EuroClojure - Leonardo Borges slides | video
- Reactive All the Things - ng-conf 2015 - Martin Gontovnikas & Ben Lesh
- The Reactive Loop - Functional JS London 2015
- Reactive Functions with RxJS - Leeds JS 2015