22 #if !defined(RXCPP_OPERATORS_RX_SCAN_HPP) 23 #define RXCPP_OPERATORS_RX_SCAN_HPP 25 #include "../rx-includes.hpp" 34 struct scan_invalid_arguments {};
37 struct scan_invalid :
public rxo::operator_base<scan_invalid_arguments<AN...>> {
38 using type = observable<scan_invalid_arguments<
AN...>, scan_invalid<
AN...>>;
41 using scan_invalid_t =
typename scan_invalid<
AN...>::type;
43 template<
class T,
class Observable,
class Accumulator,
class Seed>
44 struct scan :
public operator_base<rxu::decay_t<Seed>>
46 typedef rxu::decay_t<Observable> source_type;
47 typedef rxu::decay_t<Accumulator> accumulator_type;
48 typedef rxu::decay_t<Seed> seed_type;
50 struct scan_initial_type
52 scan_initial_type(source_type o, accumulator_type a, seed_type s)
53 : source(std::move(o))
54 , accumulator(std::move(a))
59 accumulator_type accumulator;
62 scan_initial_type initial;
64 scan(source_type o, accumulator_type a, seed_type s)
65 : initial(std::move(o), a, s)
69 template<
class Subscriber>
70 void on_subscribe(Subscriber o)
const {
71 struct scan_state_type
72 :
public scan_initial_type
73 ,
public std::enable_shared_from_this<scan_state_type>
75 scan_state_type(scan_initial_type i, Subscriber scrbr)
76 : scan_initial_type(i)
77 , result(scan_initial_type::seed)
78 , out(std::move(scrbr))
84 auto state = std::make_shared<scan_state_type>(initial, std::move(o));
85 state->source.subscribe(
89 state->result = state->accumulator(state->result, t);
90 state->out.on_next(state->result);
93 [state](std::exception_ptr e) {
94 state->out.on_error(e);
98 state->out.on_completed();
108 template<
class...
AN>
119 template<
class Observable,
class Seed,
class Accumulator,
124 class Scan = rxo::detail::scan<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<Seed>>,
127 static Result
member(Observable&& o, Seed s, Accumulator&& a) {
128 return Result(Scan(std::forward<Observable>(o), std::forward<Accumulator>(a), s));
131 template<
class...
AN>
132 static operators::detail::scan_invalid_t<
AN...>
member(
AN...) {
135 static_assert(
sizeof...(
AN) == 10000,
"scan takes (Seed, Accumulator); Accumulator must be a function with the signature Seed(Seed, T)");
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
static Result member(Observable &&o, Seed s, Accumulator &&a)
Definition: rx-scan.hpp:127
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static operators::detail::scan_invalid_t< AN... > member(AN...)
Definition: rx-scan.hpp:132
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
Definition: rx-operators.hpp:353
auto scan(AN &&...an) -> operator_factory< scan_tag, AN... >
For each item from this observable use Accumulator to combine items into a value that will be emitted...
Definition: rx-scan.hpp:109
Definition: rx-predef.hpp:310
Definition: rx-predef.hpp:177