46 #if !defined(RXCPP_OPERATORS_RX_REDUCE_HPP) 47 #define RXCPP_OPERATORS_RX_REDUCE_HPP 49 #include "../rx-includes.hpp" 58 struct reduce_invalid_arguments {};
61 struct reduce_invalid :
public rxo::operator_base<reduce_invalid_arguments<AN...>> {
62 using type = observable<reduce_invalid_arguments<
AN...>, reduce_invalid<
AN...>>;
65 using reduce_invalid_t =
typename reduce_invalid<
AN...>::type;
67 template<
class Seed,
class ResultSelector>
68 struct is_result_function_for {
70 typedef rxu::decay_t<ResultSelector> result_selector_type;
71 typedef rxu::decay_t<Seed> seed_type;
73 struct tag_not_valid {};
75 template<
class CS,
class CRS>
76 static auto check(
int) -> decltype((*(CRS*)
nullptr)(*(CS*)
nullptr));
77 template<
class CS,
class CRS>
78 static tag_not_valid check(...);
80 typedef rxu::decay_t<decltype(check<seed_type, result_selector_type>(0))> type;
81 static const bool value = !std::is_same<type, tag_not_valid>::value;
84 template<
class T,
class Observable,
class Accumulator,
class ResultSelector,
class Seed>
87 typedef rxu::decay_t<Observable> source_type;
88 typedef rxu::decay_t<Accumulator> accumulator_type;
89 typedef rxu::decay_t<ResultSelector> result_selector_type;
90 typedef rxu::decay_t<Seed> seed_type;
92 typedef T source_value_type;
94 typedef typename is_result_function_for<seed_type, result_selector_type>::type value_type;
97 template<
class T,
class Observable,
class Accumulator,
class ResultSelector,
class Seed>
98 struct reduce :
public operator_base<rxu::value_type_t<reduce_traits<T, Observable, Accumulator, ResultSelector, Seed>>>
100 typedef reduce<T, Observable, Accumulator, ResultSelector, Seed> this_type;
101 typedef reduce_traits<T, Observable, Accumulator, ResultSelector, Seed> traits;
103 typedef typename traits::source_type source_type;
104 typedef typename traits::accumulator_type accumulator_type;
105 typedef typename traits::result_selector_type result_selector_type;
106 typedef typename traits::seed_type seed_type;
108 typedef typename traits::source_value_type source_value_type;
109 typedef typename traits::value_type value_type;
111 struct reduce_initial_type
113 ~reduce_initial_type()
116 reduce_initial_type(source_type o, accumulator_type a, result_selector_type rs, seed_type s)
117 : source(std::move(o))
118 , accumulator(std::move(a))
119 , result_selector(std::move(rs))
124 accumulator_type accumulator;
125 result_selector_type result_selector;
129 reduce_initial_type& operator=(reduce_initial_type o) RXCPP_DELETE;
131 reduce_initial_type initial;
136 reduce(source_type o, accumulator_type a, result_selector_type rs, seed_type s)
137 : initial(std::move(o), std::move(a), std::move(rs), std::move(s))
140 template<
class Subscriber>
141 void on_subscribe(Subscriber o)
const {
142 struct reduce_state_type
143 :
public reduce_initial_type
144 ,
public std::enable_shared_from_this<reduce_state_type>
146 reduce_state_type(reduce_initial_type i, Subscriber scrbr)
147 : reduce_initial_type(i)
149 , current(reduce_initial_type::seed)
150 , out(std::move(scrbr))
158 reduce_state_type& operator=(reduce_state_type o) RXCPP_DELETE;
160 auto state = std::make_shared<reduce_state_type>(initial, std::move(o));
161 state->source.subscribe(
165 seed_type next = state->accumulator(std::move(state->current), std::move(t));
166 state->current = std::move(next);
169 [state](std::exception_ptr e) {
170 state->out.on_error(e);
174 auto result = on_exception(
175 [&](){return state->result_selector(std::move(state->current));},
177 if (result.empty()) {
180 state->out.on_next(std::move(result.get()));
181 state->out.on_completed();
190 struct initialize_seeder {
192 static seed_type seed() {
208 rxu::detail::maybe<double> stage;
210 static seed_type seed() {
214 seed_type operator()(seed_type a, U&& v) {
222 double avg =
static_cast<double>(*(a.value)) / a.count;
223 if (!a.stage.empty()) {
224 a.stage.reset((*a.stage + avg) / 2);
228 a.value.reset(std::forward<U>(v));
230 }
else if (a.value.empty()) {
231 a.value.reset(std::forward<U>(v));
239 double operator()(seed_type a) {
240 if (!a.value.empty()) {
241 double avg =
static_cast<double>(*(a.value)) / a.count;
242 if (!a.stage.empty()) {
243 avg = (*a.stage + avg) / 2;
253 typedef rxu::maybe<T> seed_type;
254 static seed_type seed() {
258 seed_type operator()(seed_type a, U&& v)
const {
260 a.reset(std::forward<U>(v));
265 T operator()(seed_type a)
const {
274 typedef rxu::maybe<T> seed_type;
275 static seed_type seed() {
279 seed_type operator()(seed_type a, U&& v) {
280 if (a.empty() || *a < v)
281 a.reset(std::forward<U>(v));
284 T operator()(seed_type a) {
293 typedef rxu::maybe<T> seed_type;
294 static seed_type seed() {
298 seed_type operator()(seed_type a, U&& v) {
299 if (a.empty() || v < *a)
300 a.reset(std::forward<U>(v));
303 T operator()(seed_type a) {
312 using seed_type = rxu::maybe<T>;
313 static seed_type seed() {
317 seed_type operator()(seed_type a, U&& v) {
318 a.reset(std::forward<U>(v));
321 T operator()(seed_type a) {
331 using seed_type = rxu::maybe<T>;
332 static seed_type seed() {
336 seed_type operator()(seed_type a, U&& v) {
337 a.reset(std::forward<U>(v));
340 T operator()(seed_type a) {
352 template<
class...
AN>
360 template<
class...
AN>
507 template<
class Observable,
class Seed,
class Accumulator,
class ResultSelector,
511 static Result
member(Observable&& o, Seed&& s, Accumulator&& a, ResultSelector&& r)
513 return Result(Reduce(std::forward<Observable>(o), std::forward<Accumulator>(a), std::forward<ResultSelector>(r), std::forward<Seed>(s)));
516 template<
class Observable,
class Seed,
class Accumulator,
517 class ResultSelector=rxu::detail::take_at<0>,
518 class Reduce = rxo::detail::reduce<rxu::value_type_t<Observable>, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
521 static Result
member(Observable&& o, Seed&& s, Accumulator&& a)
523 return Result(Reduce(std::forward<Observable>(o), std::forward<Accumulator>(a), rxu::detail::take_at<0>(), std::forward<Seed>(s)));
526 template<
class...
AN>
527 static operators::detail::reduce_invalid_t<
AN...>
member(
AN...) {
530 static_assert(
sizeof...(
AN) == 10000,
"reduce takes (Seed, Accumulator, optional ResultSelector), Accumulator takes (Seed, Observable::value_type) -> Seed, ResultSelector takes (Observable::value_type) -> ResultValue");
537 template<
class Observable,
539 class Operation = operators::detail::first<SValue>,
540 class Seed = decltype(Operation::seed()),
541 class Accumulator = Operation,
542 class ResultSelector = Operation,
549 return Result(Reduce(o.take(1), Operation{}, Operation{}, Operation::seed()));
552 template<
class...
AN>
553 static operators::detail::reduce_invalid_t<
AN...>
member(
AN...) {
556 static_assert(
sizeof...(
AN) == 10000,
"first does not support Observable::value_type");
563 template<
class Observable,
565 class Operation = operators::detail::last<SValue>,
566 class Seed = decltype(Operation::seed()),
567 class Accumulator = Operation,
568 class ResultSelector = Operation,
574 return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
577 template<
class...
AN>
578 static operators::detail::reduce_invalid_t<
AN...>
member(
AN...) {
581 static_assert(
sizeof...(
AN) == 10000,
"last does not support Observable::value_type");
588 template<
class Observable,
590 class Operation = operators::detail::sum<SValue>,
591 class Seed = decltype(Operation::seed()),
592 class Accumulator = Operation,
593 class ResultSelector = Operation,
599 return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
602 template<
class...
AN>
603 static operators::detail::reduce_invalid_t<
AN...>
member(
AN...) {
606 static_assert(
sizeof...(
AN) == 10000,
"sum does not support Observable::value_type");
613 template<
class Observable,
615 class Operation = operators::detail::average<SValue>,
616 class Seed = decltype(Operation::seed()),
617 class Accumulator = Operation,
618 class ResultSelector = Operation,
624 return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
627 template<
class...
AN>
628 static operators::detail::reduce_invalid_t<
AN...>
member(
AN...) {
631 static_assert(
sizeof...(
AN) == 10000,
"average does not support Observable::value_type");
638 template<
class Observable,
640 class Operation = operators::detail::max<SValue>,
641 class Seed = decltype(Operation::seed()),
642 class Accumulator = Operation,
643 class ResultSelector = Operation,
649 return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
652 template<
class...
AN>
653 static operators::detail::reduce_invalid_t<
AN...>
member(
AN...) {
656 static_assert(
sizeof...(
AN) == 10000,
"max does not support Observable::value_type");
663 template<
class Observable,
665 class Operation = operators::detail::min<SValue>,
666 class Seed = decltype(Operation::seed()),
667 class Accumulator = Operation,
668 class ResultSelector = Operation,
674 return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
677 template<
class...
AN>
678 static operators::detail::reduce_invalid_t<
AN...>
member(
AN...) {
681 static_assert(
sizeof...(
AN) == 10000,
"min does not support Observable::value_type");
static operators::detail::reduce_invalid_t< AN... > member(AN...)
Definition: rx-reduce.hpp:578
static operators::detail::reduce_invalid_t< AN... > member(AN...)
Definition: rx-reduce.hpp:653
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
static operators::detail::reduce_invalid_t< AN... > member(AN...)
Definition: rx-reduce.hpp:527
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
static Result member(Observable &&o)
Definition: rx-reduce.hpp:572
Definition: rx-operators.hpp:283
Definition: rx-operators.hpp:69
Definition: rx-operators.hpp:290
auto AN
Definition: rx-finally.hpp:105
auto max() -> operator_factory< max_tag >
For each item from this observable reduce it by taking the max value of the previous items.
Definition: rx-reduce.hpp:496
static operators::detail::reduce_invalid_t< AN... > member(AN...)
Definition: rx-reduce.hpp:603
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
Definition: rx-operators.hpp:298
Definition: rx-operators.hpp:296
Definition: rx-util.hpp:404
auto last() -> operator_factory< last_tag >
For each item from this observable reduce it by sending only the last item.
Definition: rx-reduce.hpp:395
a source of values. subscribe or use one of the operator methods that return a new observable, which uses this observable as a source.
Definition: rx-observable.hpp:510
auto first() -> operator_factory< first_tag >
For each item from this observable reduce it by sending only the first item.
Definition: rx-reduce.hpp:378
static Result member(Observable &&o, Seed &&s, Accumulator &&a, ResultSelector &&r)
Definition: rx-reduce.hpp:511
static Result member(Observable &&o)
Definition: rx-reduce.hpp:647
Definition: rx-operators.hpp:297
static Result member(Observable &&o, Seed &&s, Accumulator &&a)
Definition: rx-reduce.hpp:521
auto sum() -> operator_factory< sum_tag >
For each item from this observable reduce it by adding to the previous items.
Definition: rx-reduce.hpp:454
Definition: rx-operators.hpp:301
static Result member(Observable &&o)
Definition: rx-reduce.hpp:672
auto min() -> operator_factory< min_tag >
For each item from this observable reduce it by taking the min value of the previous items.
Definition: rx-reduce.hpp:475
auto average() -> operator_factory< average_tag >
For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end.
Definition: rx-reduce.hpp:433
static Result member(Observable &&o)
Definition: rx-reduce.hpp:547
auto take(AN &&...an) -> operator_factory< take_tag, AN... >
For the first count items from this observable emit them from the new observable that is returned.
Definition: rx-take.hpp:133
static Result member(Observable &&o)
Definition: rx-reduce.hpp:597
auto reduce(AN &&...an) -> operator_factory< reduce_tag, AN... >
For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned.
Definition: rx-reduce.hpp:353
static Result member(Observable &&o)
Definition: rx-reduce.hpp:622
static operators::detail::reduce_invalid_t< AN... > member(AN...)
Definition: rx-reduce.hpp:553
Definition: rx-operators.hpp:300
auto accumulate(AN &&...an) -> operator_factory< reduce_tag, AN... >
For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned.
Definition: rx-reduce.hpp:361
Definition: rx-operators.hpp:299
static operators::detail::reduce_invalid_t< AN... > member(AN...)
Definition: rx-reduce.hpp:628
static operators::detail::reduce_invalid_t< AN... > member(AN...)
Definition: rx-reduce.hpp:678