You can create an Observable from scratch by using the Create
operator. You pass this operator a function that accepts the observer as its parameter. Write this
function so that it behaves as an Observable — by calling the observer’s onNext
,
onError
, and onCompleted
methods appropriately.
A well-formed finite Observable must attempt to call either the observer’s onCompleted
method exactly once or its onError
method exactly once, and must not thereafter
attempt to call any of the observer’s other methods.
TBD
RxGroovy implements this operator as create
.
def myObservable = Observable.create({ aSubscriber -> try { for (int i = 1; i < 1000000; i++) { if (aSubscriber.isUnsubscribed()) { return; } aSubscriber.onNext(i); } if (!aSubscriber.isUnsubscribed()) { aSubscriber.onCompleted(); } } catch(Throwable t) { if (!aSubscriber.isUnsubscribed()) { aSubscriber.onError(t); } } })
It is good practice to check the observer’s isUnsubscribed
state so that your
Observable can stop emitting items or doing expensive calculations when there is no longer an
interested observer.
create
does not by default operate on any particular
Scheduler.
create(OnSubscribe)
RxJava implements this operator as create
.
It is good practice to check the observer’s isUnsubscribed
state from within the
function you pass to create
so that your Observable can stop emitting items or
doing expensive calculations when there is no longer an interested observer.
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> observer) { try { if (!observer.isUnsubscribed()) { for (int i = 1; i < 5; i++) { observer.onNext(i); } observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
Next: 1 Next: 2 Next: 3 Next: 4 Sequence complete.
create
does not by default operate on any particular
Scheduler.
create(OnSubscribe)
RxJS implements this operator as create
(there is also an alternate name for the
same operator: createWithDisposable
).
/* Using a function */ var source = Rx.Observable.create(function (observer) { observer.onNext(42); observer.onCompleted(); // Note that this is optional, you do not have to return this if you require no cleanup return function () { console.log('disposed'); }; }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 42 Completed
/* Using a disposable */ var source = Rx.Observable.create(function (observer) { observer.onNext(42); observer.onCompleted(); // Note that this is optional, you do not have to return this if you require no cleanup return Rx.Disposable.create(function () { console.log('disposed'); }); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 42 Completed
create
is found in the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
You can use the generate
operator to create simple Observables that can generate their
next emissions, and can determine when to terminate, based on the value of the previous emission. The
basic form of generate
takes four parameters:
true
) or terminate the
Observable (false
)
You can also pass in as an optional fifth parameter a Scheduler that
generate
will use to create and emit its sequence (it uses currentThread
by
default).
var source = Rx.Observable.generate( 0, function (x) { return x < 3; }, function (x) { return x + 1; }, function (x) { return x; } ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 0 Next: 1 Next: 2 Completed
generate
is found in the following distributions:
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
You can use the generateWithRelativeTime
operator to create simple Observables that can
generate their next emissions, and can determine when to terminate, based on the value of the previous
emission. The basic form of generateWithRelativeTime
takes five parameters:
true
) or terminate the
Observable (false
)
You can also pass in as an optional sixth parameter a Scheduler that
generate
will use to create and emit its sequence (it uses currentThread
by
default).
var source = Rx.Observable.generateWithRelativeTime( 1, function (x) { return x < 4; }, function (x) { return x + 1; }, function (x) { return x; }, function (x) { return 100 * x; } ).timeInterval(); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: {value: 1, interval: 100} Next: {value: 2, interval: 200} Next: {value: 3, interval: 300} Completed
generateWithRelativeTime
is found in the following distributions:
rx.lite.js
rx.lite.compat.js
rx.time.js
(requires rx.js
or rx.compat.js
)
You can use the generateWithAbsoluteTime
operator to create simple Observables that can
generate their next emissions, and can determine when to terminate, based on the value of the previous
emission. The basic form of generateWithAbsoluteTime
takes five parameters:
true
) or terminate the
Observable (false
)Date
) the generator should emit the
the new item
You can also pass in as an optional sixth parameter a Scheduler that
generate
will use to create and emit its sequence (it uses currentThread
by
default).
var source = Rx.Observable.generate( 1, function (x) { return x < 4; }, function (x) { return x + 1; }, function (x) { return x; }, function (x) { return Date.now() + (100 * x); } ).timeInterval(); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: {value: 1, interval: 100} Next: {value: 2, interval: 200} Next: {value: 3, interval: 300} Completed
generateWithAbsoluteTime
is found in the following distribution:
rx.time.js
(requires rx.js
or rx.compat.js
)TBD
RxPHP implements this operator as create
.
Creates an observable sequence from a specified subscribeAction callable implementation.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/create/create.php //With static method $source = \Rx\Observable::create(function (\Rx\ObserverInterface $observer) { $observer->onNext(42); $observer->onCompleted(); return new CallbackDisposable(function () { echo "Disposed\n"; }); }); $subscription = $source->subscribe($createStdoutObserver());
Next value: 42 Complete! Disposed
RxSwift implements this operator as create
.
let source : Observable= Observable.create { observer in for i in 1...5 { observer.on(.next(i)) } observer.on(.completed) // Note that this is optional. If you require no cleanup you can return // `Disposables.create()` (which returns the `NopDisposable` singleton) return Disposables.create { print("disposed") } } source.subscribe { print($0) }
next(1) next(2) next(3) next(4) next(5) completed disposed
You can use the generate
operator to create simple Observables that can generate their
next emissions, and can determine when to terminate, based on the value of the previous emission. The
basic form of generate
takes three parameters:
true
) or terminate the
Observable (false
)
You can also pass in as an optional fourth parameter a Scheduler that
generate
will use to create and emit its sequence (it uses CurrentThreadScheduler
by
default).
let source = Observable.generate( initialState: 0, condition: { $0 < 3 }, iterate: { $0 + 1 } ) source.subscribe { print($0) }
next(0) next(1) next(2) completed