RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-scan.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
22 #if !defined(RXCPP_OPERATORS_RX_SCAN_HPP)
23 #define RXCPP_OPERATORS_RX_SCAN_HPP
24 
25 #include "../rx-includes.hpp"
26 
27 namespace rxcpp {
28 
29 namespace operators {
30 
31 namespace detail {
32 
/// Empty tag type parameterized on an argument pack.
/// It is used below as the value type of the observable produced by
/// `scan_invalid`, so the pack is carried purely in the type.
template<typename... AN>
struct scan_invalid_arguments
{
};
35 
36 template<class... AN>
37 struct scan_invalid : public rxo::operator_base<scan_invalid_arguments<AN...>> {
38  using type = observable<scan_invalid_arguments<AN...>, scan_invalid<AN...>>;
39 };
40 template<class... AN>
41 using scan_invalid_t = typename scan_invalid<AN...>::type;
42 
43 template<class T, class Observable, class Accumulator, class Seed>
44 struct scan : public operator_base<rxu::decay_t<Seed>>
45 {
46  typedef rxu::decay_t<Observable> source_type;
47  typedef rxu::decay_t<Accumulator> accumulator_type;
48  typedef rxu::decay_t<Seed> seed_type;
49 
50  struct scan_initial_type
51  {
52  scan_initial_type(source_type o, accumulator_type a, seed_type s)
53  : source(std::move(o))
54  , accumulator(std::move(a))
55  , seed(s)
56  {
57  }
58  source_type source;
59  accumulator_type accumulator;
60  seed_type seed;
61  };
62  scan_initial_type initial;
63 
64  scan(source_type o, accumulator_type a, seed_type s)
65  : initial(std::move(o), a, s)
66  {
67  }
68 
69  template<class Subscriber>
70  void on_subscribe(Subscriber o) const {
71  struct scan_state_type
72  : public scan_initial_type
73  , public std::enable_shared_from_this<scan_state_type>
74  {
75  scan_state_type(scan_initial_type i, Subscriber scrbr)
76  : scan_initial_type(i)
77  , result(scan_initial_type::seed)
78  , out(std::move(scrbr))
79  {
80  }
81  seed_type result;
82  Subscriber out;
83  };
84  auto state = std::make_shared<scan_state_type>(initial, std::move(o));
85  state->source.subscribe(
86  state->out,
87  // on_next
88  [state](T t) {
89  state->result = state->accumulator(state->result, t);
90  state->out.on_next(state->result);
91  },
92  // on_error
93  [state](std::exception_ptr e) {
94  state->out.on_error(e);
95  },
96  // on_completed
97  [state]() {
98  state->out.on_completed();
99  }
100  );
101  }
102 };
103 
104 }
105 
108 template<class... AN>
109 auto scan(AN&&... an)
110  -> operator_factory<scan_tag, AN...> {
111  return operator_factory<scan_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
112 }
113 
114 }
115 
116 template<>
118 {
119  template<class Observable, class Seed, class Accumulator,
120  class Enabled = rxu::enable_if_all_true_type_t<
123  class SourceValue = rxu::value_type_t<Observable>,
124  class Scan = rxo::detail::scan<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<Seed>>,
125  class Value = rxu::value_type_t<Scan>,
126  class Result = observable<Value, Scan>>
127  static Result member(Observable&& o, Seed s, Accumulator&& a) {
128  return Result(Scan(std::forward<Observable>(o), std::forward<Accumulator>(a), s));
129  }
130 
131  template<class... AN>
132  static operators::detail::scan_invalid_t<AN...> member(AN...) {
133  std::terminate();
134  return {};
135  static_assert(sizeof...(AN) == 10000, "scan takes (Seed, Accumulator); Accumulator must be a function with the signature Seed(Seed, T)");
136  }
137 };
138 
139 }
140 
141 #endif
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
static Result member(Observable &&o, Seed s, Accumulator &&a)
Definition: rx-scan.hpp:127
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static operators::detail::scan_invalid_t< AN... > member(AN...)
Definition: rx-scan.hpp:132
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
Definition: rx-operators.hpp:353
auto scan(AN &&...an) -> operator_factory< scan_tag, AN... >
For each item from this observable use Accumulator to combine items into a value that will be emitted...
Definition: rx-scan.hpp:109
Definition: rx-predef.hpp:310
Definition: rx-predef.hpp:177