20 #if !defined(RXCPP_OPERATORS_RX_TAKE_HPP) 21 #define RXCPP_OPERATORS_RX_TAKE_HPP 23 #include "../rx-includes.hpp" 32 struct take_invalid_arguments {};
35 struct take_invalid :
public rxo::operator_base<take_invalid_arguments<AN...>> {
36 using type = observable<take_invalid_arguments<
AN...>, take_invalid<
AN...>>;
39 using take_invalid_t =
typename take_invalid<
AN...>::type;
41 template<
class T,
class Observable,
class Count>
42 struct take :
public operator_base<T>
44 typedef rxu::decay_t<Observable> source_type;
45 typedef rxu::decay_t<Count> count_type;
48 values(source_type s, count_type t)
49 : source(std::move(s))
58 take(source_type s, count_type t)
59 : initial(std::move(s), std::move(t))
73 template<
class Subscriber>
74 void on_subscribe(
const Subscriber& s)
const {
76 typedef Subscriber output_type;
78 :
public std::enable_shared_from_this<state_type>
81 state_type(
const values& i,
const output_type& oarg)
83 , mode_value(mode::taking)
87 typename mode::type mode_value;
91 auto state = std::make_shared<state_type>(initial, s);
93 composite_subscription source_lifetime;
95 s.add(source_lifetime);
97 state->source.subscribe(
101 [state, source_lifetime](T t) {
102 if (state->mode_value < mode::triggered) {
103 if (--state->count > 0) {
104 state->out.on_next(t);
106 state->mode_value = mode::triggered;
107 state->out.on_next(t);
109 source_lifetime.unsubscribe();
110 state->out.on_completed();
115 [state](std::exception_ptr e) {
116 state->mode_value = mode::errored;
117 state->out.on_error(e);
121 state->mode_value = mode::stopped;
122 state->out.on_completed();
132 template<
class...
AN>
143 template<
class Observable,
148 class Take = rxo::detail::take<SourceValue, rxu::decay_t<Observable>,
rxu::decay_t<Count>>,
151 static Result
member(Observable&& o, Count&& c) {
152 return Result(Take(std::forward<Observable>(o), std::forward<Count>(c)));
155 template<
class...
AN>
156 static operators::detail::take_invalid_t<
AN...>
member(
AN...) {
159 static_assert(
sizeof...(
AN) == 10000,
"take takes (optional Count)");
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable, reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
static Result member(Observable &&o, Count &&c)
Definition: rx-take.hpp:151
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
A source of values. Subscribe, or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
Definition: rx-operators.hpp:417
static operators::detail::take_invalid_t< AN... > member(AN...)
Definition: rx-take.hpp:156
auto take(AN &&...an) -> operator_factory< take_tag, AN... >
For the first count items from this observable, emit them from the new observable that is returned...
Definition: rx-take.hpp:133
Definition: rx-predef.hpp:177