RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rxcpp::observable< T, SourceOperator > Class Template Reference

A source of values. Subscribe to it, or use one of the operator methods that return a new observable which uses this observable as its source.

#include <rx-observable.hpp>


Public Types

typedef rxu::decay_t< SourceOperator > source_operator_type
 
typedef T value_type
 
- Public Types inherited from rxcpp::observable_base< T >
typedef tag_observable observable_tag
 
typedef T value_type
 

Public Member Functions

 ~observable ()
 
 observable ()
 
 observable (const source_operator_type &o)
 
 observable (source_operator_type &&o)
 
template<class SO >
 observable (const observable< T, SO > &o)
 implicit conversion between observables of the same value_type More...
 
template<class SO >
 observable (observable< T, SO > &&o)
 implicit conversion between observables of the same value_type More...
 
template<class... AN>
observable< T > as_dynamic (AN **...) const
 
template<class... AN>
blocking_observable< T, this_type > as_blocking (AN **...) const
 
template<class... ArgN>
auto subscribe (ArgN &&...an) const -> composite_subscription
 Subscribe will cause the source observable to emit values to the provided subscriber. More...
 
template<class... AN>
auto all (AN &&...an) const
 Returns an Observable that emits true if every item emitted by the source Observable satisfies a specified condition, otherwise false. Emits true if the source Observable terminates without emitting any item. More...
 
template<class... AN>
auto is_empty (AN &&...an) const
 Returns an Observable that emits true if the source Observable is empty, otherwise false. More...
 
template<class... AN>
auto any (AN &&...an) const
 Returns an Observable that emits true if any item emitted by the source Observable satisfies a specified condition, otherwise false. Emits false if the source Observable terminates without emitting any item. More...
 
template<class... AN>
auto exists (AN &&...an) const
 Returns an Observable that emits true if any item emitted by the source Observable satisfies a specified condition, otherwise false. Emits false if the source Observable terminates without emitting any item. More...
 
template<class... AN>
auto contains (AN &&...an) const
 Returns an Observable that emits true if the source Observable emitted a specified item, otherwise false. Emits false if the source Observable terminates without emitting any item. More...
 
template<class... AN>
auto filter (AN &&...an) const
 For each item from this observable use Predicate to select which items to emit from the new observable that is returned. More...
 
template<class... AN>
auto switch_if_empty (AN &&...an) const
 If the source Observable terminates without emitting any items, emits items from a backup Observable. More...
 
template<class... AN>
auto default_if_empty (AN &&...an) const
 If the source Observable terminates without emitting any items, emits a default item and completes. More...
 
template<class... AN>
auto sequence_equal (AN...an) const
 Determine whether two Observables emit the same sequence of items. More...
 
template<class... AN>
auto tap (AN &&...an) const
 inspect calls to on_next, on_error and on_completed. More...
 
template<class... AN>
auto time_interval (AN &&...an) const
 Returns an observable that emits indications of the amount of time lapsed between consecutive emissions of the source observable. The first emission from this new Observable indicates the amount of time lapsed between the time when the observer subscribed to the Observable and the time when the source Observable emitted its first item. More...
 
template<class... AN>
auto timeout (AN &&...an) const
 Return an observable that terminates with timeout_error if a particular timespan has passed without emitting another item from the source observable. More...
 
template<class... AN>
auto timestamp (AN &&...an) const
 Returns an observable that attaches a timestamp to each item emitted by the source observable indicating when it was emitted. More...
 
template<class... AN>
auto finally (AN &&...an) const
 Add a new action at the end of the new observable that is returned. More...
 
template<class... AN>
auto on_error_resume_next (AN &&...an) const
 If an error occurs, take the result from the Selector and subscribe to that instead. More...
 
template<class... AN>
auto switch_on_error (AN &&...an) const
 If an error occurs, take the result from the Selector and subscribe to that instead. More...
 
template<class... AN>
auto map (AN &&...an) const
 For each item from this observable use Selector to produce an item to emit from the new observable that is returned. More...
 
template<class... AN>
auto transform (AN &&...an) const
 For each item from this observable use Selector to produce an item to emit from the new observable that is returned. More...
 
template<class... AN>
auto debounce (AN &&...an) const
 Return an observable that emits an item if a particular timespan has passed without emitting another item from the source observable. More...
 
template<class... AN>
auto delay (AN &&...an) const
 Return an observable that emits each item emitted by the source observable after the specified delay. More...
 
template<class... AN>
auto distinct (AN &&...an) const
 For each item from this observable, filter out repeated values and emit only items that have not already been emitted. More...
 
template<class... AN>
auto distinct_until_changed (AN &&...an) const
 For each item from this observable, filter out consecutively repeated values and emit only changes from the new observable that is returned.
 
template<class... AN>
auto element_at (AN &&...an) const
 Pulls an item located at a specified index location in the sequence of items and emits that item as its own sole emission. More...
 
template<class... AN>
auto window (AN &&...an) const
 Return an observable that emits connected, non-overlapping windows, each containing at most count items from the source observable. If the skip parameter is set, return an observable that emits windows every skip items containing at most count items from the source observable. More...
 
template<class... AN>
auto window_with_time (AN &&...an) const
 Return an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler. If the skip parameter is set, return an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler. More...
 
template<class... AN>
auto window_with_time_or_count (AN &&...an) const
 Return an observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first), on the specified scheduler. More...
 
template<class... AN>
auto window_toggle (AN &&...an) const
 Return an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler. More...
 
template<class... AN>
auto buffer (AN &&...an) const
 Return an observable that emits connected, non-overlapping buffers, each containing at most count items from the source observable. If the skip parameter is set, return an observable that emits buffers every skip items containing at most count items from the source observable.
 
template<class... AN>
auto buffer_with_time (AN &&...an) const
 Return an observable that emits buffers every period time interval and collects items from this observable for period of time into each produced buffer. If the skip parameter is set, return an observable that emits buffers every skip time interval and collects items from this observable for period of time into each produced buffer, on the specified scheduler.
 
template<class... AN>
auto buffer_with_time_or_count (AN &&...an) const
 Return an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first), on the specified scheduler. More...
 
template<class... AN>
auto switch_on_next (AN &&...an) const
 Return observable that emits the items emitted by the observable most recently emitted by the source observable. More...
 
template<class... AN>
auto merge (AN...an) const
 For each given observable subscribe. For each item emitted from all of the given observables, deliver from the new observable that is returned. More...
 
template<class... AN>
auto amb (AN...an) const
 For each item from only the first of the given observables deliver from the new observable that is returned, on the specified scheduler. More...
 
template<class... AN>
auto flat_map (AN &&...an) const
 For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. More...
 
template<class... AN>
auto merge_transform (AN &&...an) const
 For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. More...
 
template<class... AN>
auto concat (AN...an) const
 For each item from this observable subscribe to one at a time, in the order received. For each item from all of the given observables deliver from the new observable that is returned. More...
 
template<class... AN>
auto concat_map (AN &&...an) const
 For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. More...
 
template<class... AN>
auto concat_transform (AN &&...an) const
 For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. More...
 
template<class... AN>
auto with_latest_from (AN...an) const
 For each item from the first observable select the latest value from all the observables to emit from the new observable that is returned. More...
 
template<class... AN>
auto combine_latest (AN...an) const
 For each item from all of the observables select a value to emit from the new observable that is returned. More...
 
template<class... AN>
auto zip (AN &&...an) const
 Take one item at a time from each of the given observables and select a value to emit from the new observable that is returned.
 
template<class... AN>
auto group_by (AN &&...an) const
 Return an observable that emits grouped_observables, each of which corresponds to a unique key value and each of which emits those items from the source observable that share that key value. More...
 
template<class... AN>
auto ignore_elements (AN &&...an) const
 Do not emit any items from the source Observable, but allow termination notification (either onError or onCompleted) to pass through unchanged. More...
 
template<class... AN>
auto multicast (AN &&...an) const
 
template<class... AN>
auto publish (AN &&...an) const
 Turn a cold observable hot and allow connections to the source to be independent of subscriptions. A second variant additionally sends the most recent value to any new subscriber.
 
template<class... AN>
auto publish_synchronized (AN &&...an) const
 Turn a cold observable hot and allow connections to the source to be independent of subscriptions. More...
 
template<class... AN>
auto replay (AN &&...an) const
 1) replay(optional Coordination, optional CompositeSubscription) Turn a cold observable hot, send all earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions. More...
 
template<class... AN>
auto subscribe_on (AN &&...an) const
 Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordination. More...
 
template<class... AN>
auto observe_on (AN &&...an) const
 All values are queued and delivered using the scheduler from the supplied coordination. More...
 
template<class... AN>
auto reduce (AN &&...an) const
 For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned. More...
 
template<class... AN>
auto accumulate (AN &&...an) const
 For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned. More...
 
template<class... AN>
auto first (AN **...) const
 For each item from this observable reduce it by sending only the first item. More...
 
template<class... AN>
auto last (AN **...) const
 For each item from this observable reduce it by sending only the last item. More...
 
template<class... AN>
auto count (AN **...) const
 For each item from this observable reduce it by incrementing a count. More...
 
template<class... AN>
auto sum (AN **...) const
 For each item from this observable reduce it by adding to the previous items. More...
 
template<class... AN>
auto average (AN **...) const
 For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end. More...
 
template<class... AN>
auto max (AN **...) const
 For each item from this observable reduce it by taking the max value of the previous items. More...
 
template<class... AN>
auto min (AN **...) const
 For each item from this observable reduce it by taking the min value of the previous items. More...
 
template<class... AN>
auto scan (AN...an) const
 For each item from this observable use Accumulator to combine items into a value that will be emitted from the new observable that is returned. More...
 
template<class... AN>
auto sample_with_time (AN &&...an) const
 Return an Observable that emits the most recent items emitted by the source Observable within periodic time intervals. More...
 
template<class... AN>
auto skip (AN...an) const
 Make new observable with skipped first count items from this observable. More...
 
template<class... AN>
auto skip_last (AN...an) const
 Make new observable with skipped last count items from this observable. More...
 
template<class... AN>
auto skip_until (AN...an) const
 Make new observable with items skipped until on_next occurs on the trigger observable or until the specified time. skip_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination) More...
 
template<class... AN>
auto take (AN...an) const
 For the first count items from this observable emit them from the new observable that is returned. More...
 
template<class... AN>
auto take_last (AN &&...an) const
 Emit only the final t items emitted by the source Observable. More...
 
template<class... AN>
auto take_until (AN &&...an) const
 For each item from this observable until on_next occurs on the trigger observable or until the specified time, emit them from the new observable that is returned. take_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination) More...
 
template<class... AN>
auto take_while (AN &&...an) const
 For the first items fulfilling the predicate from this observable emit them from the new observable that is returned. More...
 
template<class... AN>
auto repeat (AN...an) const
 Repeat this observable for the given number of times or infinitely. More...
 
template<class... AN>
auto retry (AN...an) const
 Retry this observable for the given number of times. More...
 
template<class... AN>
auto start_with (AN...an) const
 Start with the supplied values, then concatenate this observable. More...
 
template<class... AN>
auto pairwise (AN...an) const
 Take values pairwise from this observable. More...
 

Public Attributes

source_operator_type source_operator
 

Friends

template<class U , class SO >
class observable
 
template<class U , class SO >
bool operator== (const observable< U, SO > &, const observable< U, SO > &)
 

Detailed Description

template<class T, class SourceOperator>
class rxcpp::observable< T, SourceOperator >

A source of values. Subscribe to it, or use one of the operator methods that return a new observable which uses this observable as its source.

Some code
This sample uses observable::subscribe() to receive values from observable<void, void>::range().
Sample Code
auto values1 = rxcpp::observable<>::range(1, 5);
values1.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnNext: 5
OnCompleted
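The push-style contract in this sample (zero or more on_next calls followed by a single on_completed) can be sketched with the standard library alone. This is only a model of the callback order, not RxCpp code; `range_model` and `trace_range_1_to_5` are hypothetical names introduced for illustration.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical stand-in for a cold source: every call pushes the values
// first..last to on_next, in order, and then signals completion once.
inline void range_model(int first, int last,
                        const std::function<void(int)>& on_next,
                        const std::function<void()>& on_completed) {
    assert(first <= last);  // this sketch only models non-empty ascending ranges
    for (int v = first; v <= last; ++v) on_next(v);
    on_completed();
}

// Collects the notifications as text, in the same shape as the output above.
inline std::string trace_range_1_to_5() {
    std::string log;
    range_model(1, 5,
        [&log](int v) { log += "OnNext: " + std::to_string(v) + "\n"; },
        [&log]() { log += "OnCompleted\n"; });
    return log;
}
```

Calling `trace_range_1_to_5()` yields the five OnNext lines followed by OnCompleted, as one string.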

Member Typedef Documentation

template<class T, class SourceOperator>
typedef rxu::decay_t<SourceOperator> rxcpp::observable< T, SourceOperator >::source_operator_type
template<class T, class SourceOperator>
typedef T rxcpp::observable< T, SourceOperator >::value_type

Constructor & Destructor Documentation

template<class T, class SourceOperator>
rxcpp::observable< T, SourceOperator >::~observable ( )
inline
template<class T, class SourceOperator>
rxcpp::observable< T, SourceOperator >::observable ( )
inline
template<class T, class SourceOperator>
rxcpp::observable< T, SourceOperator >::observable ( const source_operator_type &  o)
inline explicit
template<class T, class SourceOperator>
rxcpp::observable< T, SourceOperator >::observable ( source_operator_type &&  o)
inline explicit
template<class T, class SourceOperator>
template<class SO >
rxcpp::observable< T, SourceOperator >::observable ( const observable< T, SO > &  o)
inline

implicit conversion between observables of the same value_type

template<class T, class SourceOperator>
template<class SO >
rxcpp::observable< T, SourceOperator >::observable ( observable< T, SO > &&  o)
inline

implicit conversion between observables of the same value_type

Member Function Documentation

template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::accumulate ( AN &&...  an) const
inline

For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned.

Template Parameters
Seed            the type of the initial value for the accumulator
Accumulator     the type of the data accumulating function
ResultSelector  the type of the result producing function
Parameters
seed  the initial value for the accumulator
a     an accumulator function to be invoked on each item emitted by the source observable, the result of which will be used in the next accumulator call
rs    a result producing function that makes the final value from the last accumulator call result
Returns
An observable that emits a single item that is the result of accumulating the output from the items emitted by the source observable.

Some basic reduce-type operators have already been implemented: first, last, count, sum, average, min, max.

Sample Code
Geometric mean of source values:
auto values = rxcpp::observable<>::range(1, 7).
accumulate(
std::make_pair(0, 1.0),
[](std::pair<int, double> seed, int v){
seed.first += 1;
seed.second *= v;
return seed;
},
[](std::pair<int, double> res){
return std::pow(res.second, 1.0 / res.first);
});
values.subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 3.380015
OnCompleted
If the source observable completes without emitting any items, the resulting observable emits the result of passing the initial seed to the result selector:
auto values = rxcpp::observable<>::empty<int>().
accumulate(
1,
[](int,int){return 0;},
[](int res){return res;});
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnCompleted
If the accumulator raises an exception, it is returned by the resulting observable in on_error:
auto values = rxcpp::observable<>::range(1, 3).
accumulate(
0,
[](int seed, int v){
if (v == 2)
throw std::runtime_error("Exception from accumulator");
return seed;
},
[](int res){return res;});
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::exception& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Exception from accumulator
The same for exceptions raised by the result selector:
auto values = rxcpp::observable<>::range(1, 3).
accumulate(
0,
[](int seed, int v){return seed + v;},
[](int res){
throw std::runtime_error("Exception from result selector");
return res;
});
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::exception& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Exception from result selector
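The seed, accumulator and result-selector roles described above can be sketched over a plain std::vector. This is a standard-library model (C++14 or later), not RxCpp's implementation; `fold_then_select` and `geometric_mean_1_to_7` are hypothetical names.

```cpp
#include <cassert>
#include <cmath>
#include <numeric>
#include <utility>
#include <vector>

// Hypothetical helper: folds `items` with `accumulator`, starting from `seed`,
// then maps the final state through `result_selector`.
template <class Seed, class Item, class Accumulator, class ResultSelector>
auto fold_then_select(const std::vector<Item>& items, Seed seed,
                      Accumulator accumulator, ResultSelector result_selector) {
    Seed state = std::accumulate(items.begin(), items.end(), std::move(seed), accumulator);
    return result_selector(std::move(state));
}

// Geometric mean of 1..7, mirroring the first sample above.
inline double geometric_mean_1_to_7() {
    std::vector<int> items(7);
    std::iota(items.begin(), items.end(), 1);  // fills 1, 2, ..., 7
    return fold_then_select(
        items, std::make_pair(0, 1.0),
        [](std::pair<int, double> acc, int v) {
            acc.first += 1;   // number of items seen so far
            acc.second *= v;  // running product
            return acc;
        },
        [](std::pair<int, double> res) {
            assert(res.first > 0);  // the root is undefined for zero items
            return std::pow(res.second, 1.0 / res.first);
        });
}
```

With an empty input the accumulator never runs, so the result selector receives the seed unchanged, which matches the empty-source sample above.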
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::all ( AN &&...  an) const
inline

Returns an Observable that emits true if every item emitted by the source Observable satisfies a specified condition, otherwise false. Emits true if the source Observable terminates without emitting any item.

Template Parameters
Predicate  the type of the test function.
Parameters
p  the test function to test items emitted by the source Observable.
Returns
Observable that emits true if every item emitted by the source observable satisfies a specified condition, otherwise false.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).all([](int n) { return n < 6; });
values.subscribe(
[](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
[]() { printf("OnCompleted\n"); });
OnNext: true
OnCompleted
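The truth value that all() reports, including the empty-source case, matches std::all_of over the items the source emitted. The sketch below is a standard-library model rather than RxCpp code; `all_model` and `all_model_examples` are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical helper: the single bool that would be emitted once the source
// has completed after emitting `emitted`.
template <class Item, class Predicate>
bool all_model(const std::vector<Item>& emitted, Predicate p) {
    return std::all_of(emitted.begin(), emitted.end(), p);
}

inline void all_model_examples() {
    auto below_six = [](int n) { return n < 6; };
    assert(all_model(std::vector<int>{1, 2, 3, 4, 5}, below_six));  // every item passes
    assert(!all_model(std::vector<int>{1, 2, 9}, below_six));       // 9 fails the test
    assert(all_model(std::vector<int>{}, below_six));               // empty source gives true
}
```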
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::amb ( AN...  an) const
inline

For each item from only the first of the given observables deliver from the new observable that is returned, on the specified scheduler.

There are 2 variants of the operator:

  • The source observable emits nested observables, one of the nested observables is selected.
  • The source observable and the arguments v0...vn are used to provide the observables to select from.
Template Parameters
Coordination  the type of the scheduler (optional).
Value0        ... (optional).
ValueN        types of source observables (optional).
Parameters
cn  the scheduler to synchronize sources from different contexts (optional).
v0  ... (optional).
vn  source observables (optional).
Returns
Observable that emits the same sequence as whichever of the source observables first emitted an item or sent a termination notification.

If scheduler is omitted, identity_current_thread is used.

Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {
printf("[thread %s] Timer1 fired\n", get_pid().c_str());
return 1;
});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
printf("[thread %s] Timer2 fired\n", get_pid().c_str());
return 2;
});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {
printf("[thread %s] Timer3 fired\n", get_pid().c_str());
return 3;
});
auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3);
auto values = base.amb(rxcpp::observe_on_new_thread());
values.as_blocking().subscribe(
[](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481294776064] Timer3 fired
[thread 47481294776064] OnNext: 3
[thread 47481294776064] OnCompleted
[thread 47481267428736] Finish task
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3);
auto values = base.amb();
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 3
OnCompleted
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
auto values = o1.amb(o2, o3);
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 3
OnCompleted
printf("[thread %s] Start task\n", get_pid().c_str());
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {
printf("[thread %s] Timer1 fired\n", get_pid().c_str());
return 1;
});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
printf("[thread %s] Timer2 fired\n", get_pid().c_str());
return 2;
});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {
printf("[thread %s] Timer3 fired\n", get_pid().c_str());
return 3;
});
auto values = o1.amb(rxcpp::observe_on_new_thread(), o2, o3);
values.as_blocking().subscribe(
[](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481294776064] Timer3 fired
[thread 47481294776064] OnNext: 3
[thread 47481294776064] OnCompleted
[thread 47481267428736] Finish task
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::any ( AN &&...  an) const
inline

Returns an Observable that emits true if any item emitted by the source Observable satisfies a specified condition, otherwise false. Emits false if the source Observable terminates without emitting any item.

Template Parameters
Predicate  the type of the test function.
Parameters
p  the test function to test items emitted by the source Observable.
Returns
An observable that emits true if any item emitted by the source observable satisfies a specified condition, otherwise false.

Some basic any-type operators have already been implemented: exists and contains.

Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).exists([](int n) { return n > 3; });
values.subscribe(
[](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
[]() { printf("OnCompleted\n"); });
OnNext: true
OnCompleted
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).contains(3);
values.subscribe(
[](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
[]() { printf("OnCompleted\n"); });
OnNext: true
OnCompleted
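The results of any(), exists() and contains() described above can be modeled with std::any_of: a predicate test, and an equality test as a special case. This is a standard-library sketch, not RxCpp code; `any_model`, `contains_model` and `any_model_examples` are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical helper: true if at least one emitted item satisfies p.
template <class Item, class Predicate>
bool any_model(const std::vector<Item>& emitted, Predicate p) {
    return std::any_of(emitted.begin(), emitted.end(), p);
}

// Hypothetical helper: contains expressed as any with an equality predicate.
template <class Item>
bool contains_model(const std::vector<Item>& emitted, const Item& value) {
    return any_model(emitted, [&value](const Item& item) { return item == value; });
}

inline void any_model_examples() {
    const std::vector<int> items{1, 2, 3, 4, 5};
    assert(any_model(items, [](int n) { return n > 3; }));  // 4 and 5 qualify
    assert(contains_model(items, 3));
    assert(!contains_model(std::vector<int>{}, 3));         // empty source gives false
}
```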
template<class T, class SourceOperator>
template<class... AN>
blocking_observable<T, this_type> rxcpp::observable< T, SourceOperator >::as_blocking ( AN **  ...) const
inline

Return a new observable that contains the blocking methods for this observable.

Returns
An observable that contains the blocking methods for this observable.
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto values = rxcpp::observable<>::from(rxcpp::observe_on_new_thread(), 1, 2, 3).map([](int v){
printf("[thread %s] Emit value: %d\n", get_pid().c_str(), v);
return v;
});
values.as_blocking().subscribe(
[](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481303181056] Emit value: 1
[thread 47481303181056] OnNext: 1
[thread 47481303181056] Emit value: 2
[thread 47481303181056] OnNext: 2
[thread 47481303181056] Emit value: 3
[thread 47481303181056] OnNext: 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
template<class T, class SourceOperator>
template<class... AN>
observable<T> rxcpp::observable< T, SourceOperator >::as_dynamic ( AN **  ...) const
inline

Return a new observable that performs type-forgetting conversion of this observable.

Returns
The source observable converted to observable<T>.
Note
This operator can be useful to work around a lambda deduction bug in MSVC 2013.
Sample Code
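auto o1 = rxcpp::observable<>::range(1, 3);
auto o2 = rxcpp::observable<>::just(4); // single-item source (see the std::array<int, 1> type printed below); the value 4 is an assumption
auto o3 = rxcpp::observable<>::empty<int>();
auto values = o1.concat(o2, o3);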
printf("type of o1: %s\n", typeid(o1).name());
printf("type of o1.as_dynamic(): %s\n", typeid(o1.as_dynamic()).name());
printf("type of o2: %s\n", typeid(o2).name());
printf("type of o2.as_dynamic(): %s\n", typeid(o2.as_dynamic()).name());
printf("type of o3: %s\n", typeid(o3).name());
printf("type of o3.as_dynamic(): %s\n", typeid(o3.as_dynamic()).name());
printf("type of values: %s\n", typeid(values).name());
printf("type of values.as_dynamic(): %s\n", typeid(values.as_dynamic()).name());
type of o1: N5rxcpp10observableIiNS_7sources6detail5rangeIiNS_19identity_one_workerEEEEE
type of o1.as_dynamic(): N5rxcpp10observableIiNS_18dynamic_observableIiEEEE
type of o2: N5rxcpp10observableIiNS_7sources6detail7iterateINSt3__15arrayIiLm1EEENS_19identity_one_workerEEEEE
type of o2.as_dynamic(): N5rxcpp10observableIiNS_18dynamic_observableIiEEEE
type of o3: N5rxcpp10observableIiNS_7sources6detail7iterateINSt3__15arrayIiLm0EEENS_19identity_one_workerEEEEE
type of o3.as_dynamic(): N5rxcpp10observableIiNS_18dynamic_observableIiEEEE
type of values: N5rxcpp10observableIiNS_9operators6detail6concatINS0_IiNS_18dynamic_observableIiEEEENS0_IS6_NS4_IS6_EEEENS_19identity_one_workerEEEEE
type of values.as_dynamic(): N5rxcpp10observableIiNS_18dynamic_observableIiEEEE
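The printed types show every as_dynamic() result collapsing to the same type, whatever the source operator was. As a loose analogy only, the sketch below shows the same type-forgetting idea with std::function: distinct closure types convert to one erased type and can then share a container. It does not use RxCpp, and `sum_of_erased_sources` is a hypothetical name.

```cpp
#include <cassert>
#include <functional>
#include <type_traits>
#include <vector>

inline int sum_of_erased_sources() {
    auto one = [] { return 1; };
    auto two = [] { return 2; };
    // Each lambda has its own unique closure type...
    static_assert(!std::is_same<decltype(one), decltype(two)>::value,
                  "closure types are distinct");
    // ...but both convert to one erased type, so they fit in one container.
    std::vector<std::function<int()>> erased{one, two};
    assert(erased.size() == 2);
    int total = 0;
    for (const auto& source : erased) total += source();
    return total;
}
```

The cost of this convenience is an indirect call per invocation, which is the usual trade-off of type erasure.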
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::average ( AN **  ...) const
inline

For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end.

Returns
An observable that emits a single item: the average of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 4).average();
values.subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 2.500000
OnCompleted
When the source observable completes without emitting any items:
auto values = rxcpp::observable<>::empty<int>().average();
values.subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: average() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 4).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
average();
values.subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::runtime_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Error from source
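The first two cases above (a mean for a non-empty sequence, an error for an empty one) can be modeled with the standard library. In this sketch std::runtime_error stands in for rxcpp::empty_error, which is an assumption made to keep the example free of RxCpp headers; `average_model` and `average_model_examples` are hypothetical names.

```cpp
#include <cassert>
#include <numeric>
#include <stdexcept>
#include <vector>

// Hypothetical helper: arithmetic mean of the items of a completed sequence.
// An empty sequence is reported as an error instead of producing a value.
inline double average_model(const std::vector<int>& emitted) {
    if (emitted.empty())
        throw std::runtime_error("average() requires a stream with at least one value");
    const double sum = std::accumulate(emitted.begin(), emitted.end(), 0.0);
    return sum / static_cast<double>(emitted.size());
}

inline void average_model_examples() {
    assert(average_model({1, 2, 3, 4}) == 2.5);  // same input as range(1, 4)
    bool reported_error = false;
    try { average_model({}); }
    catch (const std::runtime_error&) { reported_error = true; }
    assert(reported_error);
}
```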
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::buffer ( AN &&...  an) const
inline

Return an observable that emits connected, non-overlapping buffers, each containing at most count items from the source observable. If the skip parameter is set, return an observable that emits buffers every skip items containing at most count items from the source observable.

Parameters
count: the maximum size of each buffer before it is emitted.
skip: how many items need to be skipped before starting a new buffer (optional).
Returns
Observable that emits connected, non-overlapping buffers, each containing at most count items from the source observable. If the skip parameter is set, return an Observable that emits buffers every skip items containing at most count items from the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 5).buffer(2);
values.subscribe(
[](std::vector<int> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](int a){
printf(" %d", a);
});
printf("\n");
},
[](){printf("OnCompleted\n");});
OnNext: 1 2
OnNext: 3 4
OnNext: 5
OnCompleted
Sample Code
auto values = rxcpp::observable<>::range(1, 7).buffer(2, 3);
values.subscribe(
[](std::vector<int> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](int a){
printf(" %d", a);
});
printf("\n");
},
[](){printf("OnCompleted\n");});
OnNext: 1 2
OnNext: 4 5
OnNext: 7
OnCompleted
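The interaction of count and skip can be modelled in plain C++ over a finite list of items. This is a sketch, not RxCpp's implementation: `buffer_model` is a hypothetical name, and passing skip equal to count is assumed to model the one-argument form.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper, not RxCpp code: a new buffer starts every `skip`
// items and holds at most `count` items.
std::vector<std::vector<int>> buffer_model(const std::vector<int>& items,
                                           std::size_t count,
                                           std::size_t skip) {
    std::vector<std::vector<int>> buffers;
    if (count == 0 || skip == 0) {
        return buffers;  // the model does not define degenerate arguments
    }
    for (std::size_t start = 0; start < items.size(); start += skip) {
        std::vector<int> buffer;
        // Copy up to `count` items, stopping early at the end of the source.
        for (std::size_t i = start; i < items.size() && i < start + count; ++i) {
            buffer.push_back(items[i]);
        }
        buffers.push_back(buffer);
    }
    return buffers;
}
```

With items 1 to 7, count 2 and skip 3, the model yields {1, 2}, {4, 5}, {7}, the same grouping as the second sample above; items 3 and 6 fall into the gap between buffers.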
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::buffer_with_time ( AN &&...  an) const
inline

Return an observable that emits a buffer every period time interval, collecting the items from this observable for period of time into each produced buffer. If the skip parameter is set, return an observable that emits buffers every skip time interval and collects items from this observable for period of time into each produced buffer, on the specified scheduler.

Template Parameters
Duration: the type of the time interval.
Coordination: the type of the scheduler (optional).
Parameters
period: the period of time each buffer collects items before it is emitted.
skip: the period of time after which a new buffer will be created (optional).
coordination: the scheduler for the buffers (optional).
Returns
Observable that emits buffers every period time interval and collects items from this observable for period of time into each produced buffer. If the skip parameter is set, returns an Observable that emits buffers every skip time interval and collects items from this observable for period of time into each produced buffer.
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto period = std::chrono::milliseconds(4);
auto skip = std::chrono::milliseconds(6);
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
map([](long v){
printf("[thread %s] Interval OnNext: %ld\n", get_pid().c_str(), v);
return v;
}).
take(7).
buffer_with_time(period, skip, rxcpp::observe_on_new_thread());
values.subscribe(
[](std::vector<long> v){
printf("[thread %s] OnNext:", get_pid().c_str());
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
});
printf("\n");
},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481267428736] Interval OnNext: 1
[thread 47481267428736] Interval OnNext: 2
[thread 47481294776064] OnNext: 1 2
[thread 47481267428736] Interval OnNext: 3
[thread 47481267428736] Interval OnNext: 4
[thread 47481267428736] Interval OnNext: 5
[thread 47481294776064] OnNext: 4 5
[thread 47481267428736] Interval OnNext: 6
[thread 47481267428736] Interval OnNext: 7
[thread 47481294776064] OnNext: 7
[thread 47481294776064] OnCompleted
[thread 47481267428736] Finish task
Sample Code
auto period = std::chrono::milliseconds(4);
auto skip = std::chrono::milliseconds(6);
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
buffer_with_time(period, skip);
values.subscribe(
[](std::vector<long> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
});
printf("\n");
},
[](){printf("OnCompleted\n");});
OnNext: 1 2
OnNext: 4 5
OnNext: 7
OnCompleted
Overlapping buffers are allowed:
auto period = std::chrono::milliseconds(6);
auto skip = std::chrono::milliseconds(4);
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
buffer_with_time(period, skip);
values.subscribe(
[](std::vector<long> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
});
printf("\n");
},
[](){printf("OnCompleted\n");});
OnNext: 1 2 3
OnNext: 3 4 5
OnNext: 5 6 7
OnNext: 7
OnCompleted
If no items are emitted, an empty buffer is returned:
auto period = std::chrono::milliseconds(2);
auto skip = std::chrono::milliseconds(4);
auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).
buffer_with_time(period, skip);
values.subscribe(
[](std::vector<long> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
});
printf("\n");
},
[](){printf("OnCompleted\n");});
OnNext:
OnNext:
OnNext: 1
OnCompleted
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
buffer_with_time(std::chrono::milliseconds(4), rxcpp::observe_on_new_thread());
values.subscribe(
[](std::vector<long> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
});
printf("\n");
},
[](){printf("OnCompleted\n");});
OnNext: 1 2
OnNext: 3 4
OnNext: 5 6
OnNext: 7
OnCompleted
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
buffer_with_time(std::chrono::milliseconds(4));
values.subscribe(
[](std::vector<long> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
});
printf("\n");
},
[](){printf("OnCompleted\n");});
OnNext: 1 2
OnNext: 3 4
OnNext: 5 6
OnNext: 7
OnCompleted
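How period and skip shape the time windows can be modelled deterministically in plain C++. The sketch below is not RxCpp code and makes several assumptions: `timed_item` and `buffer_with_time_model` are hypothetical names, arrival times are given in milliseconds since subscription and sorted, window k covers the half-open interval [k * skip, k * skip + period), and the source completes with its last item. Real schedulers are timing-dependent, so actual output can differ at window boundaries.

```cpp
#include <vector>

// One source emission: a value and its arrival time in ms since subscription.
struct timed_item {
    long value;
    long at_ms;
};

// Hypothetical model, not RxCpp code. Windows overlap when skip < period
// and leave gaps when skip > period.
std::vector<std::vector<long>> buffer_with_time_model(
        const std::vector<timed_item>& items, long period_ms, long skip_ms) {
    std::vector<std::vector<long>> buffers;
    if (items.empty() || period_ms <= 0 || skip_ms <= 0) {
        return buffers;
    }
    const long last = items.back().at_ms;  // items are assumed sorted by time
    for (long start = 0; start <= last; start += skip_ms) {
        std::vector<long> buffer;
        for (const timed_item& item : items) {
            if (item.at_ms >= start && item.at_ms < start + period_ms) {
                buffer.push_back(item.value);
            }
        }
        buffers.push_back(buffer);  // pushed even when the window was empty
    }
    return buffers;
}
```

Feeding the model the values 1 to 7 at 1, 3, 5, ..., 13 ms gives {1, 2}, {4, 5}, {7} for period 4 and skip 6, and {1, 2, 3}, {3, 4, 5}, {5, 6, 7}, {7} for period 6 and skip 4, matching the corresponding samples above.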
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::buffer_with_time_or_count ( AN &&...  an) const
inline

Return an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first), on the specified scheduler.

Template Parameters
Duration: the type of the time interval.
Coordination: the type of the scheduler (optional).
Parameters
period: the period of time each buffer collects items before it is emitted and replaced with a new buffer.
count: the maximum size of each buffer before it is emitted and a new buffer is created.
coordination: the scheduler for the buffers (optional).
Returns
Observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first).
Sample Code
auto start = std::chrono::steady_clock::now();
auto int1 = rxcpp::observable<>::range(1L, 3L);
auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
auto values = int1.
concat(int2).
buffer_with_time_or_count(std::chrono::milliseconds(20), 2, rxcpp::observe_on_event_loop());
values.subscribe(
[start](std::vector<long> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
});
printf("\n");
},
[](){printf("OnCompleted\n");});
OnNext: 1 2
OnNext: 3
OnNext:
OnNext: 1
OnCompleted
Sample Code
auto start = std::chrono::steady_clock::now();
auto int1 = rxcpp::observable<>::range(1L, 3L);
auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
auto values = int1.
concat(int2).
buffer_with_time_or_count(std::chrono::milliseconds(20), 2);
values.subscribe(
[start](std::vector<long> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
});
printf("\n");
},
[](){printf("OnCompleted\n");});
OnNext: 1 2
OnNext: 3
OnNext:
OnNext: 1
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::combine_latest ( AN...  an) const
inline

For each item from all of the observables select a value to emit from the new observable that is returned.

Template Parameters
AN: types of scheduler (optional), aggregate function (optional), and source observables
Parameters
an: scheduler (optional), aggregation function (optional), and source observables
Returns
Observable that emits items that are the result of combining the items emitted by the source observables.

If scheduler is omitted, identity_current_thread is used.

If aggregation function is omitted, the resulting observable returns tuples of emitted items.

Sample Code

Neither scheduler nor aggregation function are present:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
auto values = o1.combine_latest(o2, o3);
values.
take(5).
subscribe(
[](std::tuple<int, int, int> v){printf("OnNext: %d, %d, %d\n", std::get<0>(v), std::get<1>(v), std::get<2>(v));},
[](){printf("OnCompleted\n");});
OnNext: 1, 1, 1
OnNext: 2, 1, 1
OnNext: 2, 2, 1
OnNext: 3, 2, 1
OnNext: 3, 2, 2
OnCompleted

Only scheduler is present:

printf("[thread %s] Start task\n", get_pid().c_str());
auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)).map([](int v) {
printf("[thread %s] Source1 OnNext: %d\n", get_pid().c_str(), v);
return v;
});
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)).map([](int v) {
printf("[thread %s] Source2 OnNext: %d\n", get_pid().c_str(), v);
return v;
});
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5)).map([](int v) {
printf("[thread %s] Source3 OnNext: %d\n", get_pid().c_str(), v);
return v;
});
auto values = o1.combine_latest(thr, o2, o3);
values.
take(5).
subscribe(
[](std::tuple<int, int, int> v){printf("[thread %s] OnNext: %d, %d, %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v), std::get<2>(v));},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481267428736] Source1 OnNext: 1
[thread 47481267428736] Source2 OnNext: 1
[thread 47481267428736] Source3 OnNext: 1
[thread 47481298978560] OnNext: 1, 1, 1
[thread 47481267428736] Source1 OnNext: 2
[thread 47481298978560] OnNext: 2, 1, 1
[thread 47481267428736] Source2 OnNext: 2
[thread 47481298978560] OnNext: 2, 2, 1
[thread 47481267428736] Source1 OnNext: 3
[thread 47481298978560] OnNext: 3, 2, 1
[thread 47481267428736] Source3 OnNext: 2
[thread 47481298978560] OnNext: 3, 2, 2
[thread 47481298978560] OnCompleted
[thread 47481267428736] Finish task

Only aggregation function is present:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
auto values = o1.combine_latest(
[](int v1, int v2, int v3) {
return 100 * v1 + 10 * v2 + v3;
},
o2, o3);
values.
take(5).
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 111
OnNext: 211
OnNext: 221
OnNext: 321
OnNext: 322
OnCompleted

Both scheduler and aggregation function are present:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
auto values = o1.combine_latest(
rxcpp::observe_on_new_thread(),
[](int v1, int v2, int v3) {
return 100 * v1 + 10 * v2 + v3;
},
o2, o3);
values.
take(5).
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 111
OnNext: 211
OnNext: 221
OnNext: 321
OnNext: 322
OnCompleted
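The combining rule can be modelled in plain C++ by replaying source emissions in arrival order. This sketch is not RxCpp code: `source_event` and `combine_latest_model` are hypothetical names, the number of sources is fixed at three, the aggregation 100 * v1 + 10 * v2 + v3 is taken from the samples above, and the arrival order has to be supplied by hand.

```cpp
#include <array>
#include <cstddef>
#include <vector>

// One emission in arrival order: which source fired and what it emitted.
struct source_event {
    std::size_t source;  // 0, 1 or 2
    int value;
};

// Hypothetical model, not RxCpp code: remember the latest value of each
// source and emit a combined value whenever any source emits, but only
// once every source has emitted at least one item.
std::vector<int> combine_latest_model(const std::vector<source_event>& events) {
    std::array<int, 3> latest{};
    std::array<bool, 3> seen{};
    std::vector<int> out;
    for (const source_event& e : events) {
        if (e.source >= latest.size()) {
            continue;  // ignore events from unknown sources
        }
        latest[e.source] = e.value;
        seen[e.source] = true;
        if (seen[0] && seen[1] && seen[2]) {
            out.push_back(100 * latest[0] + 10 * latest[1] + latest[2]);
        }
    }
    return out;
}
```

Replaying the order seen in the threaded sample output (Source1, Source2, Source3, Source1, Source2, Source1, Source3) yields 111, 211, 221, 321, 322. Nothing is emitted for the first two events because not every source has produced a value yet.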
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::concat ( AN...  an) const
inline

Subscribes to each observable one at a time, in the order received, and delivers every item from each of them through the new observable that is returned.

There are 2 variants of the operator:

  • The source observable emits nested observables, nested observables are concatenated.
  • The source observable and the arguments v0...vn are used to provide the observables to concatenate.
Template Parameters
Coordination: the type of the scheduler (optional).
Value0: ... (optional).
ValueN: types of source observables (optional).
Parameters
cn: the scheduler to synchronize sources from different contexts (optional).
v0: ... (optional).
vn: source observables (optional).
Returns
Observable that emits the items emitted by each of the Observables emitted by the source observable, one after the other, without interleaving them.
Sample Code
auto o1 = rxcpp::observable<>::range(1, 3);
auto o2 = rxcpp::observable<>::just(4);
auto o3 = rxcpp::observable<>::from(5, 6);
auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3);
auto values = base.concat();
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnNext: 5
OnNext: 6
OnCompleted
Sample Code
auto o1 = rxcpp::observable<>::range(1, 3);
auto o2 = rxcpp::observable<>::just(4);
auto o3 = rxcpp::observable<>::from(5, 6);
auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3);
auto values = base.concat(rxcpp::observe_on_new_thread());
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnNext: 5
OnNext: 6
OnCompleted
Sample Code
auto o1 = rxcpp::observable<>::range(1, 3);
auto o2 = rxcpp::observable<>::just(4);
auto o3 = rxcpp::observable<>::from(5, 6);
auto values = o1.concat(o2, o3);
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnNext: 5
OnNext: 6
OnCompleted
Sample Code
auto o1 = rxcpp::observable<>::range(1, 3);
auto o2 = rxcpp::observable<>::just(4);
auto o3 = rxcpp::observable<>::from(5, 6);
auto values = o1.concat(rxcpp::observe_on_new_thread(), o2, o3);
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnNext: 5
OnNext: 6
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::concat_map ( AN &&...  an) const
inline

For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.

Template Parameters
CollectionSelector: the type of the observable producing function. CollectionSelector must be a function with the signature: observable(concat_map::source_value_type)
ResultSelector: the type of the aggregation function (optional). ResultSelector must be a function with the signature: concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type)
Coordination: the type of the scheduler (optional).
Parameters
s: a function that returns an observable for each item emitted by the source observable.
rs: a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional).
cn: the scheduler to synchronize sources from different contexts (optional).
Returns
Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.

Observables produced by the CollectionSelector are concatenated. Another operator, rxcpp::observable<T,SourceType>::flat_map, works similarly but merges the observables.

Sample Code
auto values = rxcpp::observable<>::range(1, 3).
concat_map(
[](int v){
return
rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
take(3);
},
[](int v_main, long v_sub){
return std::make_tuple(v_main, v_sub);
});
values.subscribe(
[](std::tuple<int, long> v){printf("OnNext: %d - %ld\n", std::get<0>(v), std::get<1>(v));},
[](){printf("OnCompleted\n");});
OnNext: 1 - 1
OnNext: 1 - 2
OnNext: 1 - 3
OnNext: 2 - 1
OnNext: 2 - 2
OnNext: 2 - 3
OnNext: 3 - 1
OnNext: 3 - 2
OnNext: 3 - 3
OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto values = rxcpp::observable<>::range(1, 3).
concat_map(
[](int v){
printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v);
return
rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
take(3);
},
[](int v_main, long v_sub){
printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %ld)\n", get_pid().c_str(), v_main, v_sub);
return std::make_tuple(v_main, v_sub);
},
rxcpp::observe_on_new_thread());
values.subscribe(
[](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %ld\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481303181056] Call CollectionSelector(v = 1)
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 1)
[thread 47481303181056] OnNext: 1 - 1
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 2)
[thread 47481303181056] OnNext: 1 - 2
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 3)
[thread 47481303181056] OnNext: 1 - 3
[thread 47481303181056] Call CollectionSelector(v = 2)
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 1)
[thread 47481303181056] OnNext: 2 - 1
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 2)
[thread 47481303181056] OnNext: 2 - 2
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 3)
[thread 47481303181056] OnNext: 2 - 3
[thread 47481303181056] Call CollectionSelector(v = 3)
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 1)
[thread 47481303181056] OnNext: 3 - 1
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 2)
[thread 47481303181056] OnNext: 3 - 2
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 3)
[thread 47481303181056] OnNext: 3 - 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
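The ordering guarantee of concat_map can be modelled as a nested loop in plain C++. This is a sketch under assumptions, not RxCpp code: `concat_map_model` and `pair_type` are hypothetical names, inner observables are replaced by finite vectors, and everything runs synchronously on one thread.

```cpp
#include <functional>
#include <tuple>
#include <vector>

using pair_type = std::tuple<int, long>;

// Hypothetical model, not RxCpp code: `collection_selector` maps a source
// item to a finite inner sequence, and `result_selector` combines the source
// item with each inner item.
std::vector<pair_type> concat_map_model(
        const std::vector<int>& source,
        const std::function<std::vector<long>(int)>& collection_selector,
        const std::function<pair_type(int, long)>& result_selector) {
    std::vector<pair_type> out;
    for (int v_main : source) {
        // The next inner sequence is not started until this one is exhausted,
        // which is what keeps the results from interleaving.
        for (long v_sub : collection_selector(v_main)) {
            out.push_back(result_selector(v_main, v_sub));
        }
    }
    return out;
}
```

With the source 1, 2, 3 and an inner sequence 1, 2, 3 for every item, the model produces the nine pairs in the same order as the sample output above, from 1 - 1 through 3 - 3. A merging operator would be free to interleave pairs from different inner sequences instead.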
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::concat_transform ( AN &&...  an) const
inline

For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.

Template Parameters
CollectionSelector: the type of the observable producing function. CollectionSelector must be a function with the signature: observable(concat_map::source_value_type)
ResultSelector: the type of the aggregation function (optional). ResultSelector must be a function with the signature: concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type)
Coordination: the type of the scheduler (optional).
Parameters
s: a function that returns an observable for each item emitted by the source observable.
rs: a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional).
cn: the scheduler to synchronize sources from different contexts (optional).
Returns
Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.

Observables produced by the CollectionSelector are concatenated. Another operator, rxcpp::observable<T,SourceType>::flat_map, works similarly but merges the observables.

Sample Code
auto values = rxcpp::observable<>::range(1, 3).
concat_transform(
[](int v){
return
rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
take(3);
},
[](int v_main, long v_sub){
return std::make_tuple(v_main, v_sub);
});
values.subscribe(
[](std::tuple<int, long> v){printf("OnNext: %d - %ld\n", std::get<0>(v), std::get<1>(v));},
[](){printf("OnCompleted\n");});
OnNext: 1 - 1
OnNext: 1 - 2
OnNext: 1 - 3
OnNext: 2 - 1
OnNext: 2 - 2
OnNext: 2 - 3
OnNext: 3 - 1
OnNext: 3 - 2
OnNext: 3 - 3
OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto values = rxcpp::observable<>::range(1, 3).
concat_transform(
[](int v){
printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v);
return
rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
take(3);
},
[](int v_main, long v_sub){
printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %ld)\n", get_pid().c_str(), v_main, v_sub);
return std::make_tuple(v_main, v_sub);
},
rxcpp::observe_on_new_thread());
values.subscribe(
[](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %ld\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481303181056] Call CollectionSelector(v = 1)
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 1)
[thread 47481303181056] OnNext: 1 - 1
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 2)
[thread 47481303181056] OnNext: 1 - 2
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 3)
[thread 47481303181056] OnNext: 1 - 3
[thread 47481303181056] Call CollectionSelector(v = 2)
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 1)
[thread 47481303181056] OnNext: 2 - 1
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 2)
[thread 47481303181056] OnNext: 2 - 2
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 3)
[thread 47481303181056] OnNext: 2 - 3
[thread 47481303181056] Call CollectionSelector(v = 3)
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 1)
[thread 47481303181056] OnNext: 3 - 1
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 2)
[thread 47481303181056] OnNext: 3 - 2
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 3)
[thread 47481303181056] OnNext: 3 - 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::contains ( AN &&...  an) const
inline

Returns an Observable that emits true if the source Observable emitted a specified item, otherwise false. Emits false if the source Observable terminates without emitting any item.

Template Parameters
T: the type of the item to search for.
Parameters
value: the item to search for.
Returns
An observable that emits true if the source Observable emitted a specified item, otherwise false.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).contains(3);
values.subscribe(
[](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
[]() { printf("OnCompleted\n"); });
OnNext: true
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::count ( AN **  ...) const
inline

For each item from this observable reduce it by incrementing a count.

Returns
An observable that emits a single item: the number of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).count();
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 3
OnCompleted
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
count();
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::runtime_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Error from source
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::debounce ( AN &&...  an) const
inline

Return an observable that emits an item if a particular timespan has passed without emitting another item from the source observable.

Template Parameters
Duration: the type of the time interval
Coordination: the type of the scheduler
Parameters
period: the period of time to suppress any emitted items
coordination: the scheduler to manage timeout for each event
Returns
Observable that emits an item if a particular timespan has passed without emitting another item from the source observable.
Sample Code
using namespace std::chrono;
auto scheduler = rxcpp::identity_current_thread();
auto start = scheduler.now();
auto period = milliseconds(10);
auto values = rxcpp::observable<>::interval(start, period, scheduler).
take(4).
debounce(period);
values.subscribe(
[](long v) { printf("OnNext: %ld\n", v); },
[]() { printf("OnCompleted\n"); });
OnNext: 1
OnNext: 2
OnNext: 4
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::default_if_empty ( AN &&...  an) const
inline

If the source Observable terminates without emitting any items, emits a default item and completes.

Template Parameters
Value: the type of the value to emit.
Parameters
v: the default value to emit.
Returns
Observable that emits the specified default item if the source observable is empty.
Sample Code
auto values = rxcpp::observable<>::empty<int>()
.default_if_empty(42);
values.subscribe(
[](int v) { printf("OnNext: %d\n", v); },
[]() { printf("OnCompleted\n"); } );
OnNext: 42
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::delay ( AN &&...  an) const
inline

Return an observable that emits each item emitted by the source observable after the specified delay.

Template Parameters
Duration: the type of time interval
Coordination: the type of the scheduler
Parameters
period: the period of time each item is delayed
coordination: the scheduler for the delays
Returns
Observable that emits each item emitted by the source observable after the specified delay.
Sample Code
using namespace std::chrono;
auto scheduler = rxcpp::identity_current_thread();
auto start = scheduler.now();
auto period = milliseconds(10);
const auto next = [=](const char* s) {
return [=](long v){
auto t = duration_cast<milliseconds>(scheduler.now() - start);
long long int ms = t.count();
printf("[%s @ %lld] OnNext: %ld\n", s, ms, v);
};
};
auto values = rxcpp::observable<>::interval(start, period, scheduler).
take(4).
tap(next("interval")).
delay(period);
values.subscribe(
next(" delayed"),
[](){printf("OnCompleted\n");});
[interval @ 0] OnNext: 1
[interval @ 10] OnNext: 2
[ delayed @ 10] OnNext: 1
[interval @ 20] OnNext: 3
[ delayed @ 20] OnNext: 2
[interval @ 30] OnNext: 4
[ delayed @ 30] OnNext: 3
[ delayed @ 40] OnNext: 4
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::distinct ( AN &&...  an) const
inline

For each item from this observable, filter out repeated values and emit only items that have not already been emitted.

Returns
Observable that emits those items from the source observable that are distinct.
Note
distinct keeps an unordered_set<T> of past values. Due to an issue in multiple implementations of std::hash<T>, rxcpp maintains a whitelist of hashable types. New types can be added by specializing rxcpp::filtered_hash<T>.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 2, 3, 3, 3, 4, 5, 5).distinct();
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnNext: 5
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::distinct_until_changed ( AN &&...  an) const
inline

For each item from this observable, filter out consecutively repeated values and emit only changes from the new observable that is returned.

Template Parameters
BinaryPredicate: (optional) the type of the value comparing function. The signature should be equivalent to the following: bool pred(const T1& a, const T2& b);
Parameters
pred: (optional) the function that implements comparison of two values.
Returns
Observable that emits those items from the source observable that are distinct from their immediate predecessors.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 2, 3, 3, 3, 4, 5, 5).distinct_until_changed();
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnNext: 5
OnCompleted
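The samples for distinct and distinct_until_changed use an input on which both operators give the same result, so the difference is easier to see in a plain C++ model. The sketch below is not RxCpp code: `distinct_model` and `distinct_until_changed_model` are hypothetical names, the source is a finite vector of int, and the default equality comparison is assumed.

```cpp
#include <unordered_set>
#include <vector>

// Hypothetical model of distinct(): drop every value that was seen before.
std::vector<int> distinct_model(const std::vector<int>& items) {
    std::unordered_set<int> seen;
    std::vector<int> out;
    for (int v : items) {
        if (seen.insert(v).second) {  // true only for a first occurrence
            out.push_back(v);
        }
    }
    return out;
}

// Hypothetical model of distinct_until_changed(): drop a value only when it
// equals its immediate predecessor.
std::vector<int> distinct_until_changed_model(const std::vector<int>& items) {
    std::vector<int> out;
    for (int v : items) {
        if (out.empty() || out.back() != v) {
            out.push_back(v);
        }
    }
    return out;
}
```

For the input 1, 2, 2, 1, 3, 3, 1 the first model keeps 1, 2, 3 while the second keeps 1, 2, 1, 3, 1, because a value that returns after a different value counts as a change.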
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::element_at ( AN &&...  an) const
inline

Pulls an item located at a specified index location in the sequence of items and emits that item as its own sole emission.

Parameters
index: the index of the element to return.
Returns
An observable that emits an item located at a specified index location.
Sample Code
auto values = rxcpp::observable<>::range(1, 7).element_at(3);
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 4
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::exists ( AN &&...  an) const
inline

Returns an Observable that emits true if any item emitted by the source Observable satisfies a specified condition, otherwise false. Emits false if the source Observable terminates without emitting any item.

Template Parameters
Predicate: the type of the test function.
Parameters
p: the test function to test items emitted by the source Observable.
Returns
An observable that emits true if any item emitted by the source observable satisfies a specified condition, otherwise false.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).exists([](int n) { return n > 3; });
values.subscribe(
[](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
[]() { printf("OnCompleted\n"); });
OnNext: true
OnCompleted
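The results documented for exists, and for contains earlier on this page, correspond to familiar standard algorithms when the source is finite. The sketch below is plain C++, not RxCpp code; `exists_model` and `contains_model` are hypothetical names and the source is assumed to be a vector of int.

```cpp
#include <algorithm>
#include <vector>

// Hypothetical model of contains(): true if any item equals `value`.
bool contains_model(const std::vector<int>& items, int value) {
    return std::find(items.begin(), items.end(), value) != items.end();
}

// Hypothetical model of exists(): true if any item satisfies the predicate.
template <class Predicate>
bool exists_model(const std::vector<int>& items, Predicate p) {
    return std::any_of(items.begin(), items.end(), p);
}
```

Both models return false for an empty vector, which mirrors the documented behaviour when the source terminates without emitting any item.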
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::filter ( AN &&...  an) const
inline

For each item from this observable use Predicate to select which items to emit from the new observable that is returned.

Template Parameters
Predicate: the type of the filter function
Parameters
p: the filter function
Returns
Observable that emits only those items emitted by the source observable that the filter evaluates as true.
Sample Code
auto values = rxcpp::observable<>::range(1, 6).
filter([](int v){
return v % 2;
});
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 3
OnNext: 5
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::finally ( AN &&...  an) const
inline

Add a new action at the end of the new observable that is returned.

Template Parameters
LastCall: the type of the action function
Parameters
lc: the action function
Returns
Observable that emits the same items as the source observable, then invokes the given action.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).
finally([](){
printf("The final action\n");
});
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnCompleted
The final action
If the source observable generates an error, the final action is still called:
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
finally([](){
printf("The final action\n");
});
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::exception& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnError: Error from source
The final action
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::first ( AN **  ...) const
inline

For each item from this observable reduce it by sending only the first item.

Returns
An observable that emits only the very first item emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).first();
values.
    subscribe(
        [](int v){printf("OnNext: %d\n", v);},
        [](){printf("OnCompleted\n");});
OnNext: 1
OnCompleted
When the source observable completes without emitting any items:
auto values = rxcpp::observable<>::empty<int>().first();
values.
    subscribe(
        [](int v){printf("OnNext: %d\n", v);},
        [](std::exception_ptr ep){
            try {std::rethrow_exception(ep);}
            catch (const rxcpp::empty_error& ex) {
                printf("OnError: %s\n", ex.what());
            }
        },
        [](){printf("OnCompleted\n");});
OnError: first() requires a stream with at least one value
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::flat_map ( AN &&...  an) const
inline

For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.

Template Parameters
CollectionSelector: the type of the observable producing function. CollectionSelector must be a function with the signature observable(flat_map::source_value_type).
ResultSelector: the type of the aggregation function (optional). ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type).
Coordination: the type of the scheduler (optional).
Parameters
s: a function that returns an observable for each item emitted by the source observable.
rs: a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional).
cn: the scheduler to synchronize sources from different contexts (optional).
Returns
Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.

The observables produced by the CollectionSelector are merged. The related operator rxcpp::observable<T,SourceOperator>::concat_map works similarly but concatenates the observables.

Sample Code
auto values = rxcpp::observable<>::range(1, 3).
    flat_map(
        [](int v){
            return
                rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
                take(3);
        },
        [](int v_main, long v_sub){
            return std::make_tuple(v_main, v_sub);
        });
values.
    subscribe(
        [](std::tuple<int, long> v){printf("OnNext: %d - %ld\n", std::get<0>(v), std::get<1>(v));},
        [](){printf("OnCompleted\n");});
OnNext: 1 - 1
OnNext: 2 - 1
OnNext: 3 - 1
OnNext: 1 - 2
OnNext: 2 - 2
OnNext: 3 - 2
OnNext: 1 - 3
OnNext: 2 - 3
OnNext: 3 - 3
OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto values = rxcpp::observable<>::range(1, 3).
    flat_map(
        [](int v){
            printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v);
            return
                rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
                take(3);
        },
        [](int v_main, int v_sub){
            printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %d)\n", get_pid().c_str(), v_main, v_sub);
            return std::make_tuple(v_main, v_sub);
        },
        rxcpp::observe_on_new_thread());
values.
    as_blocking().
    subscribe(
        [](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %ld\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));},
        [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481303181056] Call CollectionSelector(v = 1)
[thread 47481303181056] Call CollectionSelector(v = 2)
[thread 47481303181056] Call CollectionSelector(v = 3)
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 1)
[thread 47481303181056] OnNext: 1 - 1
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 1)
[thread 47481303181056] OnNext: 2 - 1
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 1)
[thread 47481303181056] OnNext: 3 - 1
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 2)
[thread 47481303181056] OnNext: 1 - 2
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 2)
[thread 47481303181056] OnNext: 2 - 2
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 2)
[thread 47481303181056] OnNext: 3 - 2
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 3)
[thread 47481303181056] OnNext: 1 - 3
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 3)
[thread 47481303181056] OnNext: 2 - 3
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 3)
[thread 47481303181056] OnNext: 3 - 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::group_by ( AN &&...  an) const
inline

Return an observable that emits grouped_observables, each of which corresponds to a unique key value and each of which emits those items from the source observable that share that key value.

Template Parameters
KeySelector: the type of the key extracting function
MarbleSelector: the type of the element extracting function
BinaryPredicate: the type of the key comparing function
Parameters
ks: a function that extracts the key for each item (optional)
ms: a function that extracts the return element for each item (optional)
p: a function that implements comparison of two keys (optional)
Returns
Observable that emits values of grouped_observable type, each of which corresponds to a unique key value and each of which emits those items from the source observable that share that key value.
Sample Code
bool less(int v1, int v2){
    return v1 < v2;
}
auto data = rxcpp::observable<>::range(0, 8).
    map([](int v){
        std::stringstream s;
        s << "Value " << v;
        return std::make_pair(v % 3, s.str());
    });
auto values = data.group_by(
    [](std::pair<int, std::string> v){return v.first;},
    [](std::pair<int, std::string> v){return v.second;},
    less);
values.
    subscribe(
        [](rxcpp::grouped_observable<int, std::string> g){
            auto key = g.get_key();
            printf("OnNext: key = %d\n", key);
            g.subscribe(
                [key](const std::string& v){printf("[key %d] OnNext: %s\n", key, v.c_str());},
                [key](){printf("[key %d] OnCompleted\n", key);});
        },
        [](){printf("OnCompleted\n");});
OnNext: key = 0
[key 0] OnNext: Value 0
OnNext: key = 1
[key 1] OnNext: Value 1
OnNext: key = 2
[key 2] OnNext: Value 2
[key 0] OnNext: Value 3
[key 1] OnNext: Value 4
[key 2] OnNext: Value 5
[key 0] OnNext: Value 6
[key 1] OnNext: Value 7
[key 2] OnNext: Value 8
[key 0] OnCompleted
[key 1] OnCompleted
[key 2] OnCompleted
OnCompleted
Sample Code
auto values = rxcpp::observable<>::range(0, 8).
    group_by(
        [](int v){return v % 3;},
        [](int v){return 10 * v;});
values.
    subscribe(
        [](rxcpp::grouped_observable<int, int> g){
            auto key = g.get_key();
            printf("OnNext: key = %d\n", key);
            g.subscribe(
                [key](int v){printf("[key %d] OnNext: %d\n", key, v);},
                [key](){printf("[key %d] OnCompleted\n", key);});
        },
        [](){printf("OnCompleted\n");});
OnNext: key = 0
[key 0] OnNext: 0
OnNext: key = 1
[key 1] OnNext: 10
OnNext: key = 2
[key 2] OnNext: 20
[key 0] OnNext: 30
[key 1] OnNext: 40
[key 2] OnNext: 50
[key 0] OnNext: 60
[key 1] OnNext: 70
[key 2] OnNext: 80
[key 0] OnCompleted
[key 1] OnCompleted
[key 2] OnCompleted
OnCompleted
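The roles of the key selector and the element selector can be illustrated without observables. The following is a minimal sketch under the assumption that all source items are available up front; group_by_model is a hypothetical helper, not part of RxCpp, and it collects groups eagerly into a std::map, whereas the real operator emits each grouped_observable as soon as its key first appears and interleaves the items.

```cpp
#include <cassert>
#include <map>
#include <vector>

// Hypothetical helper, not an RxCpp API: models group_by() eagerly.
// ks extracts the key of an item; ms extracts the element stored for it.
template <class KeySelector, class MarbleSelector>
std::map<int, std::vector<int>> group_by_model(const std::vector<int>& source,
                                               KeySelector ks,
                                               MarbleSelector ms) {
    std::map<int, std::vector<int>> groups;
    for (int v : source) {
        // Items that share a key land in the same group, in source order.
        groups[ks(v)].push_back(ms(v));
    }
    return groups;
}
```

With the values 0 through 8, the key selector `v % 3`, and the element selector `10 * v`, the groups hold {0, 30, 60}, {10, 40, 70}, and {20, 50, 80}, matching the per-key items in the second sample above.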
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::ignore_elements ( AN &&...  an) const
inline

Do not emit any items from the source Observable, but allow termination notification (either onError or onCompleted) to pass through unchanged.

Returns
Observable that emits termination notification from the source observable.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).ignore_elements();
values.
    subscribe(
        [](int v) { printf("OnNext: %d\n", v); },
        []() { printf("OnCompleted\n"); });
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::is_empty ( AN &&...  an) const
inline

Returns an Observable that emits true if the source Observable is empty, otherwise false.

Returns
An observable that emits a boolean value.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).is_empty();
values.
    subscribe(
        [](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
        []() { printf("OnCompleted\n"); });
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::last ( AN **  ...) const
inline

For each item from this observable reduce it by sending only the last item.

Returns
An observable that emits only the very last item emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).last();
values.
    subscribe(
        [](int v){printf("OnNext: %d\n", v);},
        [](){printf("OnCompleted\n");});
OnNext: 3
OnCompleted
When the source observable completes without emitting any items:
auto values = rxcpp::observable<>::empty<int>().last();
values.
    subscribe(
        [](int v){printf("OnNext: %d\n", v);},
        [](std::exception_ptr ep){
            try {std::rethrow_exception(ep);}
            catch (const rxcpp::empty_error& ex) {
                printf("OnError: %s\n", ex.what());
            }
        },
        [](){printf("OnCompleted\n");});
OnError: last() requires a stream with at least one value
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::map ( AN &&...  an) const
inline

For each item from this observable use Selector to produce an item to emit from the new observable that is returned.

Template Parameters
Selector: the type of the transforming function
Parameters
s: the selector function
Returns
Observable that emits the items from the source observable, transformed by the specified function.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).
    map([](int v){
        return 2 * v;
    });
values.
    subscribe(
        [](int v){printf("OnNext: %d\n", v);},
        [](){printf("OnCompleted\n");});
OnNext: 2
OnNext: 4
OnNext: 6
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::max ( AN **  ...) const
inline

For each item from this observable reduce it by taking the max value of the previous items.

Returns
An observable that emits a single item: the max of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 4).max();
values.
    subscribe(
        [](double v){printf("OnNext: %lf\n", v);},
        [](){printf("OnCompleted\n");});
OnNext: 4.000000
OnCompleted
When the source observable completes without emitting any items:
auto values = rxcpp::observable<>::empty<int>().max();
values.
    subscribe(
        [](double v){printf("OnNext: %lf\n", v);},
        [](std::exception_ptr ep){
            try {std::rethrow_exception(ep);}
            catch (const rxcpp::empty_error& ex) {
                printf("OnError: %s\n", ex.what());
            }
        },
        [](){printf("OnCompleted\n");});
OnError: max() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 4).
    concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
    max();
values.
    subscribe(
        [](double v){printf("OnNext: %lf\n", v);},
        [](std::exception_ptr ep){
            try {std::rethrow_exception(ep);}
            catch (const std::runtime_error& ex) {
                printf("OnError: %s\n", ex.what());
            }
        },
        [](){printf("OnCompleted\n");});
OnError: Error from source
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::merge ( AN...  an) const
inline

Subscribe to each of the given observables and deliver every item they emit from the new observable that is returned.

There are 2 variants of the operator:

  • The source observable emits nested observables, nested observables are merged.
  • The source observable and the arguments v0...vn are used to provide the observables to merge.
Template Parameters
Coordination: the type of the scheduler (optional).
Value0: ... (optional).
ValueN: types of source observables (optional).
Parameters
cn: the scheduler to synchronize sources from different contexts (optional).
v0: ... (optional).
vn: source observables (optional).
Returns
Observable that emits items that are the result of flattening the observables emitted by the source observable.

If scheduler is omitted, identity_current_thread is used.

Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
    printf("[thread %s] Timer1 fired\n", get_pid().c_str());
    return 1;
});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).map([](int) {
    printf("[thread %s] Timer2 fired\n", get_pid().c_str());
    return 2;
});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) {
    printf("[thread %s] Timer3 fired\n", get_pid().c_str());
    return 3;
});
auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3);
auto values = base.merge(rxcpp::observe_on_new_thread());
values.
    as_blocking().
    subscribe(
        [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
        [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481303181056] Timer1 fired
[thread 47481303181056] OnNext: 1
[thread 47481303181056] Timer2 fired
[thread 47481303181056] OnNext: 2
[thread 47481303181056] Timer3 fired
[thread 47481303181056] OnNext: 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
Sample Code
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3);
auto values = base.merge();
values.
    subscribe(
        [](int v){printf("OnNext: %d\n", v);},
        [](){printf("OnCompleted\n");});
OnNext: 3
OnNext: 2
OnNext: 1
OnCompleted
Sample Code
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
auto values = o1.merge(o2, o3);
values.
    subscribe(
        [](int v){printf("OnNext: %d\n", v);},
        [](){printf("OnCompleted\n");});
OnNext: 3
OnNext: 2
OnNext: 1
OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
    printf("[thread %s] Timer1 fired\n", get_pid().c_str());
    return 1;
});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).map([](int) {
    printf("[thread %s] Timer2 fired\n", get_pid().c_str());
    return 2;
});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) {
    printf("[thread %s] Timer3 fired\n", get_pid().c_str());
    return 3;
});
auto values = o1.merge(rxcpp::observe_on_new_thread(), o2, o3);
values.
    as_blocking().
    subscribe(
        [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
        [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481303181056] Timer1 fired
[thread 47481303181056] OnNext: 1
[thread 47481303181056] Timer2 fired
[thread 47481303181056] OnNext: 2
[thread 47481303181056] Timer3 fired
[thread 47481303181056] OnNext: 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::merge_transform ( AN &&...  an) const
inline

For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.

Template Parameters
CollectionSelector: the type of the observable producing function. CollectionSelector must be a function with the signature observable(flat_map::source_value_type).
ResultSelector: the type of the aggregation function (optional). ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type).
Coordination: the type of the scheduler (optional).
Parameters
s: a function that returns an observable for each item emitted by the source observable.
rs: a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional).
cn: the scheduler to synchronize sources from different contexts (optional).
Returns
Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.

The observables produced by the CollectionSelector are merged. The related operator rxcpp::observable<T,SourceOperator>::concat_map works similarly but concatenates the observables.

Sample Code
auto values = rxcpp::observable<>::range(1, 3).
    merge_transform(
        [](int v){
            return
                rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
                take(3);
        },
        [](int v_main, long v_sub){
            return std::make_tuple(v_main, v_sub);
        });
values.
    subscribe(
        [](std::tuple<int, long> v){printf("OnNext: %d - %ld\n", std::get<0>(v), std::get<1>(v));},
        [](){printf("OnCompleted\n");});
OnNext: 1 - 1
OnNext: 2 - 1
OnNext: 3 - 1
OnNext: 1 - 2
OnNext: 2 - 2
OnNext: 3 - 2
OnNext: 1 - 3
OnNext: 2 - 3
OnNext: 3 - 3
OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto values = rxcpp::observable<>::range(1, 3).
    merge_transform(
        [](int v){
            printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v);
            return
                rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
                take(3);
        },
        [](int v_main, int v_sub){
            printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %d)\n", get_pid().c_str(), v_main, v_sub);
            return std::make_tuple(v_main, v_sub);
        },
        rxcpp::observe_on_new_thread());
values.
    as_blocking().
    subscribe(
        [](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %ld\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));},
        [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481303181056] Call CollectionSelector(v = 1)
[thread 47481303181056] Call CollectionSelector(v = 2)
[thread 47481303181056] Call CollectionSelector(v = 3)
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 1)
[thread 47481303181056] OnNext: 1 - 1
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 1)
[thread 47481303181056] OnNext: 2 - 1
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 1)
[thread 47481303181056] OnNext: 3 - 1
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 2)
[thread 47481303181056] OnNext: 1 - 2
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 2)
[thread 47481303181056] OnNext: 2 - 2
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 2)
[thread 47481303181056] OnNext: 3 - 2
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 3)
[thread 47481303181056] OnNext: 1 - 3
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 3)
[thread 47481303181056] OnNext: 2 - 3
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 3)
[thread 47481303181056] OnNext: 3 - 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::min ( AN **  ...) const
inline

For each item from this observable reduce it by taking the min value of the previous items.

Returns
An observable that emits a single item: the min of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 4).min();
values.
    subscribe(
        [](double v){printf("OnNext: %lf\n", v);},
        [](){printf("OnCompleted\n");});
OnNext: 1.000000
OnCompleted
When the source observable completes without emitting any items:
auto values = rxcpp::observable<>::empty<int>().min();
values.
    subscribe(
        [](double v){printf("OnNext: %lf\n", v);},
        [](std::exception_ptr ep){
            try {std::rethrow_exception(ep);}
            catch (const rxcpp::empty_error& ex) {
                printf("OnError: %s\n", ex.what());
            }
        },
        [](){printf("OnCompleted\n");});
OnError: min() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 4).
    concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
    min();
values.
    subscribe(
        [](double v){printf("OnNext: %lf\n", v);},
        [](std::exception_ptr ep){
            try {std::rethrow_exception(ep);}
            catch (const std::runtime_error& ex) {
                printf("OnError: %s\n", ex.what());
            }
        },
        [](){printf("OnCompleted\n");});
OnError: Error from source
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::multicast ( AN &&...  an) const
inline

template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::observe_on ( AN &&...  an) const
inline

All values are queued and delivered using the scheduler from the supplied coordination.

Template Parameters
Coordination: the type of the scheduler.
Parameters
cn: the scheduler to notify observers on.
Returns
The source observable modified so that its observers are notified on the specified scheduler.
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto values = rxcpp::observable<>::range(1, 3).
    map([](int v){
        printf("[thread %s] Emit value %d\n", get_pid().c_str(), v);
        return v;
    });
values.
    observe_on(rxcpp::synchronize_new_thread()).
    as_blocking().
    subscribe(
        [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
        [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481267428736] Emit value 1
[thread 47481267428736] Emit value 2
[thread 47481267428736] Emit value 3
[thread 47481303181056] OnNext: 1
[thread 47481303181056] OnNext: 2
[thread 47481303181056] OnNext: 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
Invoking the rxcpp::observable::subscribe_on operator instead of observe_on gives the following results:
[thread 47481267428736] Start task
[thread 47481313687296] Emit value 1
[thread 47481313687296] OnNext: 1
[thread 47481313687296] Emit value 2
[thread 47481313687296] OnNext: 2
[thread 47481313687296] Emit value 3
[thread 47481313687296] OnNext: 3
[thread 47481313687296] OnCompleted
[thread 47481267428736] Finish task
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::on_error_resume_next ( AN &&...  an) const
inline

If an error occurs, take the result from the Selector and subscribe to that instead.

Template Parameters
Selector: the actual type of a function of the form observable<T>(std::exception_ptr)
Parameters
s: the function of the form observable<T>(std::exception_ptr)
Returns
Observable that emits the items from the source observable and switches to a new observable on error.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).
    concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
    on_error_resume_next([](std::exception_ptr ep){
        printf("Resuming after: %s\n", rxu::what(ep).c_str());
        // The selector must return the observable to switch to.
        return rxcpp::observable<>::just(-1);
    });
values.
    subscribe(
        [](int v){printf("OnNext: %d\n", v);},
        [](std::exception_ptr ep){
            printf("OnError: %s\n", rxu::what(ep).c_str());
        },
        [](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
Resuming after: Error from source
OnNext: -1
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::pairwise ( AN...  an) const
inline

Take values pairwise from this observable.

Returns
Observable that emits tuples of the two most recent items emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 5).pairwise();
values.
    subscribe(
        [](std::tuple<int, int> v){printf("OnNext: %d, %d\n", std::get<0>(v), std::get<1>(v));},
        [](){printf("OnCompleted\n");});
OnNext: 1, 2
OnNext: 2, 3
OnNext: 3, 4
OnNext: 4, 5
OnCompleted
If the source observable emits fewer than two items, the resulting observable emits no pairs:
auto values = rxcpp::observable<>::just(1).pairwise();
values.
    subscribe(
        [](std::tuple<int, int> v){printf("OnNext: %d, %d\n", std::get<0>(v), std::get<1>(v));},
        [](){printf("OnCompleted\n");});
OnCompleted
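The pairing rule can be sketched over a plain std::vector. The helper pairwise_model below is hypothetical and not part of RxCpp; it assumes the whole source is available at once and simply pairs each item with its predecessor, which also shows why a source with fewer than two items produces nothing.

```cpp
#include <cassert>
#include <cstddef>
#include <tuple>
#include <vector>

// Hypothetical helper, not an RxCpp API: models pairwise() on a vector.
// Each output tuple holds (previous item, current item).
std::vector<std::tuple<int, int>> pairwise_model(const std::vector<int>& source) {
    std::vector<std::tuple<int, int>> pairs;
    // Start at the second item: the first one has no predecessor to pair with.
    for (std::size_t i = 1; i < source.size(); ++i) {
        pairs.emplace_back(source[i - 1], source[i]);
    }
    return pairs;
}
```

For the values 1 through 5 this yields four tuples, from (1, 2) to (4, 5); for a single value it yields an empty result.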
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::publish ( AN &&...  an) const
inline

Without an initial value: turn a cold observable hot and allow connections to the source to be independent of subscriptions. With an initial value (first): turn a cold observable hot, send the most recent value to any new subscriber, and allow connections to the source to be independent of subscriptions.

Template Parameters
T: the type of the emitted item (optional).
Parameters
first: an initial item to be emitted by the resulting observable at connection time before emitting the items from the source observable; not emitted to observers that subscribe after the time of connection (optional).
cs: the subscription to control lifetime (optional).
Returns
rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers.
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
    take(5).
    publish();
// Subscribe from the beginning
values.subscribe(
    [](long v){printf("[1] OnNext: %ld\n", v);},
    [](){printf("[1] OnCompleted\n");});
// Another subscription from the beginning
values.subscribe(
    [](long v){printf("[2] OnNext: %ld\n", v);},
    [](){printf("[2] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
    values.subscribe(
        [](long v){printf("[3] OnNext: %ld\n", v);},
        [](){printf("[3] OnCompleted\n");});
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
[1] OnNext: 1
[2] OnNext: 1
[1] OnNext: 2
[2] OnNext: 2
[1] OnNext: 3
[2] OnNext: 3
[3] OnNext: 3
[1] OnNext: 4
[2] OnNext: 4
[3] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[3] OnNext: 5
[1] OnCompleted
[2] OnCompleted
[3] OnCompleted
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
    take(5).
    publish(0L);
// Subscribe from the beginning
values.subscribe(
    [](long v){printf("[1] OnNext: %ld\n", v);},
    [](){printf("[1] OnCompleted\n");});
// Another subscription from the beginning
values.subscribe(
    [](long v){printf("[2] OnNext: %ld\n", v);},
    [](){printf("[2] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
    values.subscribe(
        [](long v){printf("[3] OnNext: %ld\n", v);},
        [](){printf("[3] OnCompleted\n");});
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
[1] OnNext: 0
[2] OnNext: 0
[1] OnNext: 1
[2] OnNext: 1
[1] OnNext: 2
[2] OnNext: 2
[3] OnNext: 2
[1] OnNext: 3
[2] OnNext: 3
[3] OnNext: 3
[1] OnNext: 4
[2] OnNext: 4
[3] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[3] OnNext: 5
[1] OnCompleted
[2] OnCompleted
[3] OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::publish_synchronized ( AN &&...  an) const
inline

Turn a cold observable hot and allow connections to the source to be independent of subscriptions.

Template Parameters
Coordination: the type of the scheduler.
Parameters
cn: a scheduler all values are queued and delivered on.
cs: the subscription to control lifetime (optional).
Returns
rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers, on the specified scheduler.
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
    take(5).
    publish_synchronized(rxcpp::observe_on_new_thread());
// Subscribe from the beginning
values.subscribe(
    [](long v){printf("[1] OnNext: %ld\n", v);},
    [](){printf("[1] OnCompleted\n");});
// Another subscription from the beginning
values.subscribe(
    [](long v){printf("[2] OnNext: %ld\n", v);},
    [](){printf("[2] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
    values.subscribe(
        [](long v){printf("[3] OnNext: %ld\n", v);},
        [](){printf("[3] OnCompleted\n");});
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
[1] OnNext: 1
[2] OnNext: 1
[1] OnNext: 2
[2] OnNext: 2
[1] OnNext: 3
[2] OnNext: 3
[1] OnNext: 4
[2] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[1] OnCompleted
[2] OnCompleted
[3] OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::reduce ( AN &&...  an) const
inline

For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned.

Template Parameters
Seed: the type of the initial value for the accumulator
Accumulator: the type of the data accumulating function
ResultSelector: the type of the result producing function
Parameters
seed: the initial value for the accumulator
a: an accumulator function to be invoked on each item emitted by the source observable, the result of which will be used in the next accumulator call
rs: a result producing function that makes the final value from the last accumulator call result
Returns
An observable that emits a single item that is the result of accumulating the output from the items emitted by the source observable.

Some basic reduce-type operators have already been implemented, including first, last, count, sum, average, min, and max.

Sample Code
Geometric mean of source values:
auto values = rxcpp::observable<>::range(1, 7).
    reduce(
        std::make_pair(0, 1.0),
        [](std::pair<int, double> seed, int v){
            seed.first += 1;
            seed.second *= v;
            return seed;
        },
        [](std::pair<int, double> res){
            return std::pow(res.second, 1.0 / res.first);
        });
values.
    subscribe(
        [](double v){printf("OnNext: %lf\n", v);},
        [](){printf("OnCompleted\n");});
OnNext: 3.380015
OnCompleted
If the source observable completes without emitting any items, the resulting observable emits the result of passing the initial seed to the result selector:
auto values = rxcpp::observable<>::empty<int>().
    reduce(
        1,
        [](int,int){return 0;},
        [](int res){return res;});
values.
    subscribe(
        [](int v){printf("OnNext: %d\n", v);},
        [](){printf("OnCompleted\n");});
OnNext: 1
OnCompleted
If the accumulator raises an exception, it is returned by the resulting observable in on_error:
auto values = rxcpp::observable<>::range(1, 3).
    reduce(
        0,
        [](int seed, int v){
            if (v == 2)
                throw std::runtime_error("Exception from accumulator");
            return seed;
        },
        [](int res){return res;});
values.
    subscribe(
        [](int v){printf("OnNext: %d\n", v);},
        [](std::exception_ptr ep){
            try {std::rethrow_exception(ep);}
            catch (const std::exception& ex) {
                printf("OnError: %s\n", ex.what());
            }
        },
        [](){printf("OnCompleted\n");});
OnError: Exception from accumulator
The same for exceptions raised by the result selector:
auto values = rxcpp::observable<>::range(1, 3).
    reduce(
        0,
        [](int seed, int v){return seed + v;},
        [](int res){
            throw std::runtime_error("Exception from result selector");
            return res;
        });
values.
    subscribe(
        [](int v){printf("OnNext: %d\n", v);},
        [](std::exception_ptr ep){
            try {std::rethrow_exception(ep);}
            catch (const std::exception& ex) {
                printf("OnError: %s\n", ex.what());
            }
        },
        [](){printf("OnCompleted\n");});
OnError: Exception from result selector
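How the seed, the accumulator, and the result selector fit together can be sketched with an ordinary loop. The following is only a model under the assumption that the source is a std::vector<int>; reduce_model and geometric_mean are hypothetical names, not RxCpp APIs. An exception thrown by either function simply propagates to the caller here, whereas the real operator reports it through on_error.

```cpp
#include <cassert>
#include <cmath>
#include <utility>
#include <vector>

// Hypothetical helper, not an RxCpp API: models reduce() on a vector.
// The accumulator folds every item into the seed; the result selector
// turns the final accumulated state into the single emitted value.
template <class Seed, class Accumulator, class ResultSelector>
auto reduce_model(const std::vector<int>& source, Seed seed, Accumulator a, ResultSelector rs)
    -> decltype(rs(std::move(seed))) {
    for (int v : source) {
        seed = a(std::move(seed), v);
    }
    // With an empty source the untouched seed goes straight to rs.
    return rs(std::move(seed));
}

// Geometric mean with the same seed and lambdas as the sample above:
// the pair tracks (item count, running product).
double geometric_mean(const std::vector<int>& source) {
    return reduce_model(
        source,
        std::make_pair(0, 1.0),
        [](std::pair<int, double> acc, int v) {
            acc.first += 1;
            acc.second *= v;
            return acc;
        },
        [](std::pair<int, double> res) {
            return std::pow(res.second, 1.0 / res.first);
        });
}
```

For the values 1 through 7 the product is 5040 and its seventh root is approximately 3.380015, in line with the sample output. Passing an empty vector with seed 1 and an identity result selector returns 1, mirroring the empty-source case.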
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::repeat ( AN...  an) const
inline

Repeat this observable for the given number of times or infinitely.

Template Parameters
Count: the type of the counter (optional).
Parameters
t: the number of times the source observable items are repeated (optional). If not specified, the source observable is repeated infinitely. Specifying 0 returns an empty sequence immediately.
Returns
An observable that repeats the sequence of items emitted by the source observable t times.
Sample Code
auto values = rxcpp::observable<>::from(1, 2).repeat(3);
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 1
OnNext: 2
OnNext: 1
OnNext: 2
OnCompleted
If the source observable calls on_error, repeat stops:
auto values = rxcpp::observable<>::from(1, 2).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
repeat();
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::exception& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnError: Error from source
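As a rough illustration of the finite case, repeat(t) behaves like subscribing to the source t times in a row. The sketch below models that over a std::vector; repeat_model is a hypothetical name, the infinite form is not modelled, and error handling is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper, not an RxCpp API: what a subscriber to source.repeat(times)
// would see if the source always emits `items` and then completes.
template <class T>
std::vector<T> repeat_model(const std::vector<T>& items, std::size_t times) {
    std::vector<T> out;
    out.reserve(items.size() * times);
    for (std::size_t pass = 0; pass < times; ++pass) {
        // Each pass stands for one fresh subscription to the source.
        out.insert(out.end(), items.begin(), items.end());
    }
    return out;  // times == 0 gives an empty sequence
}

inline void repeat_model_example() {
    const std::vector<int> source{1, 2};
    const std::vector<int> expected{1, 2, 1, 2, 1, 2};
    assert(repeat_model(source, 3) == expected);
    assert(repeat_model(source, 0).empty());
}
```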
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::replay ( AN &&...  an) const
inline

1) replay(optional Coordination, optional CompositeSubscription) Turn a cold observable hot, send all earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions.

2) replay(Count, optional Coordination, optional CompositeSubscription) Turn a cold observable hot, send at most count of earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions.

3) replay(Duration, optional Coordination, optional CompositeSubscription) Turn a cold observable hot, send values emitted within a specified time window to any new subscriber, and allow connections to the source to be independent of subscriptions.

4) replay(Count, Duration, optional Coordination, optional CompositeSubscription) Turn a cold observable hot, send at most count of values emitted within a specified time window to any new subscriber, and allow connections to the source to be independent of subscriptions.

Template Parameters
Duration: the type of the time interval (optional).
Count: the type of the maximum number of the most recent items sent to new observers (optional).
Coordination: the type of the scheduler (optional).
Parameters
count: the maximum number of the most recent items sent to new observers (optional).
d: the duration of the window in which the replayed items must be emitted.
cn: a scheduler on which all values are queued and delivered (optional).
cs: the subscription to control lifetime (optional).
Returns
rxcpp::connectable_observable that shares a single subscription to the underlying observable that will replay all of its items and notifications to any future observer.
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
take(5).
replay();
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(125)).subscribe([&](long){
values.as_blocking().subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
});
[1] OnNext: 1
[1] OnNext: 2
[1] OnNext: 3
[2] OnNext: 1
[2] OnNext: 2
[2] OnNext: 3
[1] OnNext: 4
[2] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[1] OnCompleted
[2] OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto coordination = rxcpp::serialize_new_thread();
auto worker = coordination.create_coordinator().get_worker();
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
take(5).
replay(coordination);
// Subscribe from the beginning
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());});
});
// Wait before subscribing
worker.schedule(coordination.now() + std::chrono::milliseconds(125), [&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());});
});
// Start emitting
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.connect();
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481303181056][1] OnNext: 1
[thread 47481303181056][1] OnNext: 2
[thread 47481303181056][1] OnNext: 3
[thread 47481303181056][2] OnNext: 1
[thread 47481303181056][2] OnNext: 2
[thread 47481303181056][2] OnNext: 3
[thread 47481303181056][1] OnNext: 4
[thread 47481303181056][2] OnNext: 4
[thread 47481303181056][1] OnNext: 5
[thread 47481303181056][2] OnNext: 5
[thread 47481303181056][1] OnCompleted
[thread 47481303181056][2] OnCompleted
[thread 47481267428736] Finish task
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
take(5).
replay(2);
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(125)).subscribe([&](long){
values.as_blocking().subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
});
[1] OnNext: 1
[1] OnNext: 2
[1] OnNext: 3
[2] OnNext: 2
[2] OnNext: 3
[1] OnNext: 4
[2] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[1] OnCompleted
[2] OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto coordination = rxcpp::serialize_new_thread();
auto worker = coordination.create_coordinator().get_worker();
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
take(5).
replay(2, coordination);
// Subscribe from the beginning
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());});
});
// Wait before subscribing
worker.schedule(coordination.now() + std::chrono::milliseconds(125), [&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());});
});
// Start emitting
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.connect();
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481305282304][1] OnNext: 1
[thread 47481305282304][1] OnNext: 2
[thread 47481305282304][1] OnNext: 3
[thread 47481305282304][2] OnNext: 2
[thread 47481305282304][2] OnNext: 3
[thread 47481305282304][1] OnNext: 4
[thread 47481305282304][2] OnNext: 4
[thread 47481305282304][1] OnNext: 5
[thread 47481305282304][2] OnNext: 5
[thread 47481305282304][1] OnCompleted
[thread 47481305282304][2] OnCompleted
[thread 47481267428736] Finish task
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
take(5).
replay(std::chrono::milliseconds(125));
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(175)).subscribe([&](long){
values.as_blocking().subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
});
[1] OnNext: 1
[1] OnNext: 2
[1] OnNext: 3
[1] OnNext: 4
[2] OnNext: 2
[2] OnNext: 3
[2] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[1] OnCompleted
[2] OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto coordination = rxcpp::serialize_new_thread();
auto worker = coordination.create_coordinator().get_worker();
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
take(5).
replay(std::chrono::milliseconds(125), coordination);
// Subscribe from the beginning
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());});
});
// Wait before subscribing
worker.schedule(coordination.now() + std::chrono::milliseconds(175), [&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());});
});
// Start emitting
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.connect();
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481307383552][1] OnNext: 1
[thread 47481307383552][1] OnNext: 2
[thread 47481307383552][1] OnNext: 3
[thread 47481307383552][1] OnNext: 4
[thread 47481307383552][2] OnNext: 2
[thread 47481307383552][2] OnNext: 3
[thread 47481307383552][2] OnNext: 4
[thread 47481307383552][1] OnNext: 5
[thread 47481307383552][2] OnNext: 5
[thread 47481307383552][1] OnCompleted
[thread 47481307383552][2] OnCompleted
[thread 47481267428736] Finish task
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
take(5).
replay(2, std::chrono::milliseconds(125));
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(175)).subscribe([&](long){
values.as_blocking().subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
});
[1] OnNext: 1
[1] OnNext: 2
[1] OnNext: 3
[1] OnNext: 4
[2] OnNext: 3
[2] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[1] OnCompleted
[2] OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto coordination = rxcpp::serialize_new_thread();
auto worker = coordination.create_coordinator().get_worker();
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
take(5).
replay(2, std::chrono::milliseconds(125), coordination);
// Subscribe from the beginning
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());});
});
// Wait before subscribing
worker.schedule(coordination.now() + std::chrono::milliseconds(175), [&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());});
});
// Start emitting
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.connect();
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481309484800][1] OnNext: 1
[thread 47481309484800][1] OnNext: 2
[thread 47481309484800][1] OnNext: 3
[thread 47481309484800][1] OnNext: 4
[thread 47481309484800][2] OnNext: 3
[thread 47481309484800][2] OnNext: 4
[thread 47481309484800][1] OnNext: 5
[thread 47481309484800][2] OnNext: 5
[thread 47481309484800][1] OnCompleted
[thread 47481309484800][2] OnCompleted
[thread 47481267428736] Finish task
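The four replay overloads differ only in how much history a late subscriber receives. The sketch below models that cache with plain standard-library types; replay_buffer_model is a hypothetical name, timestamps are supplied by the caller in milliseconds and assumed to be non-decreasing, and treating the window boundary as inclusive is an assumption of this sketch, not a documented RxCpp guarantee.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <limits>
#include <utility>
#include <vector>

// Hypothetical model of the cache behind replay(count, duration); not RxCpp code.
class replay_buffer_model {
public:
    // The defaults mean "no limit", mirroring replay() without arguments.
    explicit replay_buffer_model(
        std::size_t max_count = std::numeric_limits<std::size_t>::max(),
        long window_ms = std::numeric_limits<long>::max())
        : max_count_(max_count), window_ms_(window_ms) {
        assert(window_ms >= 0);
    }

    // Record a value emitted by the source at time `now_ms`.
    void on_next(long now_ms, long value) {
        items_.push_back(std::make_pair(now_ms, value));
        if (items_.size() > max_count_) items_.pop_front();  // keep the newest max_count
    }

    // Values that a subscriber arriving at `now_ms` would receive first.
    std::vector<long> replay_at(long now_ms) const {
        std::vector<long> out;
        for (const auto& item : items_) {
            if (now_ms - item.first <= window_ms_) out.push_back(item.second);
        }
        return out;
    }

private:
    std::size_t max_count_;
    long window_ms_;
    std::deque<std::pair<long, long>> items_;  // (timestamp in ms, value)
};
```

With values 1 to 4 recorded at 0, 50, 100, and 150 ms and a subscriber arriving at 175 ms, replay_buffer_model(2, 125) hands back 3 and 4, which matches the shape of the replay(2, std::chrono::milliseconds(125)) sample output.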
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::retry ( AN...  an) const
inline

Retry this observable for the given number of times.

Template Parameters
Count: the type of the counter (optional).
Parameters
t: the total number of tries (optional). For example, retry(2) means one normal try and, if an error occurs, one retry. If not specified, the source observable is retried indefinitely. Specifying 0 returns immediately without subscribing.
Returns
An observable that mirrors the source observable, resubscribing to it if it calls on_error up to a specified number of retries.
Sample Code
auto source = rxcpp::observable<>::from(1, 2).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source")));
auto values = source.retry(3);
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::exception& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 1
OnNext: 2
OnNext: 1
OnNext: 2
OnError: Error from source
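The counting rule (t is the total number of tries, not the number of extra retries) can be sketched as a plain loop. In this model, retry_model and subscribe_once are hypothetical names; one call to subscribe_once stands for one subscription to the source, and a thrown exception stands for on_error.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <vector>

// Hypothetical model, not RxCpp code. Runs `subscribe_once` up to `tries` times.
// Returns normally on the first successful run; rethrows the error of the last
// failed run once all tries are used up.
inline void retry_model(const std::function<void()>& subscribe_once, int tries) {
    assert(tries >= 0);
    for (int attempt = 1; attempt <= tries; ++attempt) {
        try {
            subscribe_once();
            return;  // completed normally, no further attempts
        } catch (...) {
            if (attempt == tries) throw;  // out of attempts: surface the error
        }
    }
    // tries == 0: nothing was subscribed to at all
}
```

Calling retry_model with a source that emits 1, 2 and then fails, and with tries set to 3, runs the source three times before the error escapes, the same shape as the sample output above.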
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::sample_with_time ( AN &&...  an) const
inline

Return an Observable that emits the most recent items emitted by the source Observable within periodic time intervals.

Template Parameters
Duration: the type of time interval.
Coordination: the type of the scheduler (optional).
Parameters
period: the period of time to sample the source observable.
coordination: the scheduler for the items (optional).
Returns
Observable that emits the most recently emitted item since the previous sampling.
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(2)).
take(7).
sample_with_time(std::chrono::milliseconds(4));
values.subscribe(
[](long v) {
printf("OnNext: %ld\n", v);
},
[]() { printf("OnCompleted\n"); });
OnNext: 1
OnNext: 3
OnNext: 5
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::scan ( AN...  an) const
inline

For each item from this observable use Accumulator to combine items into a value that will be emitted from the new observable that is returned.

Template Parameters
Seed: the type of the initial value for the accumulator.
Accumulator: the type of the data accumulating function.
Parameters
seed: the initial value for the accumulator.
a: an accumulator function invoked on each item emitted by the source observable; its result is emitted and passed to the next accumulator call.
Returns
An observable that emits the results of each call to the accumulator function.
Sample Code
auto values = rxcpp::observable<>::range(1, 7).
scan(
0,
[](int seed, int v){
return seed + v;
});
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 3
OnNext: 6
OnNext: 10
OnNext: 15
OnNext: 21
OnNext: 28
OnCompleted
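The running-total behaviour in the sample output can be reproduced with a small standard-library model. This is only a sketch over a std::vector; scan_model is a hypothetical name and says nothing about how RxCpp implements the operator.

```cpp
#include <cassert>
#include <vector>

// Hypothetical helper, not an RxCpp API: emit every intermediate accumulator value.
template <class T, class Seed, class Accumulator>
std::vector<Seed> scan_model(const std::vector<T>& items, Seed seed, Accumulator accumulate) {
    std::vector<Seed> out;
    out.reserve(items.size());
    for (const T& item : items) {
        seed = accumulate(seed, item);
        out.push_back(seed);  // one output per input, not just the final value
    }
    return out;
}

inline void scan_model_example() {
    const std::vector<int> source{1, 2, 3, 4, 5, 6, 7};
    const std::vector<int> expected{1, 3, 6, 10, 15, 21, 28};
    const std::vector<int> totals =
        scan_model(source, 0, [](int seed, int v) { return seed + v; });
    assert(totals == expected);
}
```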
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::sequence_equal ( AN...  an) const
inline

Determine whether two Observables emit the same sequence of items.

Template Parameters
OtherSource: the type of the other observable.
BinaryPredicate: the type of the value comparing function (optional). The signature should be equivalent to the following: bool pred(const T1& a, const T2& b);
Coordination: the type of the scheduler (optional).
Parameters
t: the other Observable that emits items to compare.
pred: the function that implements comparison of two values (optional).
cn: the scheduler (optional).
Returns
Observable that emits true only if both sequences terminate normally after emitting the same sequence of items in the same order; otherwise it will emit false.
Sample Code
auto source = rxcpp::observable<>::range(1, 3);
auto values = source.sequence_equal(rxcpp::observable<>::range(1, 3));
values.subscribe(
[](bool v){ printf("OnNext: %s\n", v ? "true" : "false"); },
[](){ printf("OnCompleted\n");} );
OnNext: true
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::skip ( AN...  an) const
inline

Make a new observable that skips the first count items from this observable.

Template Parameters
Count: the type of the items counter.
Parameters
t: the number of items to skip.
Returns
An observable that is identical to the source observable except that it does not emit the first t items that the source observable emits.
Sample Code
auto values = rxcpp::observable<>::range(1, 7).skip(3);
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 4
OnNext: 5
OnNext: 6
OnNext: 7
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::skip_last ( AN...  an) const
inline

Make a new observable that skips the last count items from this observable.

Template Parameters
Count: the type of the items counter.
Parameters
t: the number of last items to skip.
Returns
An observable that is identical to the source observable except that it does not emit the last t items that the source observable emits.
Sample Code
auto values = rxcpp::observable<>::range(1, 7).skip_last(3);
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnCompleted
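Because the operator cannot know which items are the last ones until the source completes, a streaming implementation has to hold items back. The sketch below shows one way to do that with a std::deque; skip_last_model is a hypothetical name and the buffering strategy is an assumption for illustration, not a description of RxCpp internals.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical streaming model of skip_last(count); not RxCpp code.
template <class T>
class skip_last_model {
public:
    explicit skip_last_model(std::size_t count) : count_(count) {}

    // Feed one source item. Once more than `count` items are pending, the oldest
    // one can no longer be among the last `count`, so it is appended to `out`.
    void on_next(const T& value, std::vector<T>& out) {
        pending_.push_back(value);
        if (pending_.size() > count_) {
            out.push_back(pending_.front());
            pending_.pop_front();
        }
    }
    // On completion the items still pending are simply dropped.

private:
    std::size_t count_;
    std::deque<T> pending_;
};

inline void skip_last_model_example() {
    skip_last_model<int> op(3);
    std::vector<int> out;
    for (int v = 1; v <= 7; ++v) op.on_next(v, out);
    const std::vector<int> expected{1, 2, 3, 4};
    assert(out == expected);
}
```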
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::skip_until ( AN...  an) const
inline

Make a new observable that skips items until on_next occurs on the trigger observable or until the specified time. skip_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination).

Template Parameters
TriggerSource: the type of the trigger observable.
Coordination: the type of the scheduler (optional).
Parameters
t: an observable that has to emit an item before the source observable's items begin to be mirrored by the resulting observable.
cn: the scheduler to use for scheduling the items (optional).
Returns
An observable that skips items from the source observable until the second observable emits an item or the time runs out, then emits the remaining items.
Sample Code
auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7);
auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25));
auto values = source.skip_until(trigger);
values.subscribe(
[](long v){printf("OnNext: %ld\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 4
OnNext: 5
OnNext: 6
OnNext: 7
OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){
printf("[thread %s] Source emits, value = %ld\n", get_pid().c_str(), v);
return v;
});
auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)).map([](long v){
printf("[thread %s] Trigger emits, value = %ld\n", get_pid().c_str(), v);
return v;
});
auto values = source.skip_until(trigger, rxcpp::observe_on_new_thread());
values.as_blocking().subscribe(
[](long v){printf("[thread %s] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481267428736] Source emits, value = 1
[thread 47481267428736] Source emits, value = 2
[thread 47481267428736] Source emits, value = 3
[thread 47481267428736] Trigger emits, value = 1
[thread 47481267428736] Source emits, value = 4
[thread 47481311586048] OnNext: 4
[thread 47481267428736] Source emits, value = 5
[thread 47481311586048] OnNext: 5
[thread 47481267428736] Source emits, value = 6
[thread 47481311586048] OnNext: 6
[thread 47481267428736] Source emits, value = 7
[thread 47481311586048] OnNext: 7
[thread 47481311586048] OnCompleted
[thread 47481267428736] Finish task
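The gating behaviour can be sketched as a flag that the trigger flips once. In the model below, skip_until_model is a hypothetical name, and the timeline in the usage function (items 1 to 7 arriving at 0, 10, ..., 60 ms, trigger at 25 ms) is an assumed reading of the sample above, not measured data.

```cpp
#include <cassert>
#include <vector>

// Hypothetical model, not RxCpp code: a gate that stays closed until the trigger
// fires and then forwards every later source item.
template <class T>
class skip_until_model {
public:
    void on_trigger() { open_ = true; }  // the trigger's first on_next

    void on_next(const T& value, std::vector<T>& out) const {
        if (open_) out.push_back(value);  // items seen before the trigger are discarded
    }

private:
    bool open_ = false;
};

inline void skip_until_model_example() {
    skip_until_model<long> gate;
    std::vector<long> out;
    for (long v = 1; v <= 7; ++v) {
        const long arrival_ms = (v - 1) * 10;    // assumed arrival time of item v
        if (arrival_ms > 25) gate.on_trigger();  // the trigger fired at 25 ms
        gate.on_next(v, out);
    }
    const std::vector<long> expected{4, 5, 6, 7};
    assert(out == expected);
}
```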
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::start_with ( AN...  an) const
inline

Start with the supplied values, then concatenate this observable.

Template Parameters
Value0...ValueN: the types of the values to send.
Parameters
v0...vn: the values to send.
Returns
Observable that emits the specified items and then emits the items emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(10, 12).
start_with(1, 2, 3);
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 10
OnNext: 11
OnNext: 12
OnCompleted
Another form of this operator, rxcpp::observable<void, void>::start_with, gets the source observable as a parameter:
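auto observable = rxcpp::observable<>::range(10, 12);
auto values = rxcpp::observable<>::start_with(observable, 1, 2, 3);
values.subscribe(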
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 10
OnNext: 11
OnNext: 12
OnCompleted
template<class T, class SourceOperator>
template<class... ArgN>
auto rxcpp::observable< T, SourceOperator >::subscribe ( ArgN &&...  an) const -> composite_subscription
inline

Subscribe will cause the source observable to emit values to the provided subscriber.

Template Parameters
ArgN: the types of the subscriber parameters.
Parameters
an: the parameters for making a subscriber.
Returns
A subscription with which the observer can stop receiving items before the observable has finished sending them.

The arguments of subscribe are forwarded to the rxcpp::make_subscriber function. Some possible alternatives are:

  • Pass an already composed rxcpp::subscriber:
    auto subscriber = rxcpp::make_subscriber<int>(
    [](int v){printf("OnNext: %d\n", v);},
    [](){printf("OnCompleted\n");});
    auto values = rxcpp::observable<>::range(1, 3);
    values.subscribe(subscriber);
    OnNext: 1
    OnNext: 2
    OnNext: 3
    OnCompleted
  • Pass an rxcpp::observer. This allows subscribing the same subscriber to several observables:
    auto subscriber = rxcpp::make_subscriber<int>(
    [](int v){printf("OnNext: %d\n", v);},
    [](){printf("OnCompleted\n");});
    auto values1 = rxcpp::observable<>::range(1, 3);
    auto values2 = rxcpp::observable<>::range(4, 6);
    values1.subscribe(subscriber.get_observer());
    values2.subscribe(subscriber.get_observer());
    OnNext: 1
    OnNext: 2
    OnNext: 3
    OnCompleted
    OnNext: 4
    OnNext: 5
    OnNext: 6
    OnCompleted
  • Pass an on_next handler:
    auto values = rxcpp::observable<>::range(1, 3);
    values.subscribe(
    [](int v){printf("OnNext: %d\n", v);});
    OnNext: 1
    OnNext: 2
    OnNext: 3
  • Pass on_next and on_error handlers:
    auto values = rxcpp::observable<>::range(1, 3).
    concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source")));
    values.subscribe(
    [](int v){printf("OnNext: %d\n", v);},
    [](std::exception_ptr ep){
    try {std::rethrow_exception(ep);}
    catch (const std::exception& ex) {
    printf("OnError: %s\n", ex.what());
    }
    });
    OnNext: 1
    OnNext: 2
    OnNext: 3
    OnError: Error from source
  • Pass on_next and on_completed handlers:
    auto values = rxcpp::observable<>::range(1, 3);
    values.subscribe(
    [](int v){printf("OnNext: %d\n", v);},
    [](){printf("OnCompleted\n");});
    OnNext: 1
    OnNext: 2
    OnNext: 3
    OnCompleted
  • Pass on_next, on_error, and on_completed handlers:
    auto values = rxcpp::observable<>::range(1, 3).
    concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source")));
    values.subscribe(
    [](int v){printf("OnNext: %d\n", v);},
    [](std::exception_ptr ep){
    try {std::rethrow_exception(ep);}
    catch (const std::exception& ex) {
    printf("OnError: %s\n", ex.what());
    }
    },
    [](){printf("OnCompleted\n");});
    OnNext: 1
    OnNext: 2
    OnNext: 3
    OnError: Error from source

All the alternatives above also support passing an rxcpp::composite_subscription instance. For example:

auto subscription = rxcpp::composite_subscription();
auto values = rxcpp::observable<>::range(1, 5);
values.subscribe(
subscription,
[&subscription](int v){
printf("OnNext: %d\n", v);
if (v == 3)
subscription.unsubscribe();
},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3

If neither a subscription nor a subscriber is provided, a new subscription is created and returned as the result:

auto values = rxcpp::observable<>::range(1, 3).
finally([](){printf("The final action\n");});
auto subscription = values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
subscription.unsubscribe();
OnNext: 1
OnNext: 2
OnNext: 3
The final action

For more details, see the description of the rxcpp::make_subscriber function.
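The effect of unsubscribing from inside an on_next handler can be illustrated without RxCpp. In the sketch below, subscription_model and emit_range_model are hypothetical stand-ins for a subscription handle and a synchronous range source; the shared flag is an assumption made for illustration and does not describe how rxcpp::composite_subscription is built.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical stand-in for a subscription handle; copies share one flag.
class subscription_model {
public:
    subscription_model() : active_(std::make_shared<bool>(true)) {}
    bool is_subscribed() const { return *active_; }
    void unsubscribe() const { *active_ = false; }

private:
    std::shared_ptr<bool> active_;
};

// Emits first..last to `on_next` while the subscription is active.
// Returns true if the range ran to completion, false if it was cut short
// (in which case no completion notification would be delivered).
inline bool emit_range_model(int first, int last, const subscription_model& subscription,
                             const std::function<void(int)>& on_next) {
    for (int v = first; v <= last; ++v) {
        if (!subscription.is_subscribed()) return false;
        on_next(v);
    }
    return subscription.is_subscribed();
}

inline void emit_range_model_example() {
    subscription_model subscription;
    std::vector<int> seen;
    const bool completed = emit_range_model(1, 5, subscription, [&](int v) {
        seen.push_back(v);
        if (v == 3) subscription.unsubscribe();  // stop after the third item
    });
    const std::vector<int> expected{1, 2, 3};
    assert(!completed);
    assert(seen == expected);
}
```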

template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::subscribe_on ( AN &&...  an) const
inline

Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordination.

Template Parameters
Coordination: the type of the scheduler.
Parameters
cn: the scheduler to perform subscription actions on.
Returns
The source observable modified so that its subscriptions happen on the specified scheduler.
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto values = rxcpp::observable<>::range(1, 3).
map([](int v){
printf("[thread %s] Emit value %d\n", get_pid().c_str(), v);
return v;
});
values.subscribe_on(rxcpp::synchronize_new_thread()).as_blocking().subscribe(
[](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481313687296] Emit value 1
[thread 47481313687296] OnNext: 1
[thread 47481313687296] Emit value 2
[thread 47481313687296] OnNext: 2
[thread 47481313687296] Emit value 3
[thread 47481313687296] OnNext: 3
[thread 47481313687296] OnCompleted
[thread 47481267428736] Finish task
Invoking the rxcpp::observable::observe_on operator instead of subscribe_on gives the following results:
[thread 47481267428736] Start task
[thread 47481267428736] Emit value 1
[thread 47481267428736] Emit value 2
[thread 47481267428736] Emit value 3
[thread 47481303181056] OnNext: 1
[thread 47481303181056] OnNext: 2
[thread 47481303181056] OnNext: 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::sum ( AN **  ...) const
inline

For each item from this observable reduce it by adding to the previous items.

Returns
An observable that emits a single item: the sum of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).sum();
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 6
OnCompleted
When the source observable completes without emitting any items:
auto values = rxcpp::observable<>::empty<int>().sum();
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: sum() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
sum();
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::runtime_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Error from source
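The empty-source rule is the part of sum that is easy to overlook, so here is a small standard-library model of it. sum_model is a hypothetical name, std::runtime_error stands in for rxcpp::empty_error, and throwing stands in for delivering the error through on_error.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical model, not RxCpp code: an empty input is an error, not a zero.
inline int sum_model(const std::vector<int>& items) {
    if (items.empty()) {
        throw std::runtime_error("sum() requires a stream with at least one value");
    }
    int total = 0;
    for (std::size_t i = 0; i < items.size(); ++i) total += items[i];
    return total;
}

inline void sum_model_example() {
    const std::vector<int> items{1, 2, 3};
    assert(sum_model(items) == 6);

    bool reported_empty = false;
    try {
        sum_model(std::vector<int>());
    } catch (const std::runtime_error&) {
        reported_empty = true;
    }
    assert(reported_empty);
}
```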
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::switch_if_empty ( AN &&...  an) const
inline

If the source Observable terminates without emitting any items, emits items from a backup Observable.

Template Parameters
BackupSource: the type of the backup observable.
Parameters
t: a backup observable that is used if the source observable is empty.
Returns
Observable that emits items from a backup observable if the source observable is empty.
Sample Code
auto values = rxcpp::observable<>::empty<int>()
.switch_if_empty(rxcpp::observable<>::range(1, 5));
values.subscribe(
[](int v) { printf("OnNext: %d\n", v); },
[]() { printf("OnCompleted\n"); } );
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnNext: 5
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::switch_on_error ( AN &&...  an) const
inline

If an error occurs, take the result from the Selector and subscribe to that instead.

Template Parameters
Selector: the actual type of a function of the form observable<T>(std::exception_ptr).
Parameters
s: the function of the form observable<T>(std::exception_ptr).
Returns
Observable that emits the items from the source observable and switches to a new observable on error.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
on_error_resume_next([](std::exception_ptr ep){
printf("Resuming after: %s\n", rxu::what(ep).c_str());
return rxcpp::observable<>::just(-1);
});
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
printf("OnError: %s\n", rxu::what(ep).c_str());
},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
Resuming after: Error from source
OnNext: -1
OnCompleted
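The hand-over from the failing source to the selector's result can be modelled with a try/catch. In this sketch, switch_on_error_model is a hypothetical name; the source is a function that appends its items and throws to signal on_error, and the selector returns a vector in place of an observable.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <vector>

// Hypothetical model, not RxCpp code. `source` appends its items to the output and
// throws to signal on_error; `selector` maps the error to the fallback items.
inline std::vector<int> switch_on_error_model(
    const std::function<void(std::vector<int>&)>& source,
    const std::function<std::vector<int>(std::exception_ptr)>& selector) {
    std::vector<int> out;
    try {
        source(out);
    } catch (...) {
        // Items emitted before the error are kept; the fallback continues the sequence.
        const std::vector<int> fallback = selector(std::current_exception());
        out.insert(out.end(), fallback.begin(), fallback.end());
    }
    return out;
}

inline void switch_on_error_model_example() {
    const std::vector<int> out = switch_on_error_model(
        [](std::vector<int>& sink) {
            sink.push_back(1);
            sink.push_back(2);
            sink.push_back(3);
            throw std::runtime_error("Error from source");
        },
        [](std::exception_ptr) { return std::vector<int>{-1}; });
    const std::vector<int> expected{1, 2, 3, -1};
    assert(out == expected);
}
```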
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::switch_on_next ( AN &&...  an) const
inline

Return observable that emits the items emitted by the observable most recently emitted by the source observable.

Template Parameters
Coordination: the type of the scheduler (optional).
Parameters
cn: the scheduler to synchronize sources from different contexts (optional).
Returns
Observable that emits the items emitted by the observable most recently emitted by the source observable.
Sample Code
auto base = rxcpp::observable<>::interval(std::chrono::milliseconds(30)).
take(3).
map([](long){
return rxcpp::observable<>::interval(std::chrono::milliseconds(10)).as_dynamic();
});
auto values = base.switch_on_next().take(10);
values.subscribe(
[](long v){printf("OnNext: %ld\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::take ( AN...  an) const
inline

Emit only the first count items from this observable in the new observable that is returned.

Template Parameters
Count: the type of the items counter.
Parameters
t: the number of items to take.
Returns
An observable that emits only the first t items emitted by the source Observable, or all of the items from the source observable if that observable emits fewer than t items.
Sample Code
auto values = rxcpp::observable<>::range(1, 7).take(3);
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::take_last ( AN &&...  an) const
inline

Emit only the final t items emitted by the source Observable.

Template Parameters
Count: the type of the items counter.
Parameters
t: the number of last items to take.
Returns
An observable that emits only the last t items emitted by the source Observable, or all of the items from the source observable if that observable emits fewer than t items.
Sample Code
auto values = rxcpp::observable<>::range(1, 7).take_last(3);
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 5
OnNext: 6
OnNext: 7
OnCompleted
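Since the final items are only known once the source completes, nothing can be emitted earlier. The sketch below models this with a bounded std::deque; take_last_model is a hypothetical name and the sliding-window approach is an illustrative assumption, not a statement about RxCpp's implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical streaming model of take_last(count); not RxCpp code.
template <class T>
class take_last_model {
public:
    explicit take_last_model(std::size_t count) : count_(count) {}

    // Remember the item and forget the oldest one once the window is full.
    void on_next(const T& value) {
        window_.push_back(value);
        if (window_.size() > count_) window_.pop_front();
    }

    // Only at completion is it known which items were the last ones.
    std::vector<T> on_completed() const {
        return std::vector<T>(window_.begin(), window_.end());
    }

private:
    std::size_t count_;
    std::deque<T> window_;
};

inline void take_last_model_example() {
    take_last_model<int> op(3);
    for (int v = 1; v <= 7; ++v) op.on_next(v);
    const std::vector<int> expected{5, 6, 7};
    assert(op.on_completed() == expected);
}
```

If the source emits fewer than count items, on_completed returns all of them, which matches the Returns description above.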
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::take_until ( AN &&...  an) const
inline

Emit items from this observable in the new observable that is returned, until on_next occurs on the trigger observable or until the specified time. take_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination).

Template Parameters
TriggerSource: the type of the trigger observable.
TimePoint: the type of the time point.
Coordination: the type of the scheduler (optional).
Parameters
t: an observable whose first emitted item stops the resulting observable from emitting items from the source observable.
when: a time point at which the returned observable stops emitting items.
cn: the scheduler to use for scheduling the items (optional).
Returns
An observable that emits the items emitted by the source observable until the trigger observable emits an item or the time runs out.
Sample Code
auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7);
auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25));
auto values = source.take_until(trigger);
values.subscribe(
[](long v){printf("OnNext: %ld\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){
printf("[thread %s] Source emits, value = %ld\n", get_pid().c_str(), v);
return v;
});
auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)).map([](long v){
printf("[thread %s] Trigger emits, value = %ld\n", get_pid().c_str(), v);
return v;
});
auto values = source.take_until(trigger, rxcpp::observe_on_new_thread());
values.
as_blocking().
subscribe(
[](long v){printf("[thread %s] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481267428736] Source emits, value = 1
[thread 47481313687296] OnNext: 1
[thread 47481267428736] Source emits, value = 2
[thread 47481313687296] OnNext: 2
[thread 47481267428736] Source emits, value = 3
[thread 47481313687296] OnNext: 3
[thread 47481267428736] Trigger emits, value = 1
[thread 47481313687296] OnCompleted
[thread 47481267428736] Finish task
Sample Code
auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7);
auto values = source.take_until(std::chrono::steady_clock::now() + std::chrono::milliseconds(25));
values.subscribe(
[](long v){printf("OnNext: %ld\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){
printf("[thread %s] Source emits, value = %ld\n", get_pid().c_str(), v);
return v;
});
auto scheduler = rxcpp::observe_on_new_thread();
auto values = source.take_until(scheduler.now() + std::chrono::milliseconds(25), scheduler);
values.
as_blocking().
subscribe(
[](long v){printf("[thread %s] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481267428736] Source emits, value = 1
[thread 47481313687296] OnNext: 1
[thread 47481267428736] Source emits, value = 2
[thread 47481313687296] OnNext: 2
[thread 47481267428736] Source emits, value = 3
[thread 47481313687296] OnNext: 3
[thread 47481313687296] OnCompleted
[thread 47481267428736] Finish task
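The time-point form of take_until can be pictured with a small model in standard C++. This is a sketch under stated assumptions, not RxCpp code: take_until_model is a hypothetical name, each item carries the offset at which it would be emitted, and an item emitted exactly at the deadline is treated as too late.

```cpp
#include <cassert>
#include <chrono>
#include <utility>
#include <vector>

using ms = std::chrono::milliseconds;

// Hypothetical model: items are (emission offset, value) pairs in time order.
// Items emitted strictly before `deadline` pass through; the first item at or
// after the deadline ends the sequence (an assumption of this model).
std::vector<long> take_until_model(const std::vector<std::pair<ms, long>>& timed_items,
                                   ms deadline) {
    std::vector<long> out;
    for (const auto& item : timed_items) {
        if (item.first >= deadline) {
            break;
        }
        out.push_back(item.second);
    }
    return out;
}
```

For items at 0, 10, 20, 30 ms and a deadline of 25 ms the model yields the first three values, which is the shape of the output shown in the samples above.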
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::take_while ( AN &&...  an) const
inline

Emit items from this observable on the new observable that is returned for as long as they fulfill the predicate; stop at the first item that does not.

Template Parameters
Predicate: the type of the predicate.
Parameters
t: the predicate.
Returns
An observable that emits only the first items emitted by the source Observable fulfilling the predicate, or all of the items from the source observable if the predicate never returns false
Sample Code
auto values = rxcpp::observable<>::range(1, 8).
take_while([](int v){
return v <= 4;
});
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnCompleted
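The difference between take_while and a plain filter is that the first failing item ends the sequence for good. A minimal sketch in standard C++ (a model on a std::vector, not the RxCpp implementation; take_while_model is a hypothetical name) makes this visible:

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical model of take_while semantics on a finite sequence.
std::vector<int> take_while_model(const std::vector<int>& source,
                                  const std::function<bool(int)>& predicate) {
    std::vector<int> out;
    for (int v : source) {
        if (!predicate(v)) {
            break;  // the first failing item ends the output, later items are never examined
        }
        out.push_back(v);
    }
    return out;
}
```

Applied to 1, 2, 3, 4, 5, 1, 2 with the predicate v <= 4, the model returns 1, 2, 3, 4; the trailing 1 and 2 satisfy the predicate but come after the first failure.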
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::tap ( AN &&...  an) const
inline

Inspect calls to on_next, on_error and on_completed.

Template Parameters
MakeObserverArgN...: these args are passed to make_observer.
Parameters
an: these args are passed to make_observer.
Returns
Observable that emits the same items as the source observable to both the subscriber and the observer.
Note
If an on_error method is not supplied, the observer will ignore errors rather than call std::terminate().
Sample Code
auto values = rxcpp::observable<>::range(1, 3).
tap(
[](int v){printf("Tap - OnNext: %d\n", v);},
[](){printf("Tap - OnCompleted\n");});
values.subscribe(
[](int v){printf("Subscribe - OnNext: %d\n", v);},
[](){printf("Subscribe - OnCompleted\n");});
Tap - OnNext: 1
Subscribe - OnNext: 1
Tap - OnNext: 2
Subscribe - OnNext: 2
Tap - OnNext: 3
Subscribe - OnNext: 3
Tap - OnCompleted
Subscribe - OnCompleted
If the source observable generates an error, the observer passed to tap is called:
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
tap(
[](int v){printf("Tap - OnNext: %d\n", v);},
[](std::exception_ptr ep){
printf("Tap - OnError: %s\n", rxu::what(ep).c_str());
},
[](){printf("Tap - OnCompleted\n");});
values.subscribe(
[](int v){printf("Subscribe - OnNext: %d\n", v);},
[](std::exception_ptr ep){
printf("Subscribe - OnError: %s\n", rxu::what(ep).c_str());
},
[](){printf("Subscribe - OnCompleted\n");});
Tap - OnNext: 1
Subscribe - OnNext: 1
Tap - OnNext: 2
Subscribe - OnNext: 2
Tap - OnNext: 3
Subscribe - OnNext: 3
Tap - OnError: Error from source
Subscribe - OnError: Error from source
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::time_interval ( AN &&...  an) const
inline

Returns an observable that emits the amount of time elapsed between consecutive emissions of the source observable. The first emission from this new observable is the time elapsed between the moment the observer subscribed and the moment the source observable emitted its first item.

Template Parameters
Coordination: the type of the scheduler.
Parameters
coordination: the scheduler for time intervals.
Returns
Observable that emits a time_duration to indicate the amount of time elapsed between pairs of emissions.
Sample Code
typedef rxcpp::schedulers::scheduler::clock_type::time_point::duration duration_type;
using namespace std::chrono;
auto values = rxcpp::observable<>::interval(milliseconds(100))
.time_interval()
.take(3);
values.subscribe(
[&](duration_type v) {
long long int ms = duration_cast<milliseconds>(v).count();
printf("OnNext: @%lldms\n", ms);
},
[](std::exception_ptr ep) {
try {
std::rethrow_exception(ep);
} catch (const std::exception& ex) {
printf("OnError: %s\n", ex.what());
}
},
[]() { printf("OnCompleted\n"); });
OnNext: @0ms
OnNext: @100ms
OnNext: @100ms
OnCompleted
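The durations that time_interval reports are differences between consecutive emission times, with the subscription time as the first reference point. A minimal sketch in standard C++ (a model with hypothetical names, not RxCpp code) of that arithmetic:

```cpp
#include <cassert>
#include <chrono>
#include <vector>

using ms = std::chrono::milliseconds;

// Hypothetical model: `emitted_at` holds emission offsets in time order.
// The first gap is measured from `subscribed_at`, later gaps from the
// previous emission.
std::vector<ms> time_interval_model(ms subscribed_at, const std::vector<ms>& emitted_at) {
    std::vector<ms> gaps;
    ms previous = subscribed_at;
    for (ms t : emitted_at) {
        gaps.push_back(t - previous);
        previous = t;
    }
    return gaps;
}
```

For a subscription at 0 ms and emissions at 0, 100 and 200 ms the model gives 0, 100, 100, matching the sample output above.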
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::timeout ( AN &&...  an) const
inline

Return an observable that terminates with timeout_error if a particular timespan has passed without emitting another item from the source observable.

Template Parameters
Duration: the type of time interval.
Coordination: the type of the scheduler (optional).
Parameters
period: the period of time to wait for another item from the source observable.
coordination: the scheduler to manage timeout for each event (optional).
Returns
Observable that terminates with an error if a particular timespan has passed without emitting another item from the source observable.
Sample Code
using namespace std::chrono;
auto values = rxcpp::observable<>::interval(milliseconds(100))
.take(3)
.concat(rxcpp::observable<>::interval(milliseconds(500)))
.timeout(milliseconds(200));
values.subscribe(
[](long v) { printf("OnNext: %ld\n", v); },
[](std::exception_ptr ep) {
try {
std::rethrow_exception(ep);
} catch (const rxcpp::timeout_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[]() { printf("OnCompleted\n"); });
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 1
OnError: timeout has occurred
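The rule behind the sample output can be sketched as a gap check over emission times. The code below is a model in standard C++ under stated assumptions, not the RxCpp implementation: timeout_model and TimeoutResult are hypothetical names, the first gap is measured from a subscription at 0 ms, and a gap exactly equal to the period is treated as still in time.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <vector>

using ms = std::chrono::milliseconds;

// Result of the model: how many items were delivered, and whether the
// sequence ended because a gap exceeded the period.
struct TimeoutResult {
    std::size_t delivered;
    bool timed_out;
};

// Hypothetical model: `emitted_at` holds emission offsets in time order.
TimeoutResult timeout_model(const std::vector<ms>& emitted_at, ms period) {
    ms previous{0};  // assumed subscription time
    std::size_t delivered = 0;
    for (ms t : emitted_at) {
        if (t - previous > period) {
            return TimeoutResult{delivered, true};  // the wait for this item was too long
        }
        previous = t;
        ++delivered;
    }
    return TimeoutResult{delivered, false};
}
```

With emission offsets 0, 100, 200, 200 and 700 ms (chosen to resemble the sample: three items 100 ms apart, a fourth right after, then a 500 ms gap) and a period of 200 ms, the model delivers four items and then reports a timeout.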
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::timestamp ( AN &&...  an) const
inline

Returns an observable that attaches a timestamp to each item emitted by the source observable indicating when it was emitted.

Template Parameters
Coordination: the type of the scheduler (optional).
Parameters
coordination: the scheduler whose clock is used to timestamp each item (optional).
Returns
Observable that emits a pair: { item emitted by the source observable, time_point representing the current value of the clock }.
Sample Code
typedef rxcpp::schedulers::scheduler::clock_type::time_point time_point;
using namespace std::chrono;
auto values = rxcpp::observable<>::interval(milliseconds(100))
.timestamp()
.take(3);
time_point start = rxcpp::identity_current_thread().now();
values.subscribe(
[&](std::pair<long, time_point> v) {
long long int ms = duration_cast<milliseconds>(v.second - start).count();
printf("OnNext: %ld @%lldms\n", v.first, ms);
},
[](std::exception_ptr ep) {
try {
std::rethrow_exception(ep);
} catch (const std::exception& ex) {
printf("OnError: %s\n", ex.what());
}
},
[]() { printf("OnCompleted\n"); });
OnNext: 1 @0ms
OnNext: 2 @100ms
OnNext: 3 @200ms
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::transform ( AN &&...  an) const
inline

For each item from this observable use Selector to produce an item to emit from the new observable that is returned.

Template Parameters
Selector: the type of the transforming function.
Parameters
s: the selector function.
Returns
Observable that emits the items from the source observable, transformed by the specified function.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).
map([](int v){
return 2 * v;
});
values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 2
OnNext: 4
OnNext: 6
OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::window ( AN &&...  an) const
inline

Return an observable that emits connected, non-overlapping windows, each containing at most count items from the source observable. If the skip parameter is set, return an observable that emits windows every skip items containing at most count items from the source observable.

Parameters
count: the maximum size of each window before it should be completed.
skip: how many items need to be skipped before starting a new window.
Returns
Observable that emits connected, non-overlapping windows, each containing at most count items from the source observable. If the skip parameter is set, return an Observable that emits windows every skip items containing at most count items from the source observable.
Sample Code
int counter = 0;
auto values = rxcpp::observable<>::range(1, 7).window(2, 3);
values.subscribe(
[&counter](rxcpp::observable<int> v){
int id = counter++;
printf("[window %d] Create window\n", id);
v.subscribe(
[id](int v){printf("[window %d] OnNext: %d\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 0] OnCompleted
[window 1] Create window
[window 1] OnNext: 4
[window 1] OnNext: 5
[window 1] OnCompleted
[window 2] Create window
[window 2] OnNext: 7
[window 2] OnCompleted
Sample Code
int counter = 0;
auto values = rxcpp::observable<>::range(1, 5).window(2);
values.subscribe(
[&counter](rxcpp::observable<int> v){
int id = counter++;
printf("[window %d] Create window\n", id);
v.subscribe(
[id](int v){printf("[window %d] OnNext: %d\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 0] OnCompleted
[window 1] Create window
[window 1] OnNext: 3
[window 1] OnNext: 4
[window 1] OnCompleted
[window 2] Create window
[window 2] OnNext: 5
[window 2] OnCompleted
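How count and skip interact can be shown on a finite sequence. The following is a minimal sketch in standard C++ that models the grouping on a std::vector; it is not the RxCpp implementation, window_model is a hypothetical name, and library edge cases such as trailing empty windows are not modeled.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical model of window(count, skip) on a finite sequence:
// a new window starts every `skip` items and holds at most `count` items.
std::vector<std::vector<int>> window_model(const std::vector<int>& source,
                                           std::size_t count, std::size_t skip) {
    std::vector<std::vector<int>> windows;
    if (count == 0 || skip == 0) {
        return windows;  // degenerate arguments are not modeled
    }
    for (std::size_t start = 0; start < source.size(); start += skip) {
        const std::size_t end = std::min(start + count, source.size());
        windows.emplace_back(source.begin() + static_cast<std::ptrdiff_t>(start),
                             source.begin() + static_cast<std::ptrdiff_t>(end));
    }
    return windows;
}
```

For 1..7 with count 2 and skip 3 the model produces {1, 2}, {4, 5}, {7}; for 1..5 with count 2 and skip 2 it produces {1, 2}, {3, 4}, {5}. Both agree with the sample outputs above.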
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::window_toggle ( AN &&...  an) const
inline

Return an observable that emits windows of items from this observable, on the specified scheduler. Each value from the opens observable opens a new window; a window is closed by the first value from the observable that the closes function returns for it.

Template Parameters
Openings: observable<OT>
ClosingSelector: a function of type observable<CT>(OT)
Coordination: the type of the scheduler (optional).
Parameters
opens: each value from this observable opens a new window.
closes: this function is called for each opened window and returns an observable. The first value from the returned observable will close the window.
coordination: the scheduler for the windows (optional).
Returns
Observable that emits an observable for each opened window.
Sample Code
int counter = 0;
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
window_toggle(
rxcpp::observable<>::interval(std::chrono::milliseconds(4)),
[](long){
return rxcpp::observable<>::interval(std::chrono::milliseconds(4)).skip(1);
},
rxcpp::observe_on_new_thread());
values.
as_blocking().
subscribe(
[&counter](rxcpp::observable<long> v){
int id = counter++;
printf("[window %d] Create window\n", id);
v.subscribe(
[id](long v){printf("[window %d] OnNext: %ld\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 1] Create window
[window 0] OnCompleted
[window 1] OnNext: 3
[window 1] OnNext: 4
[window 2] Create window
[window 1] OnCompleted
[window 2] OnNext: 5
[window 2] OnNext: 6
[window 3] Create window
[window 2] OnCompleted
[window 3] OnNext: 7
[window 3] OnCompleted
Sample Code
int counter = 0;
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
window_toggle(
rxcpp::observable<>::interval(std::chrono::milliseconds(4)),
[](long){
return rxcpp::observable<>::interval(std::chrono::milliseconds(4)).skip(1);
});
values.subscribe(
[&counter](rxcpp::observable<long> v){
int id = counter++;
printf("[window %d] Create window\n", id);
v.subscribe(
[id](long v){printf("[window %d] OnNext: %ld\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 1] Create window
[window 0] OnCompleted
[window 1] OnNext: 3
[window 1] OnNext: 4
[window 2] Create window
[window 1] OnCompleted
[window 2] OnNext: 5
[window 2] OnNext: 6
[window 3] Create window
[window 2] OnCompleted
[window 3] OnNext: 7
[window 3] OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::window_with_time ( AN &&...  an) const
inline

Return an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler. If the skip parameter is set, return an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.

Template Parameters
Duration: the type of time intervals.
Coordination: the type of the scheduler (optional).
Parameters
period: the period of time each window collects items before it is completed.
skip: the period of time after which a new window will be created.
coordination: the scheduler for the windows (optional).
Returns
Observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable. If the skip parameter is set, return an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable.
Sample Code
int counter = 0;
auto period = std::chrono::milliseconds(4);
auto skip = std::chrono::milliseconds(6);
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
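take(7).
window_with_time(period, skip, rxcpp::observe_on_new_thread());
values.
as_blocking().
subscribe(
[&counter](rxcpp::observable<long> v){
int id = counter++;
printf("[window %d] Create window\n", id);
v.subscribe(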
[id](long v){printf("[window %d] OnNext: %ld\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 0] OnCompleted
[window 1] Create window
[window 1] OnNext: 4
[window 1] OnNext: 5
[window 1] OnCompleted
[window 2] Create window
[window 2] OnNext: 7
[window 2] OnCompleted
Sample Code
int counter = 0;
auto period = std::chrono::milliseconds(4);
auto skip = std::chrono::milliseconds(6);
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
window_with_time(period, skip);
values.subscribe(
[&counter](rxcpp::observable<long> v){
int id = counter++;
printf("[window %d] Create window\n", id);
v.subscribe(
[id](long v){printf("[window %d] OnNext: %ld\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 0] OnCompleted
[window 1] Create window
[window 1] OnNext: 4
[window 1] OnNext: 5
[window 1] OnCompleted
[window 2] Create window
[window 2] OnNext: 7
[window 2] OnCompleted
Sample Code
int counter = 0;
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
window_with_time(std::chrono::milliseconds(4), rxcpp::observe_on_new_thread());
values.
as_blocking().
subscribe(
[&counter](rxcpp::observable<long> v){
int id = counter++;
printf("[window %d] Create window\n", id);
v.subscribe(
[id](long v){printf("[window %d] OnNext: %ld\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 1] Create window
[window 0] OnCompleted
[window 1] OnNext: 3
[window 1] OnNext: 4
[window 2] Create window
[window 1] OnCompleted
[window 2] OnNext: 5
[window 2] OnNext: 6
[window 3] Create window
[window 2] OnCompleted
[window 3] OnNext: 7
[window 3] OnCompleted
Sample Code
int counter = 0;
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
window_with_time(std::chrono::milliseconds(4));
values.subscribe(
[&counter](rxcpp::observable<long> v){
int id = counter++;
printf("[window %d] Create window\n", id);
v.subscribe(
[id](long v){printf("[window %d] OnNext: %ld\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 1] Create window
[window 0] OnCompleted
[window 1] OnNext: 3
[window 1] OnNext: 4
[window 2] Create window
[window 1] OnCompleted
[window 2] OnNext: 5
[window 2] OnNext: 6
[window 3] Create window
[window 2] OnCompleted
[window 3] OnNext: 7
[window 3] OnCompleted
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::window_with_time_or_count ( AN &&...  an) const
inline

Return an observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first), on the specified scheduler.

Template Parameters
Duration: the type of time intervals.
Coordination: the type of the scheduler (optional).
Parameters
period: the period of time each window collects items before it is completed and replaced with a new window.
count: the maximum size of each window before it is completed and a new window is created.
coordination: the scheduler for the windows (optional).
Returns
Observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first).
Sample Code
int counter = 0;
auto int1 = rxcpp::observable<>::range(1L, 3L);
auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
auto values = int1.
concat(int2).
window_with_time_or_count(std::chrono::milliseconds(20), 2, rxcpp::observe_on_event_loop());
values.
as_blocking().
subscribe(
[&counter](rxcpp::observable<long> v){
int id = counter++;
printf("[window %d] Create window\n", id);
v.subscribe(
[id](long v){printf("[window %d] OnNext: %ld\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 0] OnCompleted
[window 1] Create window
[window 1] OnNext: 3
[window 1] OnCompleted
[window 2] Create window
[window 2] OnCompleted
[window 3] Create window
[window 3] OnNext: 1
[window 3] OnCompleted
Sample Code
int counter = 0;
auto int1 = rxcpp::observable<>::range(1L, 3L);
auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
auto values = int1.
concat(int2).
window_with_time_or_count(std::chrono::milliseconds(20), 2);
values.subscribe(
[&counter](rxcpp::observable<long> v){
int id = counter++;
printf("[window %d] Create window\n", id);
v.subscribe(
[id](long v){printf("[window %d] OnNext: %ld\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 0] OnCompleted
[window 1] Create window
[window 1] OnNext: 3
[window 1] OnCompleted
[window 2] Create window
[window 2] OnCompleted
[window 3] Create window
[window 3] OnNext: 1
[window 3] OnCompleted
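The "whichever occurs first" rule, including the empty window in the sample output, can be reproduced with a small model. This is a sketch in standard C++ under stated assumptions, not RxCpp code: the names are hypothetical, the first window opens at 0 ms, a window that fills up is replaced immediately with a fresh period, and the window that is open when the input ends is always reported.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical item of the model: emission offset in milliseconds and value.
struct TimedItem {
    long time_ms;
    long value;
};

// Hypothetical model of window_with_time_or_count on items in time order.
std::vector<std::vector<long>> window_time_or_count_model(
        const std::vector<TimedItem>& items, long period_ms, std::size_t count) {
    std::vector<std::vector<long>> closed;
    if (period_ms <= 0 || count == 0) {
        return closed;  // degenerate arguments are not modeled
    }
    std::vector<long> current;
    long opened_at = 0;  // the first window opens at subscription
    for (const TimedItem& item : items) {
        // Close every window whose period elapsed before this item arrived.
        while (item.time_ms >= opened_at + period_ms) {
            closed.push_back(current);
            current.clear();
            opened_at += period_ms;
        }
        current.push_back(item.value);
        if (current.size() == count) {  // capacity reached first
            closed.push_back(current);
            current.clear();
            opened_at = item.time_ms;
        }
    }
    closed.push_back(current);  // the input ended, so the open window ends too
    return closed;
}
```

Fed with 1, 2, 3 at 0 ms and 1 at 50 ms, a period of 20 ms and a count of 2, the model yields {1, 2}, {3}, {} and {1}: the first window closes by count, the second by time, the third stays empty, and the last ends with the input.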
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::with_latest_from ( AN...  an) const
inline

For each item from the first observable select the latest value from all the observables to emit from the new observable that is returned.

Template Parameters
AN: types of scheduler (optional), aggregate function (optional), and source observables
Parameters
an: scheduler (optional), aggregation function (optional), and source observables
Returns
Observable that emits items that are the result of combining the items emitted by the source observables.

If scheduler is omitted, identity_current_thread is used.

If aggregation function is omitted, the resulting observable returns tuples of emitted items.

Sample Code

Neither scheduler nor aggregation function is present:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
auto values = o1.with_latest_from(o2, o3);
values.
take(5).
subscribe(
[](std::tuple<int, int, int> v){printf("OnNext: %d, %d, %d\n", std::get<0>(v), std::get<1>(v), std::get<2>(v));},
[](){printf("OnCompleted\n");});
OnNext: 2, 1, 1
OnNext: 3, 2, 1
OnNext: 4, 2, 2
OnNext: 5, 3, 2
OnNext: 6, 4, 2
OnCompleted

Only scheduler is present:

printf("[thread %s] Start task\n", get_pid().c_str());
auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)).map([](int v) {
printf("[thread %s] Source1 OnNext: %d\n", get_pid().c_str(), v);
return v;
});
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)).map([](int v) {
printf("[thread %s] Source2 OnNext: %d\n", get_pid().c_str(), v);
return v;
});
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5)).map([](int v) {
printf("[thread %s] Source3 OnNext: %d\n", get_pid().c_str(), v);
return v;
});
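auto thr = rxcpp::synchronize_event_loop(); // assumption: the listing does not show how thr is defined
auto values = o1.with_latest_from(thr, o2, o3);
values.
take(5).
as_blocking().
subscribe(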
[](std::tuple<int, int, int> v){printf("[thread %s] OnNext: %d, %d, %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v), std::get<2>(v));},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481267428736] Source1 OnNext: 1
[thread 47481267428736] Source2 OnNext: 1
[thread 47481267428736] Source3 OnNext: 1
[thread 47481267428736] Source1 OnNext: 2
[thread 47481294776064] OnNext: 2, 1, 1
[thread 47481267428736] Source2 OnNext: 2
[thread 47481267428736] Source1 OnNext: 3
[thread 47481294776064] OnNext: 3, 2, 1
[thread 47481267428736] Source3 OnNext: 2
[thread 47481267428736] Source1 OnNext: 4
[thread 47481267428736] Source2 OnNext: 3
[thread 47481294776064] OnNext: 4, 2, 2
[thread 47481267428736] Source1 OnNext: 5
[thread 47481294776064] OnNext: 5, 3, 2
[thread 47481267428736] Source2 OnNext: 4
[thread 47481267428736] Source1 OnNext: 6
[thread 47481267428736] Source3 OnNext: 3
[thread 47481294776064] OnNext: 6, 4, 2
[thread 47481294776064] OnCompleted
[thread 47481267428736] Finish task

Only aggregation function is present:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
auto values = o1.with_latest_from(
[](int v1, int v2, int v3) {
return 100 * v1 + 10 * v2 + v3;
},
o2, o3);
values.
take(5).
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 211
OnNext: 321
OnNext: 422
OnNext: 532
OnNext: 642
OnCompleted

Both scheduler and aggregation function are present:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
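auto values = o1.with_latest_from(
rxcpp::observe_on_new_thread(), // assumption: the coordination of the original sample is not shown in this listing
[](int v1, int v2, int v3) {
return 100 * v1 + 10 * v2 + v3;
},
o2, o3);
values.
take(5).
as_blocking().
subscribe(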
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 211
OnNext: 321
OnNext: 422
OnNext: 532
OnNext: 642
OnCompleted
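The sample outputs above start at 2 rather than 1 because the first item of the primary source arrives before the other sources have produced anything. A minimal sketch in standard C++ reproduces this; it is a model with hypothetical names, not the RxCpp implementation, and it assumes that each interval emits 1 at 0 ms and that simultaneous emissions are processed in source order.

```cpp
#include <algorithm>
#include <array>
#include <cassert>
#include <cstddef>
#include <vector>

// One emission of the model: when it happens, which source (0 = primary), its value.
struct Emission {
    int time_ms;
    int source;
    int value;
};

// Hypothetical helper: emits 1, 2, 3, ... at 0, period, 2 * period, ... (period_ms > 0).
std::vector<Emission> interval_emissions(int source, int period_ms, int horizon_ms) {
    std::vector<Emission> out;
    int value = 1;
    for (int t = 0; t <= horizon_ms; t += period_ms) {
        out.push_back(Emission{t, source, value++});
    }
    return out;
}

// Hypothetical model for one primary source (0) and two secondary sources (1, 2).
std::vector<std::array<int, 3>> with_latest_from_model(std::vector<Emission> emissions) {
    // Assumed ordering: by time, ties broken by source index.
    std::stable_sort(emissions.begin(), emissions.end(),
        [](const Emission& a, const Emission& b) {
            return a.time_ms != b.time_ms ? a.time_ms < b.time_ms : a.source < b.source;
        });
    std::array<int, 3> latest = {{0, 0, 0}};
    std::array<bool, 3> seen = {{false, false, false}};
    std::vector<std::array<int, 3>> out;
    for (const Emission& e : emissions) {
        const std::size_t i = static_cast<std::size_t>(e.source);
        latest[i] = e.value;
        seen[i] = true;
        // Only the primary source triggers output, and only once every
        // secondary source has produced at least one value.
        if (e.source == 0 && seen[1] && seen[2]) {
            out.push_back(latest);
        }
    }
    return out;
}
```

With periods of 2, 3 and 5 ms over the first 10 ms, the model yields (2, 1, 1), (3, 2, 1), (4, 2, 2), (5, 3, 2), (6, 4, 2), the same tuples as the first sample.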
template<class T, class SourceOperator>
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::zip ( AN &&...  an) const
inline

Take one item at a time from each of the given observables and select a value to emit from the new observable that is returned.

Template Parameters
AN: types of scheduler (optional), aggregate function (optional), and source observables
Parameters
an: scheduler (optional), aggregation function (optional), and source observables
Returns
Observable that emits the result of combining items taken one at a time, in order, from each of the source observables.

If scheduler is omitted, identity_current_thread is used.

If aggregation function is omitted, the resulting observable returns tuples of emitted items.

Sample Code

Neither scheduler nor aggregation function is present:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
auto values = o1.zip(o2, o3);
values.
take(3).
subscribe(
[](std::tuple<int, int, int> v){printf("OnNext: %d, %d, %d\n", std::get<0>(v), std::get<1>(v), std::get<2>(v));},
[](){printf("OnCompleted\n");});
OnNext: 1, 1, 1
OnNext: 2, 2, 2
OnNext: 3, 3, 3
OnCompleted

Only scheduler is present:

printf("[thread %s] Start task\n", get_pid().c_str());
auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1)).map([](int v) {
printf("[thread %s] Source1 OnNext: %d\n", get_pid().c_str(), v);
return v;
});
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)).map([](int v) {
printf("[thread %s] Source2 OnNext: %d\n", get_pid().c_str(), v);
return v;
});
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)).map([](int v) {
printf("[thread %s] Source3 OnNext: %d\n", get_pid().c_str(), v);
return v;
});
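auto thr = rxcpp::synchronize_event_loop(); // assumption: the listing does not show how thr is defined
auto values = o1.zip(thr, o2, o3);
values.
take(3).
as_blocking().
subscribe(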
[](std::tuple<int, int, int> v){printf("[thread %s] OnNext: %d, %d, %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v), std::get<2>(v));},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481267428736] Source1 OnNext: 1
[thread 47481267428736] Source2 OnNext: 1
[thread 47481267428736] Source3 OnNext: 1
[thread 47481298978560] OnNext: 1, 1, 1
[thread 47481267428736] Source1 OnNext: 2
[thread 47481267428736] Source1 OnNext: 3
[thread 47481267428736] Source2 OnNext: 2
[thread 47481267428736] Source1 OnNext: 4
[thread 47481267428736] Source3 OnNext: 2
[thread 47481298978560] OnNext: 2, 2, 2
[thread 47481267428736] Source1 OnNext: 5
[thread 47481267428736] Source2 OnNext: 3
[thread 47481267428736] Source1 OnNext: 6
[thread 47481267428736] Source1 OnNext: 7
[thread 47481267428736] Source2 OnNext: 4
[thread 47481267428736] Source3 OnNext: 3
[thread 47481298978560] OnNext: 3, 3, 3
[thread 47481298978560] OnCompleted
[thread 47481267428736] Finish task

Only aggregation function is present:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
auto values = o1 | rxcpp::operators::zip(
[](int v1, int v2, int v3) {
return 100 * v1 + 10 * v2 + v3;
},
o2, o3);
values.
take(3).
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 111
OnNext: 222
OnNext: 333
OnCompleted

Both scheduler and aggregation function are present:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
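auto values = o1.zip(
rxcpp::observe_on_new_thread(), // assumption: the coordination of the original sample is not shown in this listing
[](int v1, int v2, int v3) {
return 100 * v1 + 10 * v2 + v3;
},
o2, o3);
values.
take(3).
as_blocking().
subscribe(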
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 111
OnNext: 222
OnNext: 333
OnCompleted
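The threaded output above shows the fastest source emitting up to 7 before the third tuple appears, because items from faster sources wait until every source has a matching item. A minimal sketch in standard C++ models this with one queue per source; the names are hypothetical, this is not the RxCpp implementation, and it assumes that each interval emits 1 at 0 ms and that simultaneous emissions are processed in source order.

```cpp
#include <algorithm>
#include <array>
#include <cassert>
#include <cstddef>
#include <queue>
#include <vector>

// One emission of the model: when it happens, which source (0, 1 or 2), its value.
struct Emission {
    int time_ms;
    int source;
    int value;
};

// Hypothetical helper: emits 1, 2, 3, ... at 0, period, 2 * period, ... (period_ms > 0).
std::vector<Emission> interval_emissions(int source, int period_ms, int horizon_ms) {
    std::vector<Emission> out;
    int value = 1;
    for (int t = 0; t <= horizon_ms; t += period_ms) {
        out.push_back(Emission{t, source, value++});
    }
    return out;
}

// Hypothetical model of zip over three sources.
std::vector<std::array<int, 3>> zip_model(std::vector<Emission> emissions) {
    // Assumed ordering: by time, ties broken by source index.
    std::stable_sort(emissions.begin(), emissions.end(),
        [](const Emission& a, const Emission& b) {
            return a.time_ms != b.time_ms ? a.time_ms < b.time_ms : a.source < b.source;
        });
    std::array<std::queue<int>, 3> pending;  // items waiting for partners
    std::vector<std::array<int, 3>> out;
    for (const Emission& e : emissions) {
        pending[static_cast<std::size_t>(e.source)].push(e.value);
        if (!pending[0].empty() && !pending[1].empty() && !pending[2].empty()) {
            // Every source has an unmatched item: combine the oldest of each.
            std::array<int, 3> row = {{pending[0].front(), pending[1].front(), pending[2].front()}};
            for (std::queue<int>& q : pending) {
                q.pop();
            }
            out.push_back(row);
        }
    }
    return out;
}
```

With periods of 1, 2 and 3 ms over the first 6 ms, the model yields (1, 1, 1), (2, 2, 2), (3, 3, 3); the remaining items of the faster sources stay queued.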

Friends And Related Function Documentation

template<class T, class SourceOperator>
template<class U , class SO >
friend class observable
friend
template<class T, class SourceOperator>
template<class U , class SO >
bool operator== ( const observable< U, SO > &  ,
const observable< U, SO > &   
)
friend

Member Data Documentation

template<class T, class SourceOperator>
source_operator_type rxcpp::observable< T, SourceOperator >::source_operator
mutable

The documentation for this class was generated from the following file: