RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
Classes | Functions | Variables
rxcpp::operators Namespace Reference

Classes

struct  is_operator
 
struct  operator_base
 
struct  tag_operator
 

Functions

template<class... AN>
auto all (AN &&...an) -> operator_factory< all_tag, AN... >
 Returns an Observable that emits true if every item emitted by the source Observable satisfies a specified condition, otherwise false. Emits true if the source Observable terminates without emitting any item. More...
 
template<class... AN>
auto is_empty (AN &&...an) -> operator_factory< is_empty_tag, AN... >
 Returns an Observable that emits true if the source Observable is empty, otherwise false. More...
 
template<class... AN>
auto amb (AN &&...an) -> operator_factory< amb_tag, AN... >
 For each item from only the first of the given observables deliver from the new observable that is returned, on the specified scheduler. More...
 
template<class... AN>
auto any (AN &&...an) -> operator_factory< any_tag, AN... >
 Returns an Observable that emits true if any item emitted by the source Observable satisfies a specified condition, otherwise false. Emits false if the source Observable terminates without emitting any item. More...
 
template<class... AN>
auto exists (AN &&...an) -> operator_factory< exists_tag, AN... >
 Returns an Observable that emits true if any item emitted by the source Observable satisfies a specified condition, otherwise false. Emits false if the source Observable terminates without emitting any item. More...
 
template<class... AN>
auto contains (AN &&...an) -> operator_factory< contains_tag, AN... >
 Returns an Observable that emits true if the source Observable emitted a specified item, otherwise false. Emits false if the source Observable terminates without emitting any item. More...
 
template<class... AN>
auto buffer (AN &&...an) -> operator_factory< buffer_count_tag, AN... >
 Return an observable that emits connected, non-overlapping buffer, each containing at most count items from the source observable. If the skip parameter is set, return an observable that emits buffers every skip items containing at most count items from the source observable. More...
 
template<class... AN>
auto buffer_with_time (AN &&...an) -> operator_factory< buffer_with_time_tag, AN... >
 Return an observable that emits buffers every period time interval and collects items from this observable for period of time into each produced buffer. If the skip parameter is set, Return an observable that emits buffers every skip time interval and collects items from this observable for period of time into each produced buffer, on the specified scheduler. More...
 
template<class... AN>
auto buffer_with_time_or_count (AN &&...an) -> operator_factory< buffer_with_time_or_count_tag, AN... >
 Return an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first), on the specified scheduler. More...
 
template<class... AN>
auto combine_latest (AN &&...an) -> operator_factory< combine_latest_tag, AN... >
 For each item from all of the observables select a value to emit from the new observable that is returned. More...
 
template<class... AN>
auto concat (AN &&...an) -> operator_factory< concat_tag, AN... >
 For each item from this observable subscribe to one at a time, in the order received. For each item from all of the given observables deliver from the new observable that is returned. More...
 
template<class... AN>
auto concat_map (AN &&...an) -> operator_factory< concat_map_tag, AN... >
 For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. More...
 
template<class... AN>
auto concat_transform (AN &&...an) -> operator_factory< concat_map_tag, AN... >
 For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. More...
 
template<class... AN>
auto connect_forever (AN &&...an) -> operator_factory< connect_forever_tag, AN... >
 takes a connectable_observable source and calls connect during the construction of the expression. This means that the source starts running without any subscribers and continues running after all subscriptions have been unsubscribed. More...
 
template<class... AN>
auto debounce (AN &&...an) -> operator_factory< debounce_tag, AN... >
 Return an observable that emits an item if a particular timespan has passed without emitting another item from the source observable. More...
 
template<class... AN>
auto delay (AN &&...an) -> operator_factory< delay_tag, AN... >
 Return an observable that emits each item emitted by the source observable after the specified delay. More...
 
template<class... AN>
auto distinct (AN &&...an) -> operator_factory< distinct_tag, AN... >
 For each item from this observable, filter out repeated values and emit only items that have not already been emitted. More...
 
template<class... AN>
auto distinct_until_changed (AN &&...an) -> operator_factory< distinct_until_changed_tag, AN... >
 For each item from this observable, filter out consequentially repeated values and emit only changes from the new observable that is returned. More...
 
template<class... AN>
auto element_at (AN &&...an) -> operator_factory< element_at_tag, AN... >
 Pulls an item located at a specified index location in the sequence of items and emits that item as its own sole emission. More...
 
template<class... AN>
auto filter (AN &&...an) -> operator_factory< filter_tag, AN... >
 For each item from this observable use Predicate to select which items to emit from the new observable that is returned. More...
 
template<class... AN>
auto finally (AN &&...an) -> operator_factory< final ly_tag
 Add a new action at the end of the new observable that is returned. More...
 
template<class... AN>
auto flat_map (AN &&...an) -> operator_factory< flat_map_tag, AN... >
 For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. More...
 
template<class... AN>
auto merge_transform (AN &&...an) -> operator_factory< flat_map_tag, AN... >
 For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. More...
 
template<class... AN>
auto group_by (AN &&...an) -> operator_factory< group_by_tag, AN... >
 Return an observable that emits grouped_observables, each of which corresponds to a unique key value and each of which emits those items from the source observable that share that key value. More...
 
template<class... AN>
auto ignore_elements (AN &&...an) -> operator_factory< ignore_elements_tag, AN... >
 Do not emit any items from the source Observable, but allow termination notification (either onError or onCompleted) to pass through unchanged. More...
 
template<class ResultType , class Operator >
auto lift (Operator &&op) -> detail::lift_factory< ResultType, Operator >
 
template<class... AN>
auto map (AN &&...an) -> operator_factory< map_tag, AN... >
 For each item from this observable use Selector to produce an item to emit from the new observable that is returned. More...
 
template<class... AN>
auto transform (AN &&...an) -> operator_factory< map_tag, AN... >
 For each item from this observable use Selector to produce an item to emit from the new observable that is returned. More...
 
template<class... AN>
auto merge (AN &&...an) -> operator_factory< merge_tag, AN... >
 For each given observable subscribe. For each item emitted from all of the given observables, deliver from the new observable that is returned. More...
 
template<class... AN>
auto multicast (AN &&...an) -> operator_factory< multicast_tag, AN... >
 allows connections to the source to be independent of subscriptions. More...
 
template<class... AN>
auto observe_on (AN &&...an) -> operator_factory< observe_on_tag, AN... >
 All values are queued and delivered using the scheduler from the supplied coordination. More...
 
template<class... AN>
auto on_error_resume_next (AN &&...an) -> operator_factory< on_error_resume_next_tag, AN... >
 If an error occurs, take the result from the Selector and subscribe to that instead. More...
 
template<class... AN>
auto switch_on_error (AN &&...an) -> operator_factory< on_error_resume_next_tag, AN... >
 If an error occurs, take the result from the Selector and subscribe to that instead. More...
 
template<class... AN>
auto pairwise (AN &&...an) -> operator_factory< pairwise_tag, AN... >
 Take values pairwise from this observable. More...
 
template<class... AN>
auto publish (AN &&...an) -> operator_factory< publish_tag, AN... >
 Turn a cold observable hot and allow connections to the source to be independent of subscriptions. Turn a cold observable hot, send the most recent value to any new subscriber, and allow connections to the source to be independent of subscriptions. More...
 
template<class... AN>
auto publish_synchronized (AN &&...an) -> operator_factory< publish_synchronized_tag, AN... >
 Turn a cold observable hot and allow connections to the source to be independent of subscriptions. More...
 
template<class... AN>
auto reduce (AN &&...an) -> operator_factory< reduce_tag, AN... >
 For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned. More...
 
template<class... AN>
auto accumulate (AN &&...an) -> operator_factory< reduce_tag, AN... >
 For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned. More...
 
auto first () -> operator_factory< first_tag >
 For each item from this observable reduce it by sending only the first item. More...
 
auto last () -> operator_factory< last_tag >
 For each item from this observable reduce it by sending only the last item. More...
 
auto count () -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
 For each item from this observable reduce it by incrementing a count. More...
 
auto average () -> operator_factory< average_tag >
 For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end. More...
 
auto sum () -> operator_factory< sum_tag >
 For each item from this observable reduce it by adding to the previous items. More...
 
auto min () -> operator_factory< min_tag >
 For each item from this observable reduce it by taking the min value of the previous items. More...
 
auto max () -> operator_factory< max_tag >
 For each item from this observable reduce it by taking the max value of the previous items. More...
 
template<class... AN>
auto ref_count (AN &&...an) -> operator_factory< ref_count_tag, AN... >
 takes a connectable_observable source and uses a ref_count of the subscribers to control the connection to the published source. The first subscription will cause a call to connect() and the last unsubscribe will unsubscribe the connection. More...
 
template<class... AN>
auto repeat (AN &&...an) -> operator_factory< repeat_tag, AN... >
 Repeat this observable for the given number of times or infinitely. More...
 
template<class... AN>
auto replay (AN &&...an) -> operator_factory< replay_tag, AN... >
 1) replay(optional Coordination, optional CompositeSubscription) Turn a cold observable hot, send all earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions. More...
 
template<class... AN>
auto retry (AN &&...an) -> operator_factory< retry_tag, AN... >
 Retry this observable for the given number of times. More...
 
template<class... AN>
auto sample_with_time (AN &&...an) -> operator_factory< sample_with_time_tag, AN... >
 Return an Observable that emits the most recent items emitted by the source Observable within periodic time intervals. More...
 
template<class... AN>
auto scan (AN &&...an) -> operator_factory< scan_tag, AN... >
 For each item from this observable use Accumulator to combine items into a value that will be emitted from the new observable that is returned. More...
 
template<class... AN>
auto sequence_equal (AN &&...an) -> operator_factory< sequence_equal_tag, AN... >
 Determine whether two Observables emit the same sequence of items. More...
 
template<class... AN>
auto skip (AN &&...an) -> operator_factory< skip_tag, AN... >
 Make new observable with skipped first count items from this observable. More...
 
template<class... AN>
auto skip_last (AN &&...an) -> operator_factory< skip_last_tag, AN... >
 Make new observable with skipped last count items from this observable. More...
 
template<class... AN>
auto skip_until (AN &&...an) -> operator_factory< skip_until_tag, AN... >
 Make new observable with items skipped until on_next occurs on the trigger observable or until the specified time. skip_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination) More...
 
template<class... AN>
auto start_with (AN &&...an) -> operator_factory< start_with_tag, AN... >
 Start with the supplied values, then concatenate this observable. More...
 
template<class T , class... ArgN>
auto subscribe (ArgN &&...an) -> detail::subscribe_factory< decltype(make_subscriber< T >(std::forward< ArgN >(an)...))>
 Subscribe will cause the source observable to emit values to the provided subscriber. More...
 
auto as_dynamic () -> detail::dynamic_factory
 
auto as_blocking () -> detail::blocking_factory
 
template<class... AN>
auto subscribe_on (AN &&...an) -> operator_factory< subscribe_on_tag, AN... >
 Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordination. More...
 
template<class... AN>
auto switch_if_empty (AN &&...an) -> operator_factory< switch_if_empty_tag, AN... >
 If the source Observable terminates without emitting any items, emits items from a backup Observable. More...
 
template<class... AN>
auto default_if_empty (AN &&...an) -> operator_factory< default_if_empty_tag, AN... >
 If the source Observable terminates without emitting any items, emits a default item and completes. More...
 
template<class... AN>
auto switch_on_next (AN &&...an) -> operator_factory< switch_on_next_tag, AN... >
 Return observable that emits the items emitted by the observable most recently emitted by the source observable. More...
 
template<class... AN>
auto take (AN &&...an) -> operator_factory< take_tag, AN... >
 For the first count items from this observable emit them from the new observable that is returned. More...
 
template<class... AN>
auto take_last (AN &&...an) -> operator_factory< take_last_tag, AN... >
 Emit only the final t items emitted by the source Observable. More...
 
template<class... AN>
auto take_until (AN &&...an) -> operator_factory< take_until_tag, AN... >
 For each item from this observable until on_next occurs on the trigger observable or until the specified time, emit them from the new observable that is returned. take_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination) More...
 
template<class... AN>
auto take_while (AN &&...an) -> operator_factory< take_while_tag, AN... >
 For the first items fulfilling the predicate from this observable emit them from the new observable that is returned. More...
 
template<class... AN>
auto tap (AN &&...an) -> operator_factory< tap_tag, AN... >
 inspect calls to on_next, on_error and on_completed. More...
 
template<class... AN>
auto time_interval (AN &&...an) -> operator_factory< time_interval_tag, AN... >
 Returns an observable that emits indications of the amount of time lapsed between consecutive emissions of the source observable. The first emission from this new Observable indicates the amount of time lapsed between the time when the observer subscribed to the Observable and the time when the source Observable emitted its first item. More...
 
template<class... AN>
auto timeout (AN &&...an) -> operator_factory< timeout_tag, AN... >
 Return an observable that terminates with timeout_error if a particular timespan has passed without emitting another item from the source observable. More...
 
template<class... AN>
auto timestamp (AN &&...an) -> operator_factory< timestamp_tag, AN... >
 Returns an observable that attaches a timestamp to each item emitted by the source observable indicating when it was emitted. More...
 
template<class... AN>
auto window (AN &&...an) -> operator_factory< window_tag, AN... >
 Return an observable that emits connected, non-overlapping windows, each containing at most count items from the source observable. If the skip parameter is set, return an observable that emits windows every skip items containing at most count items from the source observable. More...
 
template<class... AN>
auto window_with_time (AN &&...an) -> operator_factory< window_with_time_tag, AN... >
 Return an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler. If the skip parameter is set, return an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler. More...
 
template<class... AN>
auto window_with_time_or_count (AN &&...an) -> operator_factory< window_with_time_or_count_tag, AN... >
 Return an observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first), on the specified scheduler. More...
 
template<class... AN>
auto window_toggle (AN &&...an) -> operator_factory< window_toggle_tag, AN... >
 Return an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler. More...
 
template<class... AN>
auto with_latest_from (AN &&...an) -> operator_factory< with_latest_from_tag, AN... >
 For each item from the first observable select the latest value from all the observables to emit from the new observable that is returned. More...
 
template<class... AN>
auto zip (AN &&...an) -> operator_factory< zip_tag, AN... >
 Bring by one item from all given observables and select a value to emit from the new observable that is returned. More...
 

Variables

auto AN
 

Function Documentation

template<class... AN>
auto rxcpp::operators::accumulate ( AN &&...  an) -> operator_factory<reduce_tag, AN...>

For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned.

Template Parameters
Seedthe type of the initial value for the accumulator
Accumulatorthe type of the data accumulating function
ResultSelectorthe type of the result producing function
Parameters
seedthe initial value for the accumulator
aan accumulator function to be invoked on each item emitted by the source observable, the result of which will be used in the next accumulator call
rsa result producing function that makes the final value from the last accumulator call result
Returns
An observable that emits a single item that is the result of accumulating the output from the items emitted by the source observable.

Some basic reduce-type operators have already been implemented:

Sample Code
Geometric mean of source values:
auto values = rxcpp::observable<>::range(1, 7).
std::make_pair(0, 1.0),
[](std::pair<int, double> seed, int v){
seed.first += 1;
seed.second *= v;
return seed;
},
[](std::pair<int, double> res){
return std::pow(res.second, 1.0 / res.first);
});
values.
[](double v){printf("OnNext: %lf\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 3.380015
OnCompleted
If the source observable completes without emitting any items, the resulting observable emits the result of passing the initial seed to the result selector:
auto values = rxcpp::observable<>::empty<int>().
1,
[](int,int){return 0;},
[](int res){return res;});
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnCompleted
If the accumulator raises an exception, it is returned by the resulting observable in on_error:
auto values = rxcpp::observable<>::range(1, 3).
0,
[](int seed, int v){
if (v == 2)
throw std::runtime_error("Exception from accumulator");
return seed;
},
[](int res){return res;});
values.
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::exception& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Exception from accumulator
The same for exceptions raised by the result selector:
auto values = rxcpp::observable<>::range(1, 3).
0,
[](int seed, int v){return seed + v;},
[](int res){
throw std::runtime_error("Exception from result selector");
return res;
});
values.
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::exception& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Exception from result selector
template<class... AN>
auto rxcpp::operators::all ( AN &&...  an) -> operator_factory<all_tag, AN...>

Returns an Observable that emits true if every item emitted by the source Observable satisfies a specified condition, otherwise false. Emits true if the source Observable terminates without emitting any item.

Template Parameters
Predicatethe type of the test function.
Parameters
pthe test function to test items emitted by the source Observable.
Returns
Observable that emits true if every item emitted by the source observable satisfies a specified condition, otherwise false.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).all([](int n) { return n < 6; });
values.
[](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
[]() { printf("OnCompleted\n"); });
OnNext: true
OnCompleted
template<class... AN>
auto rxcpp::operators::amb ( AN &&...  an) -> operator_factory<amb_tag, AN...>

For each item from only the first of the given observables deliver from the new observable that is returned, on the specified scheduler.

There are 2 variants of the operator:

  • The source observable emits nested observables, one of the nested observables is selected.
  • The source observable and the arguments v0...vn are used to provide the observables to select from.
Template Parameters
Coordinationthe type of the scheduler (optional).
Value0... (optional).
ValueNtypes of source observables (optional).
Parameters
cnthe scheduler to synchronize sources from different contexts (optional).
v0... (optional).
vnsource observables (optional).
Returns
Observable that emits the same sequence as whichever of the source observables first emitted an item or sent a termination notification.

If scheduler is omitted, identity_current_thread is used.

Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {
printf("[thread %s] Timer1 fired\n", get_pid().c_str());
return 1;
});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
printf("[thread %s] Timer2 fired\n", get_pid().c_str());
return 2;
});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {
printf("[thread %s] Timer3 fired\n", get_pid().c_str());
return 3;
});
auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3);
auto values = base.amb(rxcpp::observe_on_new_thread());
values.
[](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481294776064] Timer3 fired
[thread 47481294776064] OnNext: 3
[thread 47481294776064] OnCompleted
[thread 47481267428736] Finish task
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3);
auto values = base.amb();
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 3
OnCompleted
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
auto values = o1.amb(o2, o3);
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 3
OnCompleted
printf("[thread %s] Start task\n", get_pid().c_str());
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {
printf("[thread %s] Timer1 fired\n", get_pid().c_str());
return 1;
});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
printf("[thread %s] Timer2 fired\n", get_pid().c_str());
return 2;
});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {
printf("[thread %s] Timer3 fired\n", get_pid().c_str());
return 3;
});
auto values = o1.amb(rxcpp::observe_on_new_thread(), o2, o3);
values.
[](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481294776064] Timer3 fired
[thread 47481294776064] OnNext: 3
[thread 47481294776064] OnCompleted
[thread 47481267428736] Finish task
template<class... AN>
auto rxcpp::operators::any ( AN &&...  an) -> operator_factory<any_tag, AN...>

Returns an Observable that emits true if any item emitted by the source Observable satisfies a specified condition, otherwise false. Emits false if the source Observable terminates without emitting any item.

Template Parameters
Predicatethe type of the test function.
Parameters
pthe test function to test items emitted by the source Observable.
Returns
An observable that emits true if any item emitted by the source observable satisfies a specified condition, otherwise false.

Some basic any- operators have already been implemented:

Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).exists([](int n) { return n > 3; });
values.
[](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
[]() { printf("OnCompleted\n"); });
OnNext: true
OnCompleted
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).contains(3);
values.
[](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
[]() { printf("OnCompleted\n"); });
OnNext: true
OnCompleted
auto rxcpp::operators::as_blocking ( ) -> detail::blocking_factory
inline

Return a new observable that contains the blocking methods for this observable.

Returns
An observable that contains the blocking methods for this observable.
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto values = rxcpp::observable<>::from(rxcpp::observe_on_new_thread(), 1, 2, 3).map([](int v){
printf("[thread %s] Emit value: %d\n", get_pid().c_str(), v);
return v;
});
values.
[](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481303181056] Emit value: 1
[thread 47481303181056] OnNext: 1
[thread 47481303181056] Emit value: 2
[thread 47481303181056] OnNext: 2
[thread 47481303181056] Emit value: 3
[thread 47481303181056] OnNext: 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
auto rxcpp::operators::as_dynamic ( ) -> detail::dynamic_factory
inline

Return a new observable that performs type-forgetting conversion of this observable.

Returns
The source observable converted to observable<T>.
Note
This operator could be useful to workaround lambda deduction bug on msvc 2013.
Sample Code
auto o1 = rxcpp::observable<>::range(1, 3);
auto o3 = rxcpp::observable<>::empty<int>();
auto values = o1.concat(o2, o3);
printf("type of o1: %s\n", typeid(o1).name());
printf("type of o1.as_dynamic(): %s\n", typeid(o1.as_dynamic()).name());
printf("type of o2: %s\n", typeid(o2).name());
printf("type of o2.as_dynamic(): %s\n", typeid(o2.as_dynamic()).name());
printf("type of o3: %s\n", typeid(o3).name());
printf("type of o3.as_dynamic(): %s\n", typeid(o3.as_dynamic()).name());
printf("type of values: %s\n", typeid(values).name());
printf("type of values.as_dynamic(): %s\n", typeid(values.as_dynamic()).name());
type of o1: N5rxcpp10observableIiNS_7sources6detail5rangeIiNS_19identity_one_workerEEEEE
type of o1.as_dynamic(): N5rxcpp10observableIiNS_18dynamic_observableIiEEEE
type of o2: N5rxcpp10observableIiNS_7sources6detail7iterateINSt3__15arrayIiLm1EEENS_19identity_one_workerEEEEE
type of o2.as_dynamic(): N5rxcpp10observableIiNS_18dynamic_observableIiEEEE
type of o3: N5rxcpp10observableIiNS_7sources6detail7iterateINSt3__15arrayIiLm0EEENS_19identity_one_workerEEEEE
type of o3.as_dynamic(): N5rxcpp10observableIiNS_18dynamic_observableIiEEEE
type of values: N5rxcpp10observableIiNS_9operators6detail6concatINS0_IiNS_18dynamic_observableIiEEEENS0_IS6_NS4_IS6_EEEENS_19identity_one_workerEEEEE
type of values.as_dynamic(): N5rxcpp10observableIiNS_18dynamic_observableIiEEEE
auto rxcpp::operators::average ( ) -> operator_factory<average_tag>
inline

For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end.

Returns
An observable that emits a single item: the average of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 4).average();
values.
[](double v){printf("OnNext: %lf\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 2.500000
OnCompleted
When the source observable completes without emitting any items:
auto values = rxcpp::observable<>::empty<int>().average();
values.
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: average() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 4).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
values.
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::runtime_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Error from source
template<class... AN>
auto rxcpp::operators::buffer ( AN &&...  an) -> operator_factory<buffer_count_tag, AN...>

Return an observable that emits connected, non-overlapping buffer, each containing at most count items from the source observable. If the skip parameter is set, return an observable that emits buffers every skip items containing at most count items from the source observable.

Parameters
countthe maximum size of each buffers before it should be emitted.
skiphow many items need to be skipped before starting a new buffers (optional).
Returns
Observable that emits connected, non-overlapping buffers, each containing at most count items from the source observable. If the skip parameter is set, return an Observable that emits buffers every skip items containing at most count items from the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 5).buffer(2);
values.
[](std::vector<int> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](int a){
printf(" %d", a);
});
printf("\n");
},
[](){printf("OnCompleted\n");});
OnNext: 1 2
OnNext: 3 4
OnNext: 5
OnCompleted
Sample Code
auto values = rxcpp::observable<>::range(1, 7).buffer(2, 3);
values.
[](std::vector<int> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](int a){
printf(" %d", a);
});
printf("\n");
},
[](){printf("OnCompleted\n");});
OnNext: 1 2
OnNext: 4 5
OnNext: 7
OnCompleted
template<class... AN>
auto rxcpp::operators::buffer_with_time ( AN &&...  an) -> operator_factory<buffer_with_time_tag, AN...>

Return an observable that emits buffers every period time interval and collects items from this observable for period of time into each produced buffer. If the skip parameter is set, Return an observable that emits buffers every skip time interval and collects items from this observable for period of time into each produced buffer, on the specified scheduler.

Template Parameters
Durationthe type of the time interval
Coordinationthe type of the scheduler (optional).
Parameters
periodthe period of time each buffer collects items before it is emitted.
skipthe period of time after which a new buffer will be created (optional).
coordinationthe scheduler for the buffers (optional).
Returns
Observable that emits buffers every period time interval and collect items from this observable for period of time into each produced buffer. If the skip parameter is set, return an Observable that emits buffers every skip time interval and collect items from this observable for period of time into each produced buffer.
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto period = std::chrono::milliseconds(4);
auto skip = std::chrono::milliseconds(6);
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
map([](long v){
printf("[thread %s] Interval OnNext: %ld\n", get_pid().c_str(), v);
return v;
}).
take(7).
values.
[](std::vector<long> v){
printf("[thread %s] OnNext:", get_pid().c_str());
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
});
printf("\n");
},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481267428736] Interval OnNext: 1
[thread 47481267428736] Interval OnNext: 2
[thread 47481294776064] OnNext: 1 2
[thread 47481267428736] Interval OnNext: 3
[thread 47481267428736] Interval OnNext: 4
[thread 47481267428736] Interval OnNext: 5
[thread 47481294776064] OnNext: 4 5
[thread 47481267428736] Interval OnNext: 6
[thread 47481267428736] Interval OnNext: 7
[thread 47481294776064] OnNext: 7
[thread 47481294776064] OnCompleted
[thread 47481267428736] Finish task
Sample Code
auto period = std::chrono::milliseconds(4);
auto skip = std::chrono::milliseconds(6);
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
values.
[](std::vector<long> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
});
printf("\n");
},
[](){printf("OnCompleted\n");});
OnNext: 1 2
OnNext: 4 5
OnNext: 7
OnCompleted
Overlapping buffers are allowed:
auto period = std::chrono::milliseconds(6);
auto skip = std::chrono::milliseconds(4);
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
values.
[](std::vector<long> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
});
printf("\n");
},
[](){printf("OnCompleted\n");});
OnNext: 1 2 3
OnNext: 3 4 5
OnNext: 5 6 7
OnNext: 7
OnCompleted
If no items are emitted, an empty buffer is returned:
auto period = std::chrono::milliseconds(2);
auto skip = std::chrono::milliseconds(4);
auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).
values.
[](std::vector<long> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
});
printf("\n");
},
[](){printf("OnCompleted\n");});
OnNext:
OnNext:
OnNext: 1
OnCompleted
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
buffer_with_time(std::chrono::milliseconds(4), rxcpp::observe_on_new_thread());
values.
[](std::vector<long> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
});
printf("\n");
},
[](){printf("OnCompleted\n");});
OnNext: 1 2
OnNext: 3 4
OnNext: 5 6
OnNext: 7
OnCompleted
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
buffer_with_time(std::chrono::milliseconds(4));
values.
[](std::vector<long> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
});
printf("\n");
},
[](){printf("OnCompleted\n");});
OnNext: 1 2
OnNext: 3 4
OnNext: 5 6
OnNext: 7
OnCompleted
template<class... AN>
auto rxcpp::operators::buffer_with_time_or_count ( AN &&...  an) -> operator_factory<buffer_with_time_or_count_tag, AN...>

Return an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first), on the specified scheduler.

Template Parameters
Durationthe type of the time interval.
Coordinationthe type of the scheduler (optional).
Parameters
periodthe period of time each buffer collects items before it is emitted and replaced with a new buffer.
countthe maximum size of each buffer before it is emitted and new buffer is created.
coordinationthe scheduler for the buffers (optional).
Returns
Observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first).
Sample Code
auto start = std::chrono::steady_clock::now();
auto int1 = rxcpp::observable<>::range(1L, 3L);
auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
auto values = int1.
concat(int2).
buffer_with_time_or_count(std::chrono::milliseconds(20), 2, rxcpp::observe_on_event_loop());
values.
[start](std::vector<long> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
});
printf("\n");
},
[](){printf("OnCompleted\n");});
OnNext: 1 2
OnNext: 3
OnNext:
OnNext: 1
OnCompleted
Sample Code
auto start = std::chrono::steady_clock::now();
auto int1 = rxcpp::observable<>::range(1L, 3L);
auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
auto values = int1.
concat(int2).
buffer_with_time_or_count(std::chrono::milliseconds(20), 2);
values.
[start](std::vector<long> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
});
printf("\n");
},
[](){printf("OnCompleted\n");});
OnNext: 1 2
OnNext: 3
OnNext:
OnNext: 1
OnCompleted
template<class... AN>
auto rxcpp::operators::combine_latest ( AN &&...  an) -> operator_factory<combine_latest_tag, AN...>

For each item from all of the observables select a value to emit from the new observable that is returned.

Template Parameters
ANtypes of scheduler (optional), aggregate function (optional), and source observables
Parameters
anscheduler (optional), aggregation function (optional), and source observables
Returns
Observable that emits items that are the result of combining the items emitted by the source observables.

If scheduler is omitted, identity_current_thread is used.

If aggregation function is omitted, the resulting observable returns tuples of emitted items.

Sample Code

Neither scheduler nor aggregation function are present:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
auto values = o1.combine_latest(o2, o3);
values.
take(5).
[](std::tuple<int, int, int> v){printf("OnNext: %d, %d, %d\n", std::get<0>(v), std::get<1>(v), std::get<2>(v));},
[](){printf("OnCompleted\n");});
OnNext: 1, 1, 1
OnNext: 2, 1, 1
OnNext: 2, 2, 1
OnNext: 3, 2, 1
OnNext: 3, 2, 2
OnCompleted

Only scheduler is present:

printf("[thread %s] Start task\n", get_pid().c_str());
auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)).map([](int v) {
printf("[thread %s] Source1 OnNext: %d\n", get_pid().c_str(), v);
return v;
});
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)).map([](int v) {
printf("[thread %s] Source2 OnNext: %d\n", get_pid().c_str(), v);
return v;
});
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5)).map([](int v) {
printf("[thread %s] Source3 OnNext: %d\n", get_pid().c_str(), v);
return v;
});
auto values = o1.combine_latest(thr, o2, o3);
values.
take(5).
[](std::tuple<int, int, int> v){printf("[thread %s] OnNext: %d, %d, %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v), std::get<2>(v));},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481267428736] Source1 OnNext: 1
[thread 47481267428736] Source2 OnNext: 1
[thread 47481267428736] Source3 OnNext: 1
[thread 47481298978560] OnNext: 1, 1, 1
[thread 47481267428736] Source1 OnNext: 2
[thread 47481298978560] OnNext: 2, 1, 1
[thread 47481267428736] Source2 OnNext: 2
[thread 47481298978560] OnNext: 2, 2, 1
[thread 47481267428736] Source1 OnNext: 3
[thread 47481298978560] OnNext: 3, 2, 1
[thread 47481267428736] Source3 OnNext: 2
[thread 47481298978560] OnNext: 3, 2, 2
[thread 47481298978560] OnCompleted
[thread 47481267428736] Finish task

Only aggregation function is present:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
auto values = o1.combine_latest(
[](int v1, int v2, int v3) {
return 100 * v1 + 10 * v2 + v3;
},
o2, o3);
values.
take(5).
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 111
OnNext: 211
OnNext: 221
OnNext: 321
OnNext: 322
OnCompleted

Both scheduler and aggregation function are present:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
auto values = o1.combine_latest(
[](int v1, int v2, int v3) {
return 100 * v1 + 10 * v2 + v3;
},
o2, o3);
values.
take(5).
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 111
OnNext: 211
OnNext: 221
OnNext: 321
OnNext: 322
OnCompleted
template<class... AN>
auto rxcpp::operators::concat ( AN &&...  an) -> operator_factory<concat_tag, AN...>

For each item from this observable subscribe to one at a time, in the order received. For each item from all of the given observables deliver from the new observable that is returned.

There are 2 variants of the operator:

  • The source observable emits nested observables, nested observables are concatenated.
  • The source observable and the arguments v0...vn are used to provide the observables to concatenate.
Template Parameters
Coordinationthe type of the scheduler (optional).
Value0... (optional).
ValueNtypes of source observables (optional).
Parameters
cnthe scheduler to synchronize sources from different contexts (optional).
v0... (optional).
vnsource observables (optional).
Returns
Observable that emits the items emitted by each of the Observables emitted by the source observable, one after the other, without interleaving them.
Sample Code
auto o1 = rxcpp::observable<>::range(1, 3);
auto o3 = rxcpp::observable<>::from(5, 6);
auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3);
auto values = base.concat();
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnNext: 5
OnNext: 6
OnCompleted
Sample Code
auto o1 = rxcpp::observable<>::range(1, 3);
auto o3 = rxcpp::observable<>::from(5, 6);
auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3);
auto values = base.concat(rxcpp::observe_on_new_thread());
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnNext: 5
OnNext: 6
OnCompleted
Sample Code
auto o1 = rxcpp::observable<>::range(1, 3);
auto o3 = rxcpp::observable<>::from(5, 6);
auto values = o1.concat(o2, o3);
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnNext: 5
OnNext: 6
OnCompleted
Sample Code
auto o1 = rxcpp::observable<>::range(1, 3);
auto o3 = rxcpp::observable<>::from(5, 6);
auto values = o1.concat(rxcpp::observe_on_new_thread(), o2, o3);
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnNext: 5
OnNext: 6
OnCompleted
template<class... AN>
auto rxcpp::operators::concat_map ( AN &&...  an) -> operator_factory<concat_map_tag, AN...>

For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.

Template Parameters
CollectionSelectorthe type of the observable producing function. CollectionSelector must be a function with the signature: observable(concat_map::source_value_type)
ResultSelectorthe type of the aggregation function (optional). ResultSelector must be a function with the signature: concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type)
Coordinationthe type of the scheduler (optional).
Parameters
sa function that returns an observable for each item emitted by the source observable.
rsa function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional).
cnthe scheduler to synchronize sources from different contexts. (optional).
Returns
Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.

Observables, produced by the CollectionSelector, are concatenated. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but merges the observables.

Sample Code
auto values = rxcpp::observable<>::range(1, 3).
[](int v){
return
rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
take(3);
},
[](int v_main, long v_sub){
return std::make_tuple(v_main, v_sub);
});
values.
[](std::tuple<int, long> v){printf("OnNext: %d - %ld\n", std::get<0>(v), std::get<1>(v));},
[](){printf("OnCompleted\n");});
OnNext: 1 - 1
OnNext: 1 - 2
OnNext: 1 - 3
OnNext: 2 - 1
OnNext: 2 - 2
OnNext: 2 - 3
OnNext: 3 - 1
OnNext: 3 - 2
OnNext: 3 - 3
OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto values = rxcpp::observable<>::range(1, 3).
[](int v){
printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v);
return
rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
take(3);
},
[](int v_main, long v_sub){
printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %ld)\n", get_pid().c_str(), v_main, v_sub);
return std::make_tuple(v_main, v_sub);
},
values.
[](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %ld\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481303181056] Call CollectionSelector(v = 1)
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 1)
[thread 47481303181056] OnNext: 1 - 1
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 2)
[thread 47481303181056] OnNext: 1 - 2
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 3)
[thread 47481303181056] OnNext: 1 - 3
[thread 47481303181056] Call CollectionSelector(v = 2)
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 1)
[thread 47481303181056] OnNext: 2 - 1
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 2)
[thread 47481303181056] OnNext: 2 - 2
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 3)
[thread 47481303181056] OnNext: 2 - 3
[thread 47481303181056] Call CollectionSelector(v = 3)
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 1)
[thread 47481303181056] OnNext: 3 - 1
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 2)
[thread 47481303181056] OnNext: 3 - 2
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 3)
[thread 47481303181056] OnNext: 3 - 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
template<class... AN>
auto rxcpp::operators::concat_transform ( AN &&...  an) -> operator_factory<concat_map_tag, AN...>

For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.

Template Parameters
CollectionSelectorthe type of the observable producing function. CollectionSelector must be a function with the signature: observable(concat_map::source_value_type)
ResultSelectorthe type of the aggregation function (optional). ResultSelector must be a function with the signature: concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type)
Coordinationthe type of the scheduler (optional).
Parameters
sa function that returns an observable for each item emitted by the source observable.
rsa function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional).
cnthe scheduler to synchronize sources from different contexts. (optional).
Returns
Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.

Observables, produced by the CollectionSelector, are concatenated. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but merges the observables.

Sample Code
auto values = rxcpp::observable<>::range(1, 3).
[](int v){
return
rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
take(3);
},
[](int v_main, long v_sub){
return std::make_tuple(v_main, v_sub);
});
values.
[](std::tuple<int, long> v){printf("OnNext: %d - %ld\n", std::get<0>(v), std::get<1>(v));},
[](){printf("OnCompleted\n");});
OnNext: 1 - 1
OnNext: 1 - 2
OnNext: 1 - 3
OnNext: 2 - 1
OnNext: 2 - 2
OnNext: 2 - 3
OnNext: 3 - 1
OnNext: 3 - 2
OnNext: 3 - 3
OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto values = rxcpp::observable<>::range(1, 3).
[](int v){
printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v);
return
rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
take(3);
},
[](int v_main, long v_sub){
printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %ld)\n", get_pid().c_str(), v_main, v_sub);
return std::make_tuple(v_main, v_sub);
},
values.
[](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %ld\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481303181056] Call CollectionSelector(v = 1)
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 1)
[thread 47481303181056] OnNext: 1 - 1
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 2)
[thread 47481303181056] OnNext: 1 - 2
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 3)
[thread 47481303181056] OnNext: 1 - 3
[thread 47481303181056] Call CollectionSelector(v = 2)
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 1)
[thread 47481303181056] OnNext: 2 - 1
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 2)
[thread 47481303181056] OnNext: 2 - 2
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 3)
[thread 47481303181056] OnNext: 2 - 3
[thread 47481303181056] Call CollectionSelector(v = 3)
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 1)
[thread 47481303181056] OnNext: 3 - 1
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 2)
[thread 47481303181056] OnNext: 3 - 2
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 3)
[thread 47481303181056] OnNext: 3 - 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
template<class... AN>
auto rxcpp::operators::connect_forever ( AN &&...  an) -> operator_factory<connect_forever_tag, AN...>

takes a connectable_observable source and calls connect during the construction of the expression. This means that the source starts running without any subscribers and continues running after all subscriptions have been unsubscribed.

Returns
An observable that emitting the items from its source.
template<class... AN>
auto rxcpp::operators::contains ( AN &&...  an) -> operator_factory<contains_tag, AN...>

Returns an Observable that emits true if the source Observable emitted a specified item, otherwise false. Emits false if the source Observable terminates without emitting any item.

Template Parameters
Tthe type of the item to search for.
Parameters
valuethe item to search for.
Returns
An observable that emits true if the source Observable emitted a specified item, otherwise false.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).contains(3);
values.
[](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
[]() { printf("OnCompleted\n"); });
OnNext: true
OnCompleted
auto rxcpp::operators::count ( ) -> operator_factory<reduce_tag, int, rxu::count, rxu::detail::take_at<0>>
inline

For each item from this observable reduce it by incrementing a count.

Returns
An observable that emits a single item: the number of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).count();
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 3
OnCompleted
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
count();
values.
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::runtime_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Error from source
template<class... AN>
auto rxcpp::operators::debounce ( AN &&...  an) -> operator_factory<debounce_tag, AN...>

Return an observable that emits an item if a particular timespan has passed without emitting another item from the source observable.

Template Parameters
Durationthe type of the time interval
Coordinationthe type of the scheduler
Parameters
periodthe period of time to suppress any emitted items
coordinationthe scheduler to manage timeout for each event
Returns
Observable that emits an item if a particular timespan has passed without emitting another item from the source observable.
Sample Code
using namespace std::chrono;
auto scheduler = rxcpp::identity_current_thread();
auto start = scheduler.now();
auto period = milliseconds(10);
auto values = rxcpp::observable<>::interval(start, period, scheduler).
take(4).
debounce(period);
values.
[](long v) { printf("OnNext: %ld\n", v); },
[]() { printf("OnCompleted\n"); });
OnNext: 1
OnNext: 2
OnNext: 4
OnCompleted
template<class... AN>
auto rxcpp::operators::default_if_empty ( AN &&...  an) -> operator_factory<default_if_empty_tag, AN...>

If the source Observable terminates without emitting any items, emits a default item and completes.

Template Parameters
Valuethe type of the value to emit.
Parameters
vthe default value to emit.
Returns
Observable that emits the specified default item if the source observable is empty.
Sample Code
auto values = rxcpp::observable<>::empty<int>()
values.subscribe(
[](int v) { printf("OnNext: %d\n", v); },
[]() { printf("OnCompleted\n"); } );
OnNext: 42
OnCompleted
template<class... AN>
auto rxcpp::operators::delay ( AN &&...  an) -> operator_factory<delay_tag, AN...>

Return an observable that emits each item emitted by the source observable after the specified delay.

Template Parameters
Durationthe type of time interval
Coordinationthe type of the scheduler
Parameters
periodthe period of time each item is delayed
coordinationthe scheduler for the delays
Returns
Observable that emits each item emitted by the source observable after the specified delay.
Sample Code
using namespace std::chrono;
auto scheduler = rxcpp::identity_current_thread();
auto start = scheduler.now();
auto period = milliseconds(10);
const auto next = [=](const char* s) {
return [=](long v){
auto t = duration_cast<milliseconds>(scheduler.now() - start);
long long int ms = t.count();
printf("[%s @ %lld] OnNext: %ld\n", s, ms, v);
};
};
auto values = rxcpp::observable<>::interval(start, period, scheduler).
take(4).
tap(next("interval")).
values.
next(" delayed"),
[](){printf("OnCompleted\n");});
[interval @ 0] OnNext: 1
[interval @ 10] OnNext: 2
[ delayed @ 10] OnNext: 1
[interval @ 20] OnNext: 3
[ delayed @ 20] OnNext: 2
[interval @ 30] OnNext: 4
[ delayed @ 30] OnNext: 3
[ delayed @ 40] OnNext: 4
OnCompleted
template<class... AN>
auto rxcpp::operators::distinct ( AN &&...  an) -> operator_factory<distinct_tag, AN...>

For each item from this observable, filter out repeated values and emit only items that have not already been emitted.

Returns
Observable that emits those items from the source observable that are distinct.
Note
istinct keeps an unordered_set<T> of past values. Due to an issue in multiple implementations of std::hash<T>, rxcpp maintains a whitelist of hashable types. new types can be added by specializing rxcpp::filtered_hash<T>
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 2, 3, 3, 3, 4, 5, 5).distinct();
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnNext: 5
OnCompleted
template<class... AN>
auto rxcpp::operators::distinct_until_changed ( AN &&...  an) -> operator_factory<distinct_until_changed_tag, AN...>

For each item from this observable, filter out consequentially repeated values and emit only changes from the new observable that is returned.

Template Parameters
BinaryPredicate(optional) the type of the value comparing function. The signature should be equivalent to the following: bool pred(const T1& a, const T2& b);
Parameters
pred(optional) the function that implements comparison of two values.
Returns
Observable that emits those items from the source observable that are distinct from their immediate predecessors.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 2, 3, 3, 3, 4, 5, 5).distinct_until_changed();
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnNext: 5
OnCompleted
template<class... AN>
auto rxcpp::operators::element_at ( AN &&...  an) -> operator_factory<element_at_tag, AN...>

Pulls an item located at a specified index location in the sequence of items and emits that item as its own sole emission.

Parameters
indexthe index of the element to return.
Returns
An observable that emit an item located at a specified index location.
Sample Code
auto values = rxcpp::observable<>::range(1, 7).element_at(3);
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 4
OnCompleted
template<class... AN>
auto rxcpp::operators::exists ( AN &&...  an) -> operator_factory<exists_tag, AN...>

Returns an Observable that emits true if any item emitted by the source Observable satisfies a specified condition, otherwise false. Emits false if the source Observable terminates without emitting any item.

Template Parameters
Predicatethe type of the test function.
Parameters
pthe test function to test items emitted by the source Observable.
Returns
An observable that emits true if any item emitted by the source observable satisfies a specified condition, otherwise false.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).exists([](int n) { return n > 3; });
values.
[](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
[]() { printf("OnCompleted\n"); });
OnNext: true
OnCompleted
template<class... AN>
auto rxcpp::operators::filter ( AN &&...  an) -> operator_factory<filter_tag, AN...>

For each item from this observable use Predicate to select which items to emit from the new observable that is returned.

Template Parameters
Predicatethe type of the filter function
Parameters
pthe filter function
Returns
Observable that emits only those items emitted by the source observable that the filter evaluates as true.
Sample Code
auto values = rxcpp::observable<>::range(1, 6).
filter([](int v){
return v % 2;
});
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 3
OnNext: 5
OnCompleted
template<class... AN>
auto rxcpp::operators::finally ( AN &&...  an) -> operator_factory<
final

Add a new action at the end of the new observable that is returned.

Template Parameters
LastCallthe type of the action function
Parameters
lcthe action function
Returns
Observable that emits the same items as the source observable, then invokes the given action.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).
finally([](){
printf("The final action\n");
});
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnCompleted
The final action
If the source observable generates an error, the final action is still being called:
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
finally([](){
printf("The final action\n");
});
values.
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::exception& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnError: Error from source
The final action
auto rxcpp::operators::first ( ) -> operator_factory<first_tag>
inline

For each item from this observable reduce it by sending only the first item.

Returns
An observable that emits only the very first item emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).first();
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnCompleted
When the source observable calls on_error:
auto values = rxcpp::observable<>::empty<int>().first();
values.
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: first() requires a stream with at least one value
template<class... AN>
auto rxcpp::operators::flat_map ( AN &&...  an) -> operator_factory<flat_map_tag, AN...>

For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.

Template Parameters
CollectionSelectorthe type of the observable producing function. CollectionSelector must be a function with the signature observable(flat_map::source_value_type)
ResultSelectorthe type of the aggregation function (optional). ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type).
Coordinationthe type of the scheduler (optional).
Parameters
sa function that returns an observable for each item emitted by the source observable.
rsa function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional).
cnthe scheduler to synchronize sources from different contexts (optional).
Returns
Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.

Observables, produced by the CollectionSelector, are merged. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but concatenates the observables.

Sample Code
auto values = rxcpp::observable<>::range(1, 3).
[](int v){
return
rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
take(3);
},
[](int v_main, long v_sub){
return std::make_tuple(v_main, v_sub);
});
values.
[](std::tuple<int, long> v){printf("OnNext: %d - %ld\n", std::get<0>(v), std::get<1>(v));},
[](){printf("OnCompleted\n");});
OnNext: 1 - 1
OnNext: 2 - 1
OnNext: 3 - 1
OnNext: 1 - 2
OnNext: 2 - 2
OnNext: 3 - 2
OnNext: 1 - 3
OnNext: 2 - 3
OnNext: 3 - 3
OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto values = rxcpp::observable<>::range(1, 3).
[](int v){
printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v);
return
rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
take(3);
},
[](int v_main, int v_sub){
printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %d)\n", get_pid().c_str(), v_main, v_sub);
return std::make_tuple(v_main, v_sub);
},
values.
[](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %ld\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481303181056] Call CollectionSelector(v = 1)
[thread 47481303181056] Call CollectionSelector(v = 2)
[thread 47481303181056] Call CollectionSelector(v = 3)
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 1)
[thread 47481303181056] OnNext: 1 - 1
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 1)
[thread 47481303181056] OnNext: 2 - 1
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 1)
[thread 47481303181056] OnNext: 3 - 1
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 2)
[thread 47481303181056] OnNext: 1 - 2
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 2)
[thread 47481303181056] OnNext: 2 - 2
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 2)
[thread 47481303181056] OnNext: 3 - 2
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 3)
[thread 47481303181056] OnNext: 1 - 3
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 3)
[thread 47481303181056] OnNext: 2 - 3
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 3)
[thread 47481303181056] OnNext: 3 - 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
template<class... AN>
auto rxcpp::operators::group_by ( AN &&...  an) -> operator_factory<group_by_tag, AN...>

Return an observable that emits grouped_observables, each of which corresponds to a unique key value and each of which emits those items from the source observable that share that key value.

Template Parameters
KeySelectorthe type of the key extracting function
MarbleSelectorthe type of the element extracting function
BinaryPredicatethe type of the key comparing function
Parameters
ksa function that extracts the key for each item (optional)
msa function that extracts the return element for each item (optional)
pa function that implements comparison of two keys (optional)
Returns
Observable that emits values of grouped_observable type, each of which corresponds to a unique key value and each of which emits those items from the source observable that share that key value.
Sample Code
bool less(int v1, int v2){
return v1 < v2;
}
auto data = rxcpp::observable<>::range(0, 8).
map([](int v){
std::stringstream s;
s << "Value " << v;
return std::make_pair(v % 3, s.str());
});
auto values = data.group_by(
[](std::pair<int, std::string> v){return v.first;},
[](std::pair<int, std::string> v){return v.second;},
less);
values.
auto key = g.get_key();
printf("OnNext: key = %d\n", key);
[key](const std::string& v){printf("[key %d] OnNext: %s\n", key, v.c_str());},
[key](){printf("[key %d] OnCompleted\n", key);});
},
[](){printf("OnCompleted\n");});
OnNext: key = 0
[key 0] OnNext: Value 0
OnNext: key = 1
[key 1] OnNext: Value 1
OnNext: key = 2
[key 2] OnNext: Value 2
[key 0] OnNext: Value 3
[key 1] OnNext: Value 4
[key 2] OnNext: Value 5
[key 0] OnNext: Value 6
[key 1] OnNext: Value 7
[key 2] OnNext: Value 8
[key 0] OnCompleted
[key 1] OnCompleted
[key 2] OnCompleted
OnCompleted
Sample Code
auto values = rxcpp::observable<>::range(0, 8).
[](int v){return v % 3;},
[](int v){return 10 * v;});
values.
auto key = g.get_key();
printf("OnNext: key = %d\n", key);
[key](int v){printf("[key %d] OnNext: %d\n", key, v);},
[key](){printf("[key %d] OnCompleted\n", key);});
},
[](){printf("OnCompleted\n");});
OnNext: key = 0
[key 0] OnNext: 0
OnNext: key = 1
[key 1] OnNext: 10
OnNext: key = 2
[key 2] OnNext: 20
[key 0] OnNext: 30
[key 1] OnNext: 40
[key 2] OnNext: 50
[key 0] OnNext: 60
[key 1] OnNext: 70
[key 2] OnNext: 80
[key 0] OnCompleted
[key 1] OnCompleted
[key 2] OnCompleted
OnCompleted
template<class... AN>
auto rxcpp::operators::ignore_elements ( AN &&...  an) -> operator_factory<ignore_elements_tag, AN...>

Do not emit any items from the source Observable, but allow termination notification (either onError or onCompleted) to pass through unchanged.

Returns
Observable that emits termination notification from the source observable.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).ignore_elements();
values.
[](int v) { printf("OnNext: %d\n", v); },
[]() { printf("OnCompleted\n"); });
OnCompleted
template<class... AN>
auto rxcpp::operators::is_empty ( AN &&...  an) -> operator_factory<is_empty_tag, AN...>

Returns an Observable that emits true if the source Observable is empty, otherwise false.

Returns
An observable that emits a boolean value.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).is_empty();
values.
[](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
[]() { printf("OnCompleted\n"); });
auto rxcpp::operators::last ( ) -> operator_factory<last_tag>
inline

For each item from this observable reduce it by sending only the last item.

Returns
An observable that emits only the very last item emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).last();
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 3
OnCompleted
When the source observable calls on_error:
auto values = rxcpp::observable<>::empty<int>().last();
values.
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: last() requires a stream with at least one value
template<class ResultType , class Operator >
auto rxcpp::operators::lift ( Operator &&  op) -> detail::lift_factory<ResultType, Operator>
template<class... AN>
auto rxcpp::operators::map ( AN &&...  an) -> operator_factory<map_tag, AN...>

For each item from this observable use Selector to produce an item to emit from the new observable that is returned.

Template Parameters
Selectorthe type of the transforming function
Parameters
sthe selector function
Returns
Observable that emits the items from the source observable, transformed by the specified function.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).
map([](int v){
return 2 * v;
});
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 2
OnNext: 4
OnNext: 6
OnCompleted
auto rxcpp::operators::max ( ) -> operator_factory<max_tag>
inline

For each item from this observable reduce it by taking the max value of the previous items.

Returns
An observable that emits a single item: the max of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 4).max();
values.
[](double v){printf("OnNext: %lf\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 4.000000
OnCompleted
When the source observable completes without emitting any items:
auto values = rxcpp::observable<>::empty<int>().max();
values.
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: max() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 4).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
max();
values.
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::runtime_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Error from source
template<class... AN>
auto rxcpp::operators::merge ( AN &&...  an) -> operator_factory<merge_tag, AN...>

For each given observable subscribe. For each item emitted from all of the given observables, deliver from the new observable that is returned.

There are 2 variants of the operator:

  • The source observable emits nested observables, nested observables are merged.
  • The source observable and the arguments v0...vn are used to provide the observables to merge.
Template Parameters
Coordinationthe type of the scheduler (optional).
Value0... (optional).
ValueNtypes of source observables (optional).
Parameters
cnthe scheduler to synchronize sources from different contexts (optional).
v0... (optional).
vnsource observables (optional).
Returns
Observable that emits items that are the result of flattening the observables emitted by the source observable.

If scheduler is omitted, identity_current_thread is used.

Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
printf("[thread %s] Timer1 fired\n", get_pid().c_str());
return 1;
});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).map([](int) {
printf("[thread %s] Timer2 fired\n", get_pid().c_str());
return 2;
});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) {
printf("[thread %s] Timer3 fired\n", get_pid().c_str());
return 3;
});
auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3);
auto values = base.merge(rxcpp::observe_on_new_thread());
values.
[](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481303181056] Timer1 fired
[thread 47481303181056] OnNext: 1
[thread 47481303181056] Timer2 fired
[thread 47481303181056] OnNext: 2
[thread 47481303181056] Timer3 fired
[thread 47481303181056] OnNext: 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
Sample Code
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3);
auto values = base.merge();
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 3
OnNext: 2
OnNext: 1
OnCompleted
Sample Code
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
auto values = o1.merge(o2, o3);
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 3
OnNext: 2
OnNext: 1
OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
printf("[thread %s] Timer1 fired\n", get_pid().c_str());
return 1;
});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).map([](int) {
printf("[thread %s] Timer2 fired\n", get_pid().c_str());
return 2;
});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) {
printf("[thread %s] Timer3 fired\n", get_pid().c_str());
return 3;
});
auto values = o1.merge(rxcpp::observe_on_new_thread(), o2, o3);
values.
[](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481303181056] Timer1 fired
[thread 47481303181056] OnNext: 1
[thread 47481303181056] Timer2 fired
[thread 47481303181056] OnNext: 2
[thread 47481303181056] Timer3 fired
[thread 47481303181056] OnNext: 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
template<class... AN>
auto rxcpp::operators::merge_transform ( AN &&...  an) -> operator_factory<flat_map_tag, AN...>

For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.

Template Parameters
CollectionSelectorthe type of the observable producing function. CollectionSelector must be a function with the signature observable(flat_map::source_value_type)
ResultSelectorthe type of the aggregation function (optional). ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type).
Coordinationthe type of the scheduler (optional).
Parameters
sa function that returns an observable for each item emitted by the source observable.
rsa function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional).
cnthe scheduler to synchronize sources from different contexts (optional).
Returns
Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.

Observables, produced by the CollectionSelector, are merged. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but concatenates the observables.

Sample Code
auto values = rxcpp::observable<>::range(1, 3).
[](int v){
return
rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
take(3);
},
[](int v_main, long v_sub){
return std::make_tuple(v_main, v_sub);
});
values.
[](std::tuple<int, long> v){printf("OnNext: %d - %ld\n", std::get<0>(v), std::get<1>(v));},
[](){printf("OnCompleted\n");});
OnNext: 1 - 1
OnNext: 2 - 1
OnNext: 3 - 1
OnNext: 1 - 2
OnNext: 2 - 2
OnNext: 3 - 2
OnNext: 1 - 3
OnNext: 2 - 3
OnNext: 3 - 3
OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto values = rxcpp::observable<>::range(1, 3).
[](int v){
printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v);
return
rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
take(3);
},
[](int v_main, int v_sub){
printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %d)\n", get_pid().c_str(), v_main, v_sub);
return std::make_tuple(v_main, v_sub);
},
values.
[](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %ld\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481303181056] Call CollectionSelector(v = 1)
[thread 47481303181056] Call CollectionSelector(v = 2)
[thread 47481303181056] Call CollectionSelector(v = 3)
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 1)
[thread 47481303181056] OnNext: 1 - 1
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 1)
[thread 47481303181056] OnNext: 2 - 1
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 1)
[thread 47481303181056] OnNext: 3 - 1
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 2)
[thread 47481303181056] OnNext: 1 - 2
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 2)
[thread 47481303181056] OnNext: 2 - 2
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 2)
[thread 47481303181056] OnNext: 3 - 2
[thread 47481303181056] Call ResultSelector(v_main = 1, v_sub = 3)
[thread 47481303181056] OnNext: 1 - 3
[thread 47481303181056] Call ResultSelector(v_main = 2, v_sub = 3)
[thread 47481303181056] OnNext: 2 - 3
[thread 47481303181056] Call ResultSelector(v_main = 3, v_sub = 3)
[thread 47481303181056] OnNext: 3 - 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
auto rxcpp::operators::min ( ) -> operator_factory<min_tag>
inline

For each item from this observable reduce it by taking the min value of the previous items.

Returns
An observable that emits a single item: the min of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 4).min();
values.
[](double v){printf("OnNext: %lf\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1.000000
OnCompleted
When the source observable completes without emitting any items:
auto values = rxcpp::observable<>::empty<int>().min();
values.
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: min() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 4).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
min();
values.
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::runtime_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Error from source
template<class... AN>
auto rxcpp::operators::multicast ( AN &&...  an) -> operator_factory<multicast_tag, AN...>

allows connections to the source to be independent of subscriptions.

Template Parameters
Subjectthe subject to multicast the source Observable.
Parameters
subthe subject.
template<class... AN>
auto rxcpp::operators::observe_on ( AN &&...  an) -> operator_factory<observe_on_tag, AN...>

All values are queued and delivered using the scheduler from the supplied coordination.

Template Parameters
Coordinationthe type of the scheduler.
Parameters
cnthe scheduler to notify observers on.
Returns
The source observable modified so that its observers are notified on the specified scheduler.
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto values = rxcpp::observable<>::range(1, 3).
map([](int v){
printf("[thread %s] Emit value %d\n", get_pid().c_str(), v);
return v;
});
values.
[](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481267428736] Emit value 1
[thread 47481267428736] Emit value 2
[thread 47481267428736] Emit value 3
[thread 47481303181056] OnNext: 1
[thread 47481303181056] OnNext: 2
[thread 47481303181056] OnNext: 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
Invoking rxcpp::observable::subscribe_on operator, instead of observe_on, gives following results:
[thread 47481267428736] Start task
[thread 47481313687296] Emit value 1
[thread 47481313687296] OnNext: 1
[thread 47481313687296] Emit value 2
[thread 47481313687296] OnNext: 2
[thread 47481313687296] Emit value 3
[thread 47481313687296] OnNext: 3
[thread 47481313687296] OnCompleted
[thread 47481267428736] Finish task
template<class... AN>
auto rxcpp::operators::on_error_resume_next ( AN &&...  an) -> operator_factory<on_error_resume_next_tag, AN...>

If an error occurs, take the result from the Selector and subscribe to that instead.

Template Parameters
Selectorthe actual type of a function of the form observable<T>(std::exception_ptr)
Parameters
sthe function of the form observable<T>(std::exception_ptr)
Returns
Observable that emits the items from the source observable and switches to a new observable on error.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
on_error_resume_next([](std::exception_ptr ep){
printf("Resuming after: %s\n", rxu::what(ep).c_str());
});
values.
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
printf("OnError: %s\n", rxu::what(ep).c_str());
},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
Resuming after: Error from source
OnNext: -1
OnCompleted
template<class... AN>
auto rxcpp::operators::pairwise ( AN &&...  an) -> operator_factory<pairwise_tag, AN...>

Take values pairwise from this observable.

Returns
Observable that emits tuples of two the most recent items emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 5).pairwise();
values.
[](std::tuple<int, int> v){printf("OnNext: %d, %d\n", std::get<0>(v), std::get<1>(v));},
[](){printf("OnCompleted\n");});
OnNext: 1, 2
OnNext: 2, 3
OnNext: 3, 4
OnNext: 4, 5
OnCompleted
If the source observable emits less than two items, no pairs are emitted by the source observable:
auto values = rxcpp::observable<>::just(1).pairwise();
values.
[](std::tuple<int, int> v){printf("OnNext: %d, %d\n", std::get<0>(v), std::get<1>(v));},
[](){printf("OnCompleted\n");});
OnCompleted
template<class... AN>
auto rxcpp::operators::publish ( AN &&...  an) -> operator_factory<publish_tag, AN...>

Turn a cold observable hot and allow connections to the source to be independent of subscriptions. Turn a cold observable hot, send the most recent value to any new subscriber, and allow connections to the source to be independent of subscriptions.

Template Parameters
Tthe type of the emitted item (optional).
Parameters
firstan initial item to be emitted by the resulting observable at connection time before emitting the items from the source observable; not emitted to observers that subscribe after the time of connection (optional).
csthe subscription to control lifetime (optional).
Returns
rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers.
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
take(5).
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Another subscription from the beginning
values.subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
values.subscribe(
[](long v){printf("[3] OnNext: %ld\n", v);},
[](){printf("[3] OnCompleted\n");});
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
[1] OnNext: 1
[2] OnNext: 1
[1] OnNext: 2
[2] OnNext: 2
[1] OnNext: 3
[2] OnNext: 3
[3] OnNext: 3
[1] OnNext: 4
[2] OnNext: 4
[3] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[3] OnNext: 5
[1] OnCompleted
[2] OnCompleted
[3] OnCompleted
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
take(5).
publish(0L);
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Another subscription from the beginning
values.subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
values.subscribe(
[](long v){printf("[3] OnNext: %ld\n", v);},
[](){printf("[3] OnCompleted\n");});
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
[1] OnNext: 0
[2] OnNext: 0
[1] OnNext: 1
[2] OnNext: 1
[1] OnNext: 2
[2] OnNext: 2
[3] OnNext: 2
[1] OnNext: 3
[2] OnNext: 3
[3] OnNext: 3
[1] OnNext: 4
[2] OnNext: 4
[3] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[3] OnNext: 5
[1] OnCompleted
[2] OnCompleted
[3] OnCompleted
template<class... AN>
auto rxcpp::operators::publish_synchronized ( AN &&...  an) -> operator_factory<publish_synchronized_tag, AN...>

Turn a cold observable hot and allow connections to the source to be independent of subscriptions.

Template Parameters
Coordinationthe type of the scheduler.
Parameters
cna scheduler all values are queued and delivered on.
csthe subscription to control lifetime (optional).
Returns
rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers, on the specified scheduler.
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
take(5).
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Another subscription from the beginning
values.subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
values.subscribe(
[](long v){printf("[3] OnNext: %ld\n", v);},
[](){printf("[3] OnCompleted\n");});
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
[1] OnNext: 1
[2] OnNext: 1
[1] OnNext: 2
[2] OnNext: 2
[1] OnNext: 3
[2] OnNext: 3
[1] OnNext: 4
[2] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[1] OnCompleted
[2] OnCompleted
[3] OnCompleted
template<class... AN>
auto rxcpp::operators::reduce ( AN &&...  an) -> operator_factory<reduce_tag, AN...>

For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned.

Template Parameters
Seedthe type of the initial value for the accumulator
Accumulatorthe type of the data accumulating function
ResultSelectorthe type of the result producing function
Parameters
seedthe initial value for the accumulator
aan accumulator function to be invoked on each item emitted by the source observable, the result of which will be used in the next accumulator call
rsa result producing function that makes the final value from the last accumulator call result
Returns
An observable that emits a single item that is the result of accumulating the output from the items emitted by the source observable.

Some basic reduce-type operators have already been implemented:

Sample Code
Geometric mean of source values:
auto values = rxcpp::observable<>::range(1, 7).
std::make_pair(0, 1.0),
[](std::pair<int, double> seed, int v){
seed.first += 1;
seed.second *= v;
return seed;
},
[](std::pair<int, double> res){
return std::pow(res.second, 1.0 / res.first);
});
values.
[](double v){printf("OnNext: %lf\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 3.380015
OnCompleted
If the source observable completes without emitting any items, the resulting observable emits the result of passing the initial seed to the result selector:
auto values = rxcpp::observable<>::empty<int>().
1,
[](int,int){return 0;},
[](int res){return res;});
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnCompleted
If the accumulator raises an exception, it is returned by the resulting observable in on_error:
auto values = rxcpp::observable<>::range(1, 3).
0,
[](int seed, int v){
if (v == 2)
throw std::runtime_error("Exception from accumulator");
return seed;
},
[](int res){return res;});
values.
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::exception& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Exception from accumulator
The same for exceptions raised by the result selector:
auto values = rxcpp::observable<>::range(1, 3).
0,
[](int seed, int v){return seed + v;},
[](int res){
throw std::runtime_error("Exception from result selector");
return res;
});
values.
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::exception& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Exception from result selector
template<class... AN>
auto rxcpp::operators::ref_count ( AN &&...  an) -> operator_factory<ref_count_tag, AN...>

takes a connectable_observable source and uses a ref_count of the subscribers to control the connection to the published source. The first subscription will cause a call to connect() and the last unsubscribe will unsubscribe the connection.

Returns
An observable that emitting the items from its source.
template<class... AN>
auto rxcpp::operators::repeat ( AN &&...  an) -> operator_factory<repeat_tag, AN...>

Repeat this observable for the given number of times or infinitely.

Template Parameters
Countthe type of the counter (optional).
Parameters
tThe number of times the source observable items are repeated (optional). If not specified, infinitely repeats the source observable. Specifying 0 returns an empty sequence immediately
Returns
An observable that repeats the sequence of items emitted by the source observable for t times.
Sample Code
auto values = rxcpp::observable<>::from(1, 2).repeat(3);
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 1
OnNext: 2
OnNext: 1
OnNext: 2
OnCompleted
If the source observable calls on_error, repeat stops:
auto values = rxcpp::observable<>::from(1, 2).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
repeat();
values.
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::exception& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnError: Error from source
template<class... AN>
auto rxcpp::operators::replay ( AN &&...  an) -> operator_factory<replay_tag, AN...>

1) replay(optional Coordination, optional CompositeSubscription) Turn a cold observable hot, send all earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions.

2) replay(Count, optional Coordination, optional CompositeSubscription) Turn a cold observable hot, send at most count of earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions.

3) replay(Duration, optional Coordination, optional CompositeSubscription) Turn a cold observable hot, send values emitted within a specified time window to any new subscriber, and allow connections to the source to be independent of subscriptions.

4) replay(Count, Duration, optional Coordination, optional CompositeSubscription) Turn a cold observable hot, send at most count of values emitted within a specified time window to any new subscriber, and allow connections to the source to be independent of subscriptions.

Template Parameters
Durationthe type of the time interval (optional).
Countthe type of the maximum number of the most recent items sent to new observers (optional).
Coordinationthe type of the scheduler (optional).
Parameters
countthe maximum number of the most recent items sent to new observers (optional).
dthe duration of the window in which the replayed items must be emitted
cna scheduler all values are queued and delivered on (optional).
csthe subscription to control lifetime (optional).
Returns
rxcpp::connectable_observable that shares a single subscription to the underlying observable that will replay all of its items and notifications to any future observer.
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
take(5).
replay();
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(125)).subscribe([&](long){
values.as_blocking().subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
});
[1] OnNext: 1
[1] OnNext: 2
[1] OnNext: 3
[2] OnNext: 1
[2] OnNext: 2
[2] OnNext: 3
[1] OnNext: 4
[2] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[1] OnCompleted
[2] OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto coordination = rxcpp::serialize_new_thread();
auto worker = coordination.create_coordinator().get_worker();
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
take(5).
replay(coordination);
// Subscribe from the beginning
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());});
});
// Wait before subscribing
worker.schedule(coordination.now() + std::chrono::milliseconds(125), [&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());});
});
// Start emitting
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.connect();
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481303181056][1] OnNext: 1
[thread 47481303181056][1] OnNext: 2
[thread 47481303181056][1] OnNext: 3
[thread 47481303181056][2] OnNext: 1
[thread 47481303181056][2] OnNext: 2
[thread 47481303181056][2] OnNext: 3
[thread 47481303181056][1] OnNext: 4
[thread 47481303181056][2] OnNext: 4
[thread 47481303181056][1] OnNext: 5
[thread 47481303181056][2] OnNext: 5
[thread 47481303181056][1] OnCompleted
[thread 47481303181056][2] OnCompleted
[thread 47481267428736] Finish task
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
take(5).
replay(2);
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(125)).subscribe([&](long){
values.as_blocking().subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
});
[1] OnNext: 1
[1] OnNext: 2
[1] OnNext: 3
[2] OnNext: 2
[2] OnNext: 3
[1] OnNext: 4
[2] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[1] OnCompleted
[2] OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto coordination = rxcpp::serialize_new_thread();
auto worker = coordination.create_coordinator().get_worker();
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
take(5).
replay(2, coordination);
// Subscribe from the beginning
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());});
});
// Wait before subscribing
worker.schedule(coordination.now() + std::chrono::milliseconds(125), [&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());});
});
// Start emitting
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.connect();
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481305282304][1] OnNext: 1
[thread 47481305282304][1] OnNext: 2
[thread 47481305282304][1] OnNext: 3
[thread 47481305282304][2] OnNext: 2
[thread 47481305282304][2] OnNext: 3
[thread 47481305282304][1] OnNext: 4
[thread 47481305282304][2] OnNext: 4
[thread 47481305282304][1] OnNext: 5
[thread 47481305282304][2] OnNext: 5
[thread 47481305282304][1] OnCompleted
[thread 47481305282304][2] OnCompleted
[thread 47481267428736] Finish task
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
take(5).
replay(std::chrono::milliseconds(125));
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(175)).subscribe([&](long){
values.as_blocking().subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
});
[1] OnNext: 1
[1] OnNext: 2
[1] OnNext: 3
[1] OnNext: 4
[2] OnNext: 2
[2] OnNext: 3
[2] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[1] OnCompleted
[2] OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto coordination = rxcpp::serialize_new_thread();
auto worker = coordination.create_coordinator().get_worker();
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
take(5).
replay(std::chrono::milliseconds(125), coordination);
// Subscribe from the beginning
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());});
});
// Wait before subscribing
worker.schedule(coordination.now() + std::chrono::milliseconds(175), [&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());});
});
// Start emitting
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.connect();
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481307383552][1] OnNext: 1
[thread 47481307383552][1] OnNext: 2
[thread 47481307383552][1] OnNext: 3
[thread 47481307383552][1] OnNext: 4
[thread 47481307383552][2] OnNext: 2
[thread 47481307383552][2] OnNext: 3
[thread 47481307383552][2] OnNext: 4
[thread 47481307383552][1] OnNext: 5
[thread 47481307383552][2] OnNext: 5
[thread 47481307383552][1] OnCompleted
[thread 47481307383552][2] OnCompleted
[thread 47481267428736] Finish task
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
take(5).
replay(2, std::chrono::milliseconds(125));
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(175)).subscribe([&](long){
values.as_blocking().subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
});
[1] OnNext: 1
[1] OnNext: 2
[1] OnNext: 3
[1] OnNext: 4
[2] OnNext: 3
[2] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[1] OnCompleted
[2] OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto coordination = rxcpp::serialize_new_thread();
auto worker = coordination.create_coordinator().get_worker();
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
take(5).
replay(2, std::chrono::milliseconds(125), coordination);
// Subscribe from the beginning
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());});
});
// Wait before subscribing
worker.schedule(coordination.now() + std::chrono::milliseconds(175), [&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());});
});
// Start emitting
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.connect();
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481309484800][1] OnNext: 1
[thread 47481309484800][1] OnNext: 2
[thread 47481309484800][1] OnNext: 3
[thread 47481309484800][1] OnNext: 4
[thread 47481309484800][2] OnNext: 3
[thread 47481309484800][2] OnNext: 4
[thread 47481309484800][1] OnNext: 5
[thread 47481309484800][2] OnNext: 5
[thread 47481309484800][1] OnCompleted
[thread 47481309484800][2] OnCompleted
[thread 47481267428736] Finish task
template<class... AN>
auto rxcpp::operators::retry ( AN &&...  an) -> operator_factory<retry_tag, AN...>

Retry this observable for the given number of times.

Template Parameters
Countthe type of the counter (optional)
Parameters
tthe total number of tries (optional), i.e. retry(2) means one normal try, before an error occurs, and one retry. If not specified, infinitely retries the source observable. Specifying 0 returns immediately without subscribing
Returns
An observable that mirrors the source observable, resubscribing to it if it calls on_error up to a specified number of retries.
Sample Code
auto source = rxcpp::observable<>::from(1, 2).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source")));
auto values = source.retry(3);
values.
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::exception& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 1
OnNext: 2
OnNext: 1
OnNext: 2
OnError: Error from source
template<class... AN>
auto rxcpp::operators::sample_with_time ( AN &&...  an) -> operator_factory<sample_with_time_tag, AN...>

Return an Observable that emits the most recent items emitted by the source Observable within periodic time intervals.

Template Parameters
Durationthe type of time interval.
Coordinationthe type of the scheduler (optional).
Parameters
periodthe period of time to sample the source observable.
coordinationthe scheduler for the items (optional).
Returns
Observable that emits the most recently emitted item since the previous sampling.
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(2)).
take(7).
sample_with_time(std::chrono::milliseconds(4));
values.
[](long v) {
printf("OnNext: %ld\n", v);
},
[]() { printf("OnCompleted\n"); });
OnNext: 1
OnNext: 3
OnNext: 5
OnCompleted
template<class... AN>
auto rxcpp::operators::scan ( AN &&...  an) -> operator_factory<scan_tag, AN...>

For each item from this observable use Accumulator to combine items into a value that will be emitted from the new observable that is returned.

Template Parameters
Seedthe type of the initial value for the accumulator.
Accumulatorthe type of the data accumulating function.
Parameters
seedthe initial value for the accumulator.
aan accumulator function to be invoked on each item emitted by the source observable, whose result will be emitted and used in the next accumulator call.
Returns
An observable that emits the results of each call to the accumulator function.
Sample Code
auto values = rxcpp::observable<>::range(1, 7).
0,
[](int seed, int v){
return seed + v;
});
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 3
OnNext: 6
OnNext: 10
OnNext: 15
OnNext: 21
OnNext: 28
OnCompleted
template<class... AN>
auto rxcpp::operators::sequence_equal ( AN &&...  an) -> operator_factory<sequence_equal_tag, AN...>

Determine whether two Observables emit the same sequence of items.

Template Parameters
OtherSourcethe type of the other observable.
BinaryPredicatethe type of the value comparing function (optional). The signature should be equivalent to the following: bool pred(const T1& a, const T2& b);
Coordinationthe type of the scheduler (optional).
Parameters
tthe other Observable that emits items to compare.
predthe function that implements comparison of two values (optional).
cnthe scheduler (optional).
Returns
Observable that emits true only if both sequences terminate normally after emitting the same sequence of items in the same order; otherwise it will emit false.
Sample Code
auto source = rxcpp::observable<>::range(1, 3);
auto values = source.sequence_equal(rxcpp::observable<>::range(1, 3));
values.
[](bool v){ printf("OnNext: %s\n", v ? "true" : "false"); },
[](){ printf("OnCompleted\n");} );
OnNext: true
OnCompleted
template<class... AN>
auto rxcpp::operators::skip ( AN &&...  an) -> operator_factory<skip_tag, AN...>

Make new observable with skipped first count items from this observable.

Template Parameters
Countthe type of the items counter
Parameters
tthe number of items to skip
Returns
An observable that is identical to the source observable except that it does not emit the first t items that the source observable emits.
Sample Code
auto values = rxcpp::observable<>::range(1, 7).skip(3);
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 4
OnNext: 5
OnNext: 6
OnNext: 7
OnCompleted
template<class... AN>
auto rxcpp::operators::skip_last ( AN &&...  an) -> operator_factory<skip_last_tag, AN...>

Make new observable with skipped last count items from this observable.

Template Parameters
Countthe type of the items counter.
Parameters
tthe number of last items to skip.
Returns
An observable that is identical to the source observable except that it does not emit the last t items that the source observable emits.
Sample Code
auto values = rxcpp::observable<>::range(1, 7).skip_last(3);
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnCompleted
template<class... AN>
auto rxcpp::operators::skip_until ( AN &&...  an) -> operator_factory<skip_until_tag, AN...>

Make new observable with items skipped until on_next occurs on the trigger observable or until the specified time. skip_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination)

Template Parameters
TriggerSourcethe type of the trigger observable.
Coordinationthe type of the scheduler (optional).
Parameters
tan observable that has to emit an item before the source observable's elements begin to be mirrored by the resulting observable.
cnthe scheduler to use for scheduling the items (optional).
Returns
An observable that skips items from the source observable until the second observable emits an item or the time runs out, then emits the remaining items.
Sample Code
auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7);
auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25));
auto values = source.skip_until(trigger);
values.
[](long v){printf("OnNext: %ld\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 4
OnNext: 5
OnNext: 6
OnNext: 7
OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){
printf("[thread %s] Source emits, value = %ld\n", get_pid().c_str(), v);
return v;
});
auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)).map([](long v){
printf("[thread %s] Trigger emits, value = %ld\n", get_pid().c_str(), v);
return v;
});
auto values = source.skip_until(trigger, rxcpp::observe_on_new_thread());
values.
[](long v){printf("[thread %s] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481267428736] Source emits, value = 1
[thread 47481267428736] Source emits, value = 2
[thread 47481267428736] Source emits, value = 3
[thread 47481267428736] Trigger emits, value = 1
[thread 47481267428736] Source emits, value = 4
[thread 47481311586048] OnNext: 4
[thread 47481267428736] Source emits, value = 5
[thread 47481311586048] OnNext: 5
[thread 47481267428736] Source emits, value = 6
[thread 47481311586048] OnNext: 6
[thread 47481267428736] Source emits, value = 7
[thread 47481311586048] OnNext: 7
[thread 47481311586048] OnCompleted
[thread 47481267428736] Finish task
template<class... AN>
auto rxcpp::operators::start_with ( AN &&...  an) -> operator_factory<start_with_tag, AN...>

Start with the supplied values, then concatenate this observable.

Template Parameters
Value0...
ValueNthe type of sending values
Parameters
v0...
vnvalues to send
Returns
Observable that emits the specified items and then emits the items emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(10, 12).
start_with(1, 2, 3);
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 10
OnNext: 11
OnNext: 12
OnCompleted
Another form of this operator, rxcpp::observable<void, void>::start_with, gets the source observable as a parameter:
auto observable = rxcpp::observable<>::range(10, 12);
auto values = rxcpp::observable<>::start_with(observable, 1, 2, 3);
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 10
OnNext: 11
OnNext: 12
OnCompleted
template<class T , class... ArgN>
auto rxcpp::operators::subscribe ( ArgN &&...  an) -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<ArgN>(an)...))>

Subscribe will cause the source observable to emit values to the provided subscriber.

Template Parameters
ArgNtypes of the subscriber parameters
Parameters
anthe parameters for making a subscriber
Returns
A subscription with which the observer can stop receiving items before the observable has finished sending them.

The arguments of subscribe are forwarded to rxcpp::make_subscriber function. Some possible alternatives are:

  • Pass an already composed rxcpp::subscriber:
    auto subscriber = rxcpp::make_subscriber<int>(
    [](int v){printf("OnNext: %d\n", v);},
    [](){printf("OnCompleted\n");});
    auto values = rxcpp::observable<>::range(1, 3);
    values.subscribe(subscriber);
    OnNext: 1
    OnNext: 2
    OnNext: 3
    OnCompleted
  • Pass an rxcpp::observer. This allows subscribing the same subscriber to several observables:
    auto subscriber = rxcpp::make_subscriber<int>(
    [](int v){printf("OnNext: %d\n", v);},
    [](){printf("OnCompleted\n");});
    auto values1 = rxcpp::observable<>::range(1, 3);
    auto values2 = rxcpp::observable<>::range(4, 6);
    values1.subscribe(subscriber.get_observer());
    values2.subscribe(subscriber.get_observer());
    OnNext: 1
    OnNext: 2
    OnNext: 3
    OnCompleted
    OnNext: 4
    OnNext: 5
    OnNext: 6
    OnCompleted
  • Pass an on_next handler:
    auto values = rxcpp::observable<>::range(1, 3);
    values.subscribe(
    [](int v){printf("OnNext: %d\n", v);});
    OnNext: 1
    OnNext: 2
    OnNext: 3
  • Pass on_next and on_error handlers:
    auto values = rxcpp::observable<>::range(1, 3).
    concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source")));
    values.subscribe(
    [](int v){printf("OnNext: %d\n", v);},
    [](std::exception_ptr ep){
    try {std::rethrow_exception(ep);}
    catch (const std::exception& ex) {
    printf("OnError: %s\n", ex.what());
    }
    });
    OnNext: 1
    OnNext: 2
    OnNext: 3
    OnError: Error from source
  • Pass on_next and on_completed handlers:
    auto values = rxcpp::observable<>::range(1, 3);
    values.subscribe(
    [](int v){printf("OnNext: %d\n", v);},
    [](){printf("OnCompleted\n");});
    OnNext: 1
    OnNext: 2
    OnNext: 3
    OnCompleted
  • Pass on_next, on_error, and on_completed handlers:
    auto values = rxcpp::observable<>::range(1, 3).
    concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source")));
    values.subscribe(
    [](int v){printf("OnNext: %d\n", v);},
    [](std::exception_ptr ep){
    try {std::rethrow_exception(ep);}
    catch (const std::exception& ex) {
    printf("OnError: %s\n", ex.what());
    }
    },
    [](){printf("OnCompleted\n");});
    OnNext: 1
    OnNext: 2
    OnNext: 3
    OnError: Error from source

All the alternatives above also support passing rxcpp::composite_subscription instance. For example:

auto subscription = rxcpp::composite_subscription();
auto values = rxcpp::observable<>::range(1, 5);
values.subscribe(
subscription,
[&subscription](int v){
printf("OnNext: %d\n", v);
if (v == 3)
subscription.unsubscribe();
},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3

If neither subscription nor subscriber are provided, then a new subscription is created and returned as a result:

auto values = rxcpp::observable<>::range(1, 3).
finally([](){printf("The final action\n");});
auto subscription = values.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
subscription.unsubscribe();
OnNext: 1
OnNext: 2
OnNext: 3
The final action

For more details, see rxcpp::make_subscriber function description.

template<class... AN>
auto rxcpp::operators::subscribe_on ( AN &&...  an) -> operator_factory<subscribe_on_tag, AN...>

Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordination.

Template Parameters
Coordinationthe type of the scheduler.
Parameters
cnthe scheduler to perform subscription actions on.
Returns
The source observable modified so that its subscriptions happen on the specified scheduler.
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto values = rxcpp::observable<>::range(1, 3).
map([](int v){
printf("[thread %s] Emit value %d\n", get_pid().c_str(), v);
return v;
});
values.
[](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481313687296] Emit value 1
[thread 47481313687296] OnNext: 1
[thread 47481313687296] Emit value 2
[thread 47481313687296] OnNext: 2
[thread 47481313687296] Emit value 3
[thread 47481313687296] OnNext: 3
[thread 47481313687296] OnCompleted
[thread 47481267428736] Finish task
Invoking rxcpp::observable::observe_on operator, instead of subscribe_on, gives following results:
[thread 47481267428736] Start task
[thread 47481267428736] Emit value 1
[thread 47481267428736] Emit value 2
[thread 47481267428736] Emit value 3
[thread 47481303181056] OnNext: 1
[thread 47481303181056] OnNext: 2
[thread 47481303181056] OnNext: 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
auto rxcpp::operators::sum ( ) -> operator_factory<sum_tag>
inline

For each item from this observable reduce it by adding to the previous items.

Returns
An observable that emits a single item: the sum of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).sum();
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 6
OnCompleted
When the source observable completes without emitting any items:
auto values = rxcpp::observable<>::empty<int>().sum();
values.
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: sum() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
sum();
values.
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::runtime_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Error from source
template<class... AN>
auto rxcpp::operators::switch_if_empty ( AN &&...  an) -> operator_factory<switch_if_empty_tag, AN...>

If the source Observable terminates without emitting any items, emits items from a backup Observable.

Template Parameters
BackupSourcethe type of the backup observable.
Parameters
ta backup observable that is used if the source observable is empty.
Returns
Observable that emits items from a backup observable if the source observable is empty.
Sample Code
auto values = rxcpp::observable<>::empty<int>()
values.subscribe(
[](int v) { printf("OnNext: %d\n", v); },
[]() { printf("OnCompleted\n"); } );
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnNext: 5
OnCompleted
template<class... AN>
auto rxcpp::operators::switch_on_error ( AN &&...  an) -> operator_factory<on_error_resume_next_tag, AN...>

If an error occurs, take the result from the Selector and subscribe to that instead.

Template Parameters
Selectorthe actual type of a function of the form observable<T>(std::exception_ptr)
Parameters
sthe function of the form observable<T>(std::exception_ptr)
Returns
Observable that emits the items from the source observable and switches to a new observable on error.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
on_error_resume_next([](std::exception_ptr ep){
printf("Resuming after: %s\n", rxu::what(ep).c_str());
});
values.
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
printf("OnError: %s\n", rxu::what(ep).c_str());
},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
Resuming after: Error from source
OnNext: -1
OnCompleted
template<class... AN>
auto rxcpp::operators::switch_on_next ( AN &&...  an) -> operator_factory<switch_on_next_tag, AN...>

Return observable that emits the items emitted by the observable most recently emitted by the source observable.

Template Parameters
Coordinationthe type of the scheduler (optional).
Parameters
cnthe scheduler to synchronize sources from different contexts (optional).
Returns
Observable that emits the items emitted by the observable most recently emitted by the source observable.
Sample Code
auto base = rxcpp::observable<>::interval(std::chrono::milliseconds(30)).
take(3).
map([](long){
return rxcpp::observable<>::interval(std::chrono::milliseconds(10)).as_dynamic();
});
auto values = base.switch_on_next().take(10);
values.
[](long v){printf("OnNext: %ld\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnCompleted
template<class... AN>
auto rxcpp::operators::take ( AN &&...  an) -> operator_factory<take_tag, AN...>

For the first count items from this observable emit them from the new observable that is returned.

Template Parameters
Countthe type of the items counter.
Parameters
tthe number of items to take.
Returns
An observable that emits only the first t items emitted by the source Observable, or all of the items from the source observable if that observable emits fewer than t items.
Sample Code
auto values = rxcpp::observable<>::range(1, 7).take(3);
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnCompleted
template<class... AN>
auto rxcpp::operators::take_last ( AN &&...  an) -> operator_factory<take_last_tag, AN...>

Emit only the final t items emitted by the source Observable.

Template Parameters
Countthe type of the items counter.
Parameters
tthe number of last items to take.
Returns
An observable that emits only the last t items emitted by the source Observable, or all of the items from the source observable if that observable emits fewer than t items.
Sample Code
auto values = rxcpp::observable<>::range(1, 7).take_last(3);
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 5
OnNext: 6
OnNext: 7
OnCompleted
template<class... AN>
auto rxcpp::operators::take_until ( AN &&...  an) -> operator_factory<take_until_tag, AN...>

For each item from this observable until on_next occurs on the trigger observable or until the specified time, emit them from the new observable that is returned. take_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination)

Template Parameters
TriggerSourcethe type of the trigger observable.
TimePointthe type of the time interval.
Coordinationthe type of the scheduler (optional).
Parameters
tan observable whose first emitted item will stop emitting items from the source observable.
whena time point when the returned observable will stop emitting items.
cnthe scheduler to use for scheduling the items (optional).
Returns
An observable that emits the items emitted by the source observable until trigger observable emitted or the time runs out.
Sample Code
auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7);
auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25));
auto values = source.take_until(trigger);
values.
[](long v){printf("OnNext: %ld\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){
printf("[thread %s] Source emits, value = %ld\n", get_pid().c_str(), v);
return v;
});
auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)).map([](long v){
printf("[thread %s] Trigger emits, value = %ld\n", get_pid().c_str(), v);
return v;
});
auto values = source.take_until(trigger, rxcpp::observe_on_new_thread());
values.
[](long v){printf("[thread %s] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481267428736] Source emits, value = 1
[thread 47481313687296] OnNext: 1
[thread 47481267428736] Source emits, value = 2
[thread 47481313687296] OnNext: 2
[thread 47481267428736] Source emits, value = 3
[thread 47481313687296] OnNext: 3
[thread 47481267428736] Trigger emits, value = 1
[thread 47481313687296] OnCompleted
[thread 47481267428736] Finish task
Sample Code
auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7);
auto values = source.take_until(std::chrono::steady_clock::now() + std::chrono::milliseconds(25));
values.
[](long v){printf("OnNext: %ld\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){
printf("[thread %s] Source emits, value = %ld\n", get_pid().c_str(), v);
return v;
});
auto scheduler = rxcpp::observe_on_new_thread();
auto values = source.take_until(scheduler.now() + std::chrono::milliseconds(25), scheduler);
values.
[](long v){printf("[thread %s] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481267428736] Source emits, value = 1
[thread 47481313687296] OnNext: 1
[thread 47481267428736] Source emits, value = 2
[thread 47481313687296] OnNext: 2
[thread 47481267428736] Source emits, value = 3
[thread 47481313687296] OnNext: 3
[thread 47481313687296] OnCompleted
[thread 47481267428736] Finish task
template<class... AN>
auto rxcpp::operators::take_while ( AN &&...  an) -> operator_factory<take_while_tag, AN...>

For the first items fulfilling the predicate from this observable emit them from the new observable that is returned.

Template Parameters
Predicatethe type of the predicate
Parameters
tthe predicate
Returns
An observable that emits only the first items emitted by the source Observable fulfilling the predicate, or all of the items from the source observable if the predicate never returns false
Sample Code
auto values = rxcpp::observable<>::range(1, 8).
take_while([](int v){
return v <= 4;
});
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnCompleted
template<class... AN>
auto rxcpp::operators::tap ( AN &&...  an) -> operator_factory<tap_tag, AN...>

inspect calls to on_next, on_error and on_completed.

Template Parameters
MakeObserverArgN...these args are passed to make_observer.
Parameters
anthese args are passed to make_observer.
Returns
Observable that emits the same items as the source observable to both the subscriber and the observer.
Note
If an on_error method is not supplied the observer will ignore errors rather than call std::terminate()
Sample Code
auto values = rxcpp::observable<>::range(1, 3).
tap(
[](int v){printf("Tap - OnNext: %d\n", v);},
[](){printf("Tap - OnCompleted\n");});
values.
[](int v){printf("Subscribe - OnNext: %d\n", v);},
[](){printf("Subscribe - OnCompleted\n");});
Tap - OnNext: 1
Subscribe - OnNext: 1
Tap - OnNext: 2
Subscribe - OnNext: 2
Tap - OnNext: 3
Subscribe - OnNext: 3
Tap - OnCompleted
Subscribe - OnCompleted
If the source observable generates an error, the observer passed to tap is called:
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
tap(
[](int v){printf("Tap - OnNext: %d\n", v);},
[](std::exception_ptr ep){
printf("Tap - OnError: %s\n", rxu::what(ep).c_str());
},
[](){printf("Tap - OnCompleted\n");});
values.
[](int v){printf("Subscribe - OnNext: %d\n", v);},
[](std::exception_ptr ep){
printf("Subscribe - OnError: %s\n", rxu::what(ep).c_str());
},
[](){printf("Subscribe - OnCompleted\n");});
Tap - OnNext: 1
Subscribe - OnNext: 1
Tap - OnNext: 2
Subscribe - OnNext: 2
Tap - OnNext: 3
Subscribe - OnNext: 3
Tap - OnError: Error from source
Subscribe - OnError: Error from source
template<class... AN>
auto rxcpp::operators::time_interval ( AN &&...  an) -> operator_factory<time_interval_tag, AN...>

Returns an observable that emits indications of the amount of time lapsed between consecutive emissions of the source observable. The first emission from this new Observable indicates the amount of time lapsed between the time when the observer subscribed to the Observable and the time when the source Observable emitted its first item.

Template Parameters
Coordinationthe type of the scheduler.
Parameters
coordinationthe scheduler for time intervals.
Returns
Observable that emits a time_duration to indicate the amount of time lapsed between pairs of emissions.
Sample Code
typedef rxcpp::schedulers::scheduler::clock_type::time_point::duration duration_type;
using namespace std::chrono;
auto values = rxcpp::observable<>::interval(milliseconds(100))
.time_interval()
.take(3);
values.
[&](duration_type v) {
long long int ms = duration_cast<milliseconds>(v).count();
printf("OnNext: @%lldms\n", ms);
},
[](std::exception_ptr ep) {
try {
std::rethrow_exception(ep);
} catch (const std::exception& ex) {
printf("OnError: %s\n", ex.what());
}
},
[]() { printf("OnCompleted\n"); });
OnNext: @0ms
OnNext: @100ms
OnNext: @100ms
OnCompleted
template<class... AN>
auto rxcpp::operators::timeout ( AN &&...  an) -> operator_factory<timeout_tag, AN...>

Return an observable that terminates with timeout_error if a particular timespan has passed without emitting another item from the source observable.

Template Parameters
Durationthe type of time interval.
Coordinationthe type of the scheduler (optional).
Parameters
periodthe period of time wait for another item from the source observable.
coordinationthe scheduler to manage timeout for each event (optional).
Returns
Observable that terminates with an error if a particular timespan has passed without emitting another item from the source observable.
Sample Code
using namespace std::chrono;
auto values = rxcpp::observable<>::interval(milliseconds(100))
.take(3)
.concat(rxcpp::observable<>::interval(milliseconds(500)))
.timeout(milliseconds(200));
values.
[](long v) { printf("OnNext: %ld\n", v); },
[](std::exception_ptr ep) {
try {
std::rethrow_exception(ep);
} catch (const rxcpp::timeout_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[]() { printf("OnCompleted\n"); });
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 1
OnError: timeout has occurred
template<class... AN>
auto rxcpp::operators::timestamp ( AN &&...  an) -> operator_factory<timestamp_tag, AN...>

Returns an observable that attaches a timestamp to each item emitted by the source observable indicating when it was emitted.

Template Parameters
Coordinationthe type of the scheduler (optional).
Parameters
coordinationthe scheduler to manage timeout for each event (optional).
Returns
Observable that emits a pair: { item emitted by the source observable, time_point representing the current value of the clock }.
Sample Code
typedef rxcpp::schedulers::scheduler::clock_type::time_point time_point;
using namespace std::chrono;
auto values = rxcpp::observable<>::interval(milliseconds(100))
.timestamp()
.take(3);
time_point start = rxcpp::identity_current_thread().now();
values.
[&](std::pair<long, time_point> v) {
long long int ms = duration_cast<milliseconds>(v.second - start).count();
printf("OnNext: %ld @%lldms\n", v.first, ms);
},
[](std::exception_ptr ep) {
try {
std::rethrow_exception(ep);
} catch (const std::exception& ex) {
printf("OnError: %s\n", ex.what());
}
},
[]() { printf("OnCompleted\n"); });
OnNext: 1 @0ms
OnNext: 2 @100ms
OnNext: 3 @200ms
OnCompleted
template<class... AN>
auto rxcpp::operators::transform ( AN &&...  an) -> operator_factory<map_tag, AN...>

For each item from this observable use Selector to produce an item to emit from the new observable that is returned.

Template Parameters
Selectorthe type of the transforming function
Parameters
sthe selector function
Returns
Observable that emits the items from the source observable, transformed by the specified function.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).
map([](int v){
return 2 * v;
});
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 2
OnNext: 4
OnNext: 6
OnCompleted
template<class... AN>
auto rxcpp::operators::window ( AN &&...  an) -> operator_factory<window_tag, AN...>

Return an observable that emits connected, non-overlapping windows, each containing at most count items from the source observable. If the skip parameter is set, return an observable that emits windows every skip items containing at most count items from the source observable.

Parameters
countthe maximum size of each window before it should be completed
skiphow many items need to be skipped before starting a new window
Returns
Observable that emits connected, non-overlapping windows, each containing at most count items from the source observable. If the skip parameter is set, return an Observable that emits windows every skip items containing at most count items from the source observable.
Sample Code
int counter = 0;
auto values = rxcpp::observable<>::range(1, 7).window(2, 3);
values.
[&counter](rxcpp::observable<int> v){
int id = counter++;
printf("[window %d] Create window\n", id);
[id](int v){printf("[window %d] OnNext: %d\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 0] OnCompleted
[window 1] Create window
[window 1] OnNext: 4
[window 1] OnNext: 5
[window 1] OnCompleted
[window 2] Create window
[window 2] OnNext: 7
[window 2] OnCompleted
Sample Code
int counter = 0;
auto values = rxcpp::observable<>::range(1, 5).window(2);
values.
[&counter](rxcpp::observable<int> v){
int id = counter++;
printf("[window %d] Create window\n", id);
[id](int v){printf("[window %d] OnNext: %d\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 0] OnCompleted
[window 1] Create window
[window 1] OnNext: 3
[window 1] OnNext: 4
[window 1] OnCompleted
[window 2] Create window
[window 2] OnNext: 5
[window 2] OnCompleted
template<class... AN>
auto rxcpp::operators::window_toggle ( AN &&...  an) -> operator_factory<window_toggle_tag, AN...>

Return an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.

Template Parameters
Openingsobservable<OT>
ClosingSelectora function of type observable<CT>(OT)
Coordinationthe type of the scheduler (optional).
Parameters
openseach value from this observable opens a new window.
closesthis function is called for each opened window and returns an observable. the first value from the returned observable will close the window.
coordinationthe scheduler for the windows (optional).
Returns
Observable that emits an observable for each opened window.
Sample Code
int counter = 0;
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
rxcpp::observable<>::interval(std::chrono::milliseconds(4)),
[](long){
return rxcpp::observable<>::interval(std::chrono::milliseconds(4)).skip(1);
},
values.
[&counter](rxcpp::observable<long> v){
int id = counter++;
printf("[window %d] Create window\n", id);
[id](long v){printf("[window %d] OnNext: %ld\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 1] Create window
[window 0] OnCompleted
[window 1] OnNext: 3
[window 1] OnNext: 4
[window 2] Create window
[window 1] OnCompleted
[window 2] OnNext: 5
[window 2] OnNext: 6
[window 3] Create window
[window 2] OnCompleted
[window 3] OnNext: 7
[window 3] OnCompleted
Sample Code
int counter = 0;
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
rxcpp::observable<>::interval(std::chrono::milliseconds(4)),
[](long){
return rxcpp::observable<>::interval(std::chrono::milliseconds(4)).skip(1);
});
values.
[&counter](rxcpp::observable<long> v){
int id = counter++;
printf("[window %d] Create window\n", id);
[id](long v){printf("[window %d] OnNext: %ld\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 1] Create window
[window 0] OnCompleted
[window 1] OnNext: 3
[window 1] OnNext: 4
[window 2] Create window
[window 1] OnCompleted
[window 2] OnNext: 5
[window 2] OnNext: 6
[window 3] Create window
[window 2] OnCompleted
[window 3] OnNext: 7
[window 3] OnCompleted
template<class... AN>
auto rxcpp::operators::window_with_time ( AN &&...  an) -> operator_factory<window_with_time_tag, AN...>

Return an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler. If the skip parameter is set, return an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.

Template Parameters
Durationthe type of time intervals.
Coordinationthe type of the scheduler (optional).
Parameters
periodthe period of time each window collects items before it is completed.
skipthe period of time after which a new window will be created.
coordinationthe scheduler for the windows (optional).
Returns
Observable that emits observables every period time interval and collect items from this observable for period of time into each produced observable. If the skip parameter is set, return an Observable that emits observables every skip time interval and collect items from this observable for period of time into each produced observable.
Sample Code
int counter = 0;
auto period = std::chrono::milliseconds(4);
auto skip = std::chrono::milliseconds(6);
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
values.
[&counter](rxcpp::observable<long> v){
int id = counter++;
printf("[window %d] Create window\n", id);
[id](long v){printf("[window %d] OnNext: %ld\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 0] OnCompleted
[window 1] Create window
[window 1] OnNext: 4
[window 1] OnNext: 5
[window 1] OnCompleted
[window 2] Create window
[window 2] OnNext: 7
[window 2] OnCompleted
Sample Code
int counter = 0;
auto period = std::chrono::milliseconds(4);
auto skip = std::chrono::milliseconds(6);
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
values.
[&counter](rxcpp::observable<long> v){
int id = counter++;
printf("[window %d] Create window\n", id);
[id](long v){printf("[window %d] OnNext: %ld\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 0] OnCompleted
[window 1] Create window
[window 1] OnNext: 4
[window 1] OnNext: 5
[window 1] OnCompleted
[window 2] Create window
[window 2] OnNext: 7
[window 2] OnCompleted
Sample Code
int counter = 0;
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
window_with_time(std::chrono::milliseconds(4), rxcpp::observe_on_new_thread());
values.
[&counter](rxcpp::observable<long> v){
int id = counter++;
printf("[window %d] Create window\n", id);
[id](long v){printf("[window %d] OnNext: %ld\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 1] Create window
[window 0] OnCompleted
[window 1] OnNext: 3
[window 1] OnNext: 4
[window 2] Create window
[window 1] OnCompleted
[window 2] OnNext: 5
[window 2] OnNext: 6
[window 3] Create window
[window 2] OnCompleted
[window 3] OnNext: 7
[window 3] OnCompleted
Sample Code
int counter = 0;
auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
take(7).
window_with_time(std::chrono::milliseconds(4));
values.
[&counter](rxcpp::observable<long> v){
int id = counter++;
printf("[window %d] Create window\n", id);
[id](long v){printf("[window %d] OnNext: %ld\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 1] Create window
[window 0] OnCompleted
[window 1] OnNext: 3
[window 1] OnNext: 4
[window 2] Create window
[window 1] OnCompleted
[window 2] OnNext: 5
[window 2] OnNext: 6
[window 3] Create window
[window 2] OnCompleted
[window 3] OnNext: 7
[window 3] OnCompleted
template<class... AN>
auto rxcpp::operators::window_with_time_or_count ( AN &&...  an) -> operator_factory<window_with_time_or_count_tag, AN...>

Return an observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first), on the specified scheduler.

Template Parameters
Durationthe type of time intervals.
Coordinationthe type of the scheduler (optional).
Parameters
periodthe period of time each window collects items before it is completed and replaced with a new window.
countthe maximum size of each window before it is completed and new window is created.
coordinationthe scheduler for the windows (optional).
Returns
Observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first).
Sample Code
int counter = 0;
auto int1 = rxcpp::observable<>::range(1L, 3L);
auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
auto values = int1.
concat(int2).
window_with_time_or_count(std::chrono::milliseconds(20), 2, rxcpp::observe_on_event_loop());
values.
[&counter](rxcpp::observable<long> v){
int id = counter++;
printf("[window %d] Create window\n", id);
[id](long v){printf("[window %d] OnNext: %ld\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 0] OnCompleted
[window 1] Create window
[window 1] OnNext: 3
[window 1] OnCompleted
[window 2] Create window
[window 2] OnCompleted
[window 3] Create window
[window 3] OnNext: 1
[window 3] OnCompleted
Sample Code
int counter = 0;
auto int1 = rxcpp::observable<>::range(1L, 3L);
auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
auto values = int1.
concat(int2).
window_with_time_or_count(std::chrono::milliseconds(20), 2);
values.
[&counter](rxcpp::observable<long> v){
int id = counter++;
printf("[window %d] Create window\n", id);
[id](long v){printf("[window %d] OnNext: %ld\n", id, v);},
[id](){printf("[window %d] OnCompleted\n", id);});
});
[window 0] Create window
[window 0] OnNext: 1
[window 0] OnNext: 2
[window 0] OnCompleted
[window 1] Create window
[window 1] OnNext: 3
[window 1] OnCompleted
[window 2] Create window
[window 2] OnCompleted
[window 3] Create window
[window 3] OnNext: 1
[window 3] OnCompleted
template<class... AN>
auto rxcpp::operators::with_latest_from ( AN &&...  an) -> operator_factory<with_latest_from_tag, AN...>

For each item from the first observable select the latest value from all the observables to emit from the new observable that is returned.

Template Parameters
ANtypes of scheduler (optional), aggregate function (optional), and source observables
Parameters
anscheduler (optional), aggregation function (optional), and source observables
Returns
Observable that emits items that are the result of combining the items emitted by the source observables.

If scheduler is omitted, identity_current_thread is used.

If aggregation function is omitted, the resulting observable returns tuples of emitted items.

Sample Code

Neither scheduler nor aggregation function are present:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
auto values = o1.with_latest_from(o2, o3);
values.
take(5).
[](std::tuple<int, int, int> v){printf("OnNext: %d, %d, %d\n", std::get<0>(v), std::get<1>(v), std::get<2>(v));},
[](){printf("OnCompleted\n");});
OnNext: 2, 1, 1
OnNext: 3, 2, 1
OnNext: 4, 2, 2
OnNext: 5, 3, 2
OnNext: 6, 4, 2
OnCompleted

Only scheduler is present:

printf("[thread %s] Start task\n", get_pid().c_str());
auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)).map([](int v) {
printf("[thread %s] Source1 OnNext: %d\n", get_pid().c_str(), v);
return v;
});
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)).map([](int v) {
printf("[thread %s] Source2 OnNext: %d\n", get_pid().c_str(), v);
return v;
});
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5)).map([](int v) {
printf("[thread %s] Source3 OnNext: %d\n", get_pid().c_str(), v);
return v;
});
auto values = o1.with_latest_from(thr, o2, o3);
values.
take(5).
[](std::tuple<int, int, int> v){printf("[thread %s] OnNext: %d, %d, %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v), std::get<2>(v));},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481267428736] Source1 OnNext: 1
[thread 47481267428736] Source2 OnNext: 1
[thread 47481267428736] Source3 OnNext: 1
[thread 47481267428736] Source1 OnNext: 2
[thread 47481294776064] OnNext: 2, 1, 1
[thread 47481267428736] Source2 OnNext: 2
[thread 47481267428736] Source1 OnNext: 3
[thread 47481294776064] OnNext: 3, 2, 1
[thread 47481267428736] Source3 OnNext: 2
[thread 47481267428736] Source1 OnNext: 4
[thread 47481267428736] Source2 OnNext: 3
[thread 47481294776064] OnNext: 4, 2, 2
[thread 47481267428736] Source1 OnNext: 5
[thread 47481294776064] OnNext: 5, 3, 2
[thread 47481267428736] Source2 OnNext: 4
[thread 47481267428736] Source1 OnNext: 6
[thread 47481267428736] Source3 OnNext: 3
[thread 47481294776064] OnNext: 6, 4, 2
[thread 47481294776064] OnCompleted
[thread 47481267428736] Finish task

Only aggregation function is present:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
auto values = o1.with_latest_from(
[](int v1, int v2, int v3) {
return 100 * v1 + 10 * v2 + v3;
},
o2, o3);
values.
take(5).
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 211
OnNext: 321
OnNext: 422
OnNext: 532
OnNext: 642
OnCompleted

Both scheduler and aggregation function are present:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
auto values = o1.with_latest_from(
[](int v1, int v2, int v3) {
return 100 * v1 + 10 * v2 + v3;
},
o2, o3);
values.
take(5).
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 211
OnNext: 321
OnNext: 422
OnNext: 532
OnNext: 642
OnCompleted
template<class... AN>
auto rxcpp::operators::zip ( AN &&...  an) -> operator_factory<zip_tag, AN...>

Bring by one item from all given observables and select a value to emit from the new observable that is returned.

Template Parameters
ANtypes of scheduler (optional), aggregate function (optional), and source observables
Parameters
anscheduler (optional), aggregation function (optional), and source observables
Returns
Observable that emits the result of combining the items emitted and brought by one from each of the source observables.

If scheduler is omitted, identity_current_thread is used.

If aggregation function is omitted, the resulting observable returns tuples of emitted items.

Sample Code

Neither scheduler nor aggregation function are present:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
auto values = o1.zip(o2, o3);
values.
take(3).
[](std::tuple<int, int, int> v){printf("OnNext: %d, %d, %d\n", std::get<0>(v), std::get<1>(v), std::get<2>(v));},
[](){printf("OnCompleted\n");});
OnNext: 1, 1, 1
OnNext: 2, 2, 2
OnNext: 3, 3, 3
OnCompleted

Only scheduler is present:

printf("[thread %s] Start task\n", get_pid().c_str());
auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1)).map([](int v) {
printf("[thread %s] Source1 OnNext: %d\n", get_pid().c_str(), v);
return v;
});
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)).map([](int v) {
printf("[thread %s] Source2 OnNext: %d\n", get_pid().c_str(), v);
return v;
});
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)).map([](int v) {
printf("[thread %s] Source3 OnNext: %d\n", get_pid().c_str(), v);
return v;
});
auto values = o1.zip(thr, o2, o3);
values.
take(3).
[](std::tuple<int, int, int> v){printf("[thread %s] OnNext: %d, %d, %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v), std::get<2>(v));},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481267428736] Source1 OnNext: 1
[thread 47481267428736] Source2 OnNext: 1
[thread 47481267428736] Source3 OnNext: 1
[thread 47481298978560] OnNext: 1, 1, 1
[thread 47481267428736] Source1 OnNext: 2
[thread 47481267428736] Source1 OnNext: 3
[thread 47481267428736] Source2 OnNext: 2
[thread 47481267428736] Source1 OnNext: 4
[thread 47481267428736] Source3 OnNext: 2
[thread 47481298978560] OnNext: 2, 2, 2
[thread 47481267428736] Source1 OnNext: 5
[thread 47481267428736] Source2 OnNext: 3
[thread 47481267428736] Source1 OnNext: 6
[thread 47481267428736] Source1 OnNext: 7
[thread 47481267428736] Source2 OnNext: 4
[thread 47481267428736] Source3 OnNext: 3
[thread 47481298978560] OnNext: 3, 3, 3
[thread 47481298978560] OnCompleted
[thread 47481267428736] Finish task

Only aggregation function is present:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
auto values = o1 | rxcpp::operators::zip(
[](int v1, int v2, int v3) {
return 100 * v1 + 10 * v2 + v3;
},
o2, o3);
values.
take(3).
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 111
OnNext: 222
OnNext: 333
OnCompleted

Both scheduler and aggregation function are present:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
auto values = o1.zip(
[](int v1, int v2, int v3) {
return 100 * v1 + 10 * v2 + v3;
},
o2, o3);
values.
take(3).
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 111
OnNext: 222
OnNext: 333
OnCompleted

Variable Documentation

auto rxcpp::operators::AN
Initial value:
{
return operator_factory<finally_tag, AN...>(std::make_tuple(std::forward<AN>(an)...))
auto AN
Definition: rx-finally.hpp:105