20 #if !defined(RXCPP_OPERATORS_RX_SKIP_HPP) 21 #define RXCPP_OPERATORS_RX_SKIP_HPP 23 #include "../rx-includes.hpp" 32 struct skip_invalid_arguments {};
35 struct skip_invalid :
public rxo::operator_base<skip_invalid_arguments<AN...>> {
36 using type = observable<skip_invalid_arguments<
AN...>, skip_invalid<
AN...>>;
40 using skip_invalid_t =
typename skip_invalid<
AN...>::type;
42 template<
class T,
class Observable,
class Count>
43 struct skip :
public operator_base<T>
45 typedef rxu::decay_t<Observable> source_type;
46 typedef rxu::decay_t<Count> count_type;
49 values(source_type s, count_type t)
50 : source(std::move(s))
59 skip(source_type s, count_type t)
60 : initial(std::move(s), std::move(t))
74 template<
class Subscriber>
75 void on_subscribe(
const Subscriber& s)
const {
77 typedef Subscriber output_type;
79 :
public std::enable_shared_from_this<state_type>
82 state_type(
const values& i,
const output_type& oarg)
84 , mode_value(i.count > 0 ? mode::skipping : mode::triggered)
88 typename mode::type mode_value;
92 auto state = std::make_shared<state_type>(initial, s);
94 composite_subscription source_lifetime;
96 s.add(source_lifetime);
98 state->source.subscribe(
103 if (state->mode_value == mode::skipping) {
104 if (--state->count == 0) {
105 state->mode_value = mode::triggered;
108 state->out.on_next(t);
112 [state](std::exception_ptr e) {
113 state->mode_value = mode::errored;
114 state->out.on_error(e);
118 state->mode_value = mode::stopped;
119 state->out.on_completed();
129 template<
class...
AN>
140 template<
class Observable,
145 class Skip = rxo::detail::skip<SourceValue, rxu::decay_t<Observable>,
rxu::decay_t<Count>>,
148 static Result
member(Observable&& o, Count&& c) {
149 return Result(Skip(std::forward<Observable>(o), std::forward<Count>(c)));
152 template<
class...
AN>
153 static operators::detail::skip_invalid_t<
AN...>
member(
AN...) {
156 static_assert(
sizeof...(
AN) == 10000,
"skip takes (optional Count)");
static Result member(Observable &&o, Count &&c)
Definition: rx-skip.hpp:148
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable, reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
Definition: rx-operators.hpp:367
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
A source of values. Subscribe, or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
static operators::detail::skip_invalid_t< AN... > member(AN...)
Definition: rx-skip.hpp:153
auto skip(AN &&...an) -> operator_factory< skip_tag, AN... >
Make a new observable that skips the first count items emitted by this observable.
Definition: rx-skip.hpp:130
Definition: rx-predef.hpp:177