RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
Classes | Namespaces | Macros | Functions
rx-reduce.hpp File Reference

For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned. More...

#include "../rx-includes.hpp"
Include dependency graph for rx-reduce.hpp:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Classes

struct  rxcpp::member_overload< reduce_tag >
 
struct  rxcpp::member_overload< first_tag >
 
struct  rxcpp::member_overload< last_tag >
 
struct  rxcpp::member_overload< sum_tag >
 
struct  rxcpp::member_overload< average_tag >
 
struct  rxcpp::member_overload< max_tag >
 
struct  rxcpp::member_overload< min_tag >
 

Namespaces

 rxcpp
 
 rxcpp::operators
 

Macros

#define RXCPP_OPERATORS_RX_REDUCE_HPP
 

Functions

template<class... AN>
auto rxcpp::operators::reduce (AN &&...an) -> operator_factory< reduce_tag, AN... >
 For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned. More...
 
template<class... AN>
auto rxcpp::operators::accumulate (AN &&...an) -> operator_factory< reduce_tag, AN... >
 For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned. More...
 
auto rxcpp::operators::first () -> operator_factory< first_tag >
 For each item from this observable reduce it by sending only the first item. More...
 
auto rxcpp::operators::last () -> operator_factory< last_tag >
 For each item from this observable reduce it by sending only the last item. More...
 
auto rxcpp::operators::count () -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
 For each item from this observable reduce it by incrementing a count. More...
 
auto rxcpp::operators::average () -> operator_factory< average_tag >
 For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end. More...
 
auto rxcpp::operators::sum () -> operator_factory< sum_tag >
 For each item from this observable reduce it by adding to the previous items. More...
 
auto rxcpp::operators::min () -> operator_factory< min_tag >
 For each item from this observable reduce it by taking the min value of the previous items. More...
 
auto rxcpp::operators::max () -> operator_factory< max_tag >
 For each item from this observable reduce it by taking the max value of the previous items. More...
 

Detailed Description

For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned.

Template Parameters
Seedthe type of the initial value for the accumulator
Accumulatorthe type of the data accumulating function
ResultSelectorthe type of the result producing function
Parameters
seedthe initial value for the accumulator
aan accumulator function to be invoked on each item emitted by the source observable, the result of which will be used in the next accumulator call
rsa result producing function that makes the final value from the last accumulator call result
Returns
An observable that emits a single item that is the result of accumulating the output from the items emitted by the source observable.

Some basic reduce-type operators have already been implemented:

Sample Code
Geometric mean of source values:
auto values = rxcpp::observable<>::range(1, 7).
std::make_pair(0, 1.0),
[](std::pair<int, double> seed, int v){
seed.first += 1;
seed.second *= v;
return seed;
},
[](std::pair<int, double> res){
return std::pow(res.second, 1.0 / res.first);
});
values.
[](double v){printf("OnNext: %lf\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 3.380015
OnCompleted
If the source observable completes without emitting any items, the resulting observable emits the result of passing the initial seed to the result selector:
auto values = rxcpp::observable<>::empty<int>().
1,
[](int,int){return 0;},
[](int res){return res;});
values.
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnCompleted
If the accumulator raises an exception, it is returned by the resulting observable in on_error:
auto values = rxcpp::observable<>::range(1, 3).
0,
[](int seed, int v){
if (v == 2)
throw std::runtime_error("Exception from accumulator");
return seed;
},
[](int res){return res;});
values.
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::exception& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Exception from accumulator
The same for exceptions raised by the result selector:
auto values = rxcpp::observable<>::range(1, 3).
0,
[](int seed, int v){return seed + v;},
[](int res){
throw std::runtime_error("Exception from result selector");
return res;
});
values.
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::exception& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Exception from result selector

Macro Definition Documentation

#define RXCPP_OPERATORS_RX_REDUCE_HPP