RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-reduce.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
46 #if !defined(RXCPP_OPERATORS_RX_REDUCE_HPP)
47 #define RXCPP_OPERATORS_RX_REDUCE_HPP
48 
49 #include "../rx-includes.hpp"
50 
51 namespace rxcpp {
52 
53 namespace operators {
54 
55 namespace detail {
56 
57 template<class... AN>
58 struct reduce_invalid_arguments {};
59 
60 template<class... AN>
61 struct reduce_invalid : public rxo::operator_base<reduce_invalid_arguments<AN...>> {
62  using type = observable<reduce_invalid_arguments<AN...>, reduce_invalid<AN...>>;
63 };
64 template<class... AN>
65 using reduce_invalid_t = typename reduce_invalid<AN...>::type;
66 
67 template<class Seed, class ResultSelector>
68 struct is_result_function_for {
69 
70  typedef rxu::decay_t<ResultSelector> result_selector_type;
71  typedef rxu::decay_t<Seed> seed_type;
72 
73  struct tag_not_valid {};
74 
75  template<class CS, class CRS>
76  static auto check(int) -> decltype((*(CRS*)nullptr)(*(CS*)nullptr));
77  template<class CS, class CRS>
78  static tag_not_valid check(...);
79 
80  typedef rxu::decay_t<decltype(check<seed_type, result_selector_type>(0))> type;
81  static const bool value = !std::is_same<type, tag_not_valid>::value;
82 };
83 
84 template<class T, class Observable, class Accumulator, class ResultSelector, class Seed>
85 struct reduce_traits
86 {
87  typedef rxu::decay_t<Observable> source_type;
88  typedef rxu::decay_t<Accumulator> accumulator_type;
89  typedef rxu::decay_t<ResultSelector> result_selector_type;
90  typedef rxu::decay_t<Seed> seed_type;
91 
92  typedef T source_value_type;
93 
94  typedef typename is_result_function_for<seed_type, result_selector_type>::type value_type;
95 };
96 
97 template<class T, class Observable, class Accumulator, class ResultSelector, class Seed>
98 struct reduce : public operator_base<rxu::value_type_t<reduce_traits<T, Observable, Accumulator, ResultSelector, Seed>>>
99 {
100  typedef reduce<T, Observable, Accumulator, ResultSelector, Seed> this_type;
101  typedef reduce_traits<T, Observable, Accumulator, ResultSelector, Seed> traits;
102 
103  typedef typename traits::source_type source_type;
104  typedef typename traits::accumulator_type accumulator_type;
105  typedef typename traits::result_selector_type result_selector_type;
106  typedef typename traits::seed_type seed_type;
107 
108  typedef typename traits::source_value_type source_value_type;
109  typedef typename traits::value_type value_type;
110 
111  struct reduce_initial_type
112  {
113  ~reduce_initial_type()
114  {
115  }
116  reduce_initial_type(source_type o, accumulator_type a, result_selector_type rs, seed_type s)
117  : source(std::move(o))
118  , accumulator(std::move(a))
119  , result_selector(std::move(rs))
120  , seed(std::move(s))
121  {
122  }
123  source_type source;
124  accumulator_type accumulator;
125  result_selector_type result_selector;
126  seed_type seed;
127 
128  private:
129  reduce_initial_type& operator=(reduce_initial_type o) RXCPP_DELETE;
130  };
131  reduce_initial_type initial;
132 
133  ~reduce()
134  {
135  }
136  reduce(source_type o, accumulator_type a, result_selector_type rs, seed_type s)
137  : initial(std::move(o), std::move(a), std::move(rs), std::move(s))
138  {
139  }
140  template<class Subscriber>
141  void on_subscribe(Subscriber o) const {
142  struct reduce_state_type
143  : public reduce_initial_type
144  , public std::enable_shared_from_this<reduce_state_type>
145  {
146  reduce_state_type(reduce_initial_type i, Subscriber scrbr)
147  : reduce_initial_type(i)
148  , source(i.source)
149  , current(reduce_initial_type::seed)
150  , out(std::move(scrbr))
151  {
152  }
153  source_type source;
154  seed_type current;
155  Subscriber out;
156 
157  private:
158  reduce_state_type& operator=(reduce_state_type o) RXCPP_DELETE;
159  };
160  auto state = std::make_shared<reduce_state_type>(initial, std::move(o));
161  state->source.subscribe(
162  state->out,
163  // on_next
164  [state](T t) {
165  seed_type next = state->accumulator(std::move(state->current), std::move(t));
166  state->current = std::move(next);
167  },
168  // on_error
169  [state](std::exception_ptr e) {
170  state->out.on_error(e);
171  },
172  // on_completed
173  [state]() {
174  auto result = on_exception(
175  [&](){return state->result_selector(std::move(state->current));},
176  state->out);
177  if (result.empty()) {
178  return;
179  }
180  state->out.on_next(std::move(result.get()));
181  state->out.on_completed();
182  }
183  );
184  }
185 private:
186  reduce& operator=(reduce o) RXCPP_DELETE;
187 };
188 
// Seed provider whose seed is simply a value-initialized T.
template<class T>
struct initialize_seeder {
    using seed_type = T;

    // Returns a value-initialized seed_type.
    static auto seed() -> seed_type {
        return seed_type{};
    }
};
196 
197 template<class T>
198 struct average {
199  struct seed_type
200  {
201  seed_type()
202  : value()
203  , count(0)
204  {
205  }
206  rxu::maybe<T> value;
207  int count;
208  rxu::detail::maybe<double> stage;
209  };
210  static seed_type seed() {
211  return seed_type{};
212  }
213  template<class U>
214  seed_type operator()(seed_type a, U&& v) {
215  if (a.count != 0 &&
216  (a.count == std::numeric_limits<int>::max() ||
217  ((v > 0) && (*(a.value) > (std::numeric_limits<T>::max() - v))) ||
218  ((v < 0) && (*(a.value) < (std::numeric_limits<T>::min() - v))))) {
219  // would overflow, calc existing and reset for next batch
220  // this will add error to the final result, but the alternative
221  // is to fail on overflow
222  double avg = static_cast<double>(*(a.value)) / a.count;
223  if (!a.stage.empty()) {
224  a.stage.reset((*a.stage + avg) / 2);
225  } else {
226  a.stage.reset(avg);
227  }
228  a.value.reset(std::forward<U>(v));
229  a.count = 1;
230  } else if (a.value.empty()) {
231  a.value.reset(std::forward<U>(v));
232  a.count = 1;
233  } else {
234  *(a.value) += v;
235  ++a.count;
236  }
237  return a;
238  }
239  double operator()(seed_type a) {
240  if (!a.value.empty()) {
241  double avg = static_cast<double>(*(a.value)) / a.count;
242  if (!a.stage.empty()) {
243  avg = (*a.stage + avg) / 2;
244  }
245  return avg;
246  }
247  throw rxcpp::empty_error("average() requires a stream with at least one value");
248  }
249 };
250 
251 template<class T>
252 struct sum {
253  typedef rxu::maybe<T> seed_type;
254  static seed_type seed() {
255  return seed_type();
256  }
257  template<class U>
258  seed_type operator()(seed_type a, U&& v) const {
259  if (a.empty())
260  a.reset(std::forward<U>(v));
261  else
262  *a = *a + v;
263  return a;
264  }
265  T operator()(seed_type a) const {
266  if (a.empty())
267  throw rxcpp::empty_error("sum() requires a stream with at least one value");
268  return *a;
269  }
270 };
271 
272 template<class T>
273 struct max {
274  typedef rxu::maybe<T> seed_type;
275  static seed_type seed() {
276  return seed_type();
277  }
278  template<class U>
279  seed_type operator()(seed_type a, U&& v) {
280  if (a.empty() || *a < v)
281  a.reset(std::forward<U>(v));
282  return a;
283  }
284  T operator()(seed_type a) {
285  if (a.empty())
286  throw rxcpp::empty_error("max() requires a stream with at least one value");
287  return *a;
288  }
289 };
290 
291 template<class T>
292 struct min {
293  typedef rxu::maybe<T> seed_type;
294  static seed_type seed() {
295  return seed_type();
296  }
297  template<class U>
298  seed_type operator()(seed_type a, U&& v) {
299  if (a.empty() || v < *a)
300  a.reset(std::forward<U>(v));
301  return a;
302  }
303  T operator()(seed_type a) {
304  if (a.empty())
305  throw rxcpp::empty_error("min() requires a stream with at least one value");
306  return *a;
307  }
308 };
309 
310 template<class T>
311 struct first {
312  using seed_type = rxu::maybe<T>;
313  static seed_type seed() {
314  return seed_type();
315  }
316  template<class U>
317  seed_type operator()(seed_type a, U&& v) {
318  a.reset(std::forward<U>(v));
319  return a;
320  }
321  T operator()(seed_type a) {
322  if (a.empty()) {
323  throw rxcpp::empty_error("first() requires a stream with at least one value");
324  }
325  return *a;
326  }
327 };
328 
329 template<class T>
330 struct last {
331  using seed_type = rxu::maybe<T>;
332  static seed_type seed() {
333  return seed_type();
334  }
335  template<class U>
336  seed_type operator()(seed_type a, U&& v) {
337  a.reset(std::forward<U>(v));
338  return a;
339  }
340  T operator()(seed_type a) {
341  if (a.empty()) {
342  throw rxcpp::empty_error("last() requires a stream with at least one value");
343  }
344  return *a;
345  }
346 };
347 
348 }
349 
352 template<class... AN>
353 auto reduce(AN&&... an)
354  -> operator_factory<reduce_tag, AN...> {
355  return operator_factory<reduce_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
356 }
357 
360 template<class... AN>
361 auto accumulate(AN&&... an)
362  -> operator_factory<reduce_tag, AN...> {
363  return operator_factory<reduce_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
364 }
365 
378 inline auto first()
380  return operator_factory<first_tag>(std::tuple<>{});
381 }
382 
395 inline auto last()
397  return operator_factory<last_tag>(std::tuple<>{});
398 }
399 
412 inline auto count()
414  return operator_factory<reduce_tag, int, rxu::count, rxu::detail::take_at<0>>(std::make_tuple(0, rxu::count(), rxu::take_at<0>()));
415 }
416 
433 inline auto average()
435  return operator_factory<average_tag>(std::tuple<>{});
436 }
437 
454 inline auto sum()
456  return operator_factory<sum_tag>(std::tuple<>{});
457 }
458 
475 inline auto min()
477  return operator_factory<min_tag>(std::tuple<>{});
478 }
479 
496 inline auto max()
498  return operator_factory<max_tag>(std::tuple<>{});
499 }
500 
501 }
502 
503 template<>
505 {
506 
507  template<class Observable, class Seed, class Accumulator, class ResultSelector,
508  class Reduce = rxo::detail::reduce<rxu::value_type_t<Observable>, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
509  class Value = rxu::value_type_t<Reduce>,
510  class Result = observable<Value, Reduce>>
511  static Result member(Observable&& o, Seed&& s, Accumulator&& a, ResultSelector&& r)
512  {
513  return Result(Reduce(std::forward<Observable>(o), std::forward<Accumulator>(a), std::forward<ResultSelector>(r), std::forward<Seed>(s)));
514  }
515 
516  template<class Observable, class Seed, class Accumulator,
517  class ResultSelector=rxu::detail::take_at<0>,
518  class Reduce = rxo::detail::reduce<rxu::value_type_t<Observable>, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
519  class Value = rxu::value_type_t<Reduce>,
520  class Result = observable<Value, Reduce>>
521  static Result member(Observable&& o, Seed&& s, Accumulator&& a)
522  {
523  return Result(Reduce(std::forward<Observable>(o), std::forward<Accumulator>(a), rxu::detail::take_at<0>(), std::forward<Seed>(s)));
524  }
525 
526  template<class... AN>
527  static operators::detail::reduce_invalid_t<AN...> member(AN...) {
528  std::terminate();
529  return {};
530  static_assert(sizeof...(AN) == 10000, "reduce takes (Seed, Accumulator, optional ResultSelector), Accumulator takes (Seed, Observable::value_type) -> Seed, ResultSelector takes (Observable::value_type) -> ResultValue");
531  }
532 };
533 
534 template<>
536 {
537  template<class Observable,
538  class SValue = rxu::value_type_t<Observable>,
539  class Operation = operators::detail::first<SValue>,
540  class Seed = decltype(Operation::seed()),
541  class Accumulator = Operation,
542  class ResultSelector = Operation,
543  class TakeOne = decltype(((rxu::decay_t<Observable>*)nullptr)->take(1)),
544  class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<TakeOne>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
545  class RValue = rxu::value_type_t<Reduce>,
546  class Result = observable<RValue, Reduce>>
547  static Result member(Observable&& o)
548  {
549  return Result(Reduce(o.take(1), Operation{}, Operation{}, Operation::seed()));
550  }
551 
552  template<class... AN>
553  static operators::detail::reduce_invalid_t<AN...> member(AN...) {
554  std::terminate();
555  return {};
556  static_assert(sizeof...(AN) == 10000, "first does not support Observable::value_type");
557  }
558 };
559 
560 template<>
562 {
563  template<class Observable,
564  class SValue = rxu::value_type_t<Observable>,
565  class Operation = operators::detail::last<SValue>,
566  class Seed = decltype(Operation::seed()),
567  class Accumulator = Operation,
568  class ResultSelector = Operation,
569  class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
570  class RValue = rxu::value_type_t<Reduce>,
571  class Result = observable<RValue, Reduce>>
572  static Result member(Observable&& o)
573  {
574  return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
575  }
576 
577  template<class... AN>
578  static operators::detail::reduce_invalid_t<AN...> member(AN...) {
579  std::terminate();
580  return {};
581  static_assert(sizeof...(AN) == 10000, "last does not support Observable::value_type");
582  }
583 };
584 
585 template<>
587 {
588  template<class Observable,
589  class SValue = rxu::value_type_t<Observable>,
590  class Operation = operators::detail::sum<SValue>,
591  class Seed = decltype(Operation::seed()),
592  class Accumulator = Operation,
593  class ResultSelector = Operation,
594  class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
595  class RValue = rxu::value_type_t<Reduce>,
596  class Result = observable<RValue, Reduce>>
597  static Result member(Observable&& o)
598  {
599  return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
600  }
601 
602  template<class... AN>
603  static operators::detail::reduce_invalid_t<AN...> member(AN...) {
604  std::terminate();
605  return {};
606  static_assert(sizeof...(AN) == 10000, "sum does not support Observable::value_type");
607  }
608 };
609 
610 template<>
612 {
613  template<class Observable,
614  class SValue = rxu::value_type_t<Observable>,
615  class Operation = operators::detail::average<SValue>,
616  class Seed = decltype(Operation::seed()),
617  class Accumulator = Operation,
618  class ResultSelector = Operation,
619  class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
620  class RValue = rxu::value_type_t<Reduce>,
621  class Result = observable<RValue, Reduce>>
622  static Result member(Observable&& o)
623  {
624  return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
625  }
626 
627  template<class... AN>
628  static operators::detail::reduce_invalid_t<AN...> member(AN...) {
629  std::terminate();
630  return {};
631  static_assert(sizeof...(AN) == 10000, "average does not support Observable::value_type");
632  }
633 };
634 
635 template<>
637 {
638  template<class Observable,
639  class SValue = rxu::value_type_t<Observable>,
640  class Operation = operators::detail::max<SValue>,
641  class Seed = decltype(Operation::seed()),
642  class Accumulator = Operation,
643  class ResultSelector = Operation,
644  class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
645  class RValue = rxu::value_type_t<Reduce>,
646  class Result = observable<RValue, Reduce>>
647  static Result member(Observable&& o)
648  {
649  return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
650  }
651 
652  template<class... AN>
653  static operators::detail::reduce_invalid_t<AN...> member(AN...) {
654  std::terminate();
655  return {};
656  static_assert(sizeof...(AN) == 10000, "max does not support Observable::value_type");
657  }
658 };
659 
660 template<>
662 {
663  template<class Observable,
664  class SValue = rxu::value_type_t<Observable>,
665  class Operation = operators::detail::min<SValue>,
666  class Seed = decltype(Operation::seed()),
667  class Accumulator = Operation,
668  class ResultSelector = Operation,
669  class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
670  class RValue = rxu::value_type_t<Reduce>,
671  class Result = observable<RValue, Reduce>>
672  static Result member(Observable&& o)
673  {
674  return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
675  }
676 
677  template<class... AN>
678  static operators::detail::reduce_invalid_t<AN...> member(AN...) {
679  std::terminate();
680  return {};
681  static_assert(sizeof...(AN) == 10000, "min does not support Observable::value_type");
682  }
683 };
684 
685 }
686 
687 #endif
static operators::detail::reduce_invalid_t< AN... > member(AN...)
Definition: rx-reduce.hpp:578
static operators::detail::reduce_invalid_t< AN... > member(AN...)
Definition: rx-reduce.hpp:653
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
static operators::detail::reduce_invalid_t< AN... > member(AN...)
Definition: rx-reduce.hpp:527
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
static Result member(Observable &&o)
Definition: rx-reduce.hpp:572
Definition: rx-operators.hpp:283
Definition: rx-operators.hpp:69
Definition: rx-operators.hpp:290
auto AN
Definition: rx-finally.hpp:105
auto max() -> operator_factory< max_tag >
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-reduce.hpp:496
static operators::detail::reduce_invalid_t< AN... > member(AN...)
Definition: rx-reduce.hpp:603
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
Definition: rx-operators.hpp:298
Definition: rx-operators.hpp:296
Definition: rx-util.hpp:404
auto last() -> operator_factory< last_tag >
For each item from this observable reduce it by sending only the last item.
Definition: rx-reduce.hpp:395
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
auto first() -> operator_factory< first_tag >
For each item from this observable reduce it by sending only the first item.
Definition: rx-reduce.hpp:378
static Result member(Observable &&o, Seed &&s, Accumulator &&a, ResultSelector &&r)
Definition: rx-reduce.hpp:511
static Result member(Observable &&o)
Definition: rx-reduce.hpp:647
Definition: rx-operators.hpp:297
static Result member(Observable &&o, Seed &&s, Accumulator &&a)
Definition: rx-reduce.hpp:521
auto sum() -> operator_factory< sum_tag >
For each item from this observable reduce it by adding to the previous items.
Definition: rx-reduce.hpp:454
Definition: rx-operators.hpp:301
static Result member(Observable &&o)
Definition: rx-reduce.hpp:672
auto min() -> operator_factory< min_tag >
For each item from this observable reduce it by taking the min value of the previous items...
Definition: rx-reduce.hpp:475
auto average() -> operator_factory< average_tag >
For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end.
Definition: rx-reduce.hpp:433
static Result member(Observable &&o)
Definition: rx-reduce.hpp:547
auto take(AN &&...an) -> operator_factory< take_tag, AN... >
For the first count items from this observable emit them from the new observable that is returned...
Definition: rx-take.hpp:133
static Result member(Observable &&o)
Definition: rx-reduce.hpp:597
auto reduce(AN &&...an) -> operator_factory< reduce_tag, AN... >
For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned.
Definition: rx-reduce.hpp:353
static Result member(Observable &&o)
Definition: rx-reduce.hpp:622
static operators::detail::reduce_invalid_t< AN... > member(AN...)
Definition: rx-reduce.hpp:553
Definition: rx-operators.hpp:300
auto accumulate(AN &&...an) -> operator_factory< reduce_tag, AN... >
For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned.
Definition: rx-reduce.hpp:361
Definition: rx-operators.hpp:299
static operators::detail::reduce_invalid_t< AN... > member(AN...)
Definition: rx-reduce.hpp:628
static operators::detail::reduce_invalid_t< AN... > member(AN...)
Definition: rx-reduce.hpp:678