20 #if !defined(RXCPP_OPERATORS_RX_SKIP_LAST_HPP) 21 #define RXCPP_OPERATORS_RX_SKIP_LAST_HPP 23 #include "../rx-includes.hpp" 32 struct skip_last_invalid_arguments {};
35 struct skip_last_invalid :
public rxo::operator_base<skip_last_invalid_arguments<AN...>> {
36 using type = observable<skip_last_invalid_arguments<
AN...>, skip_last_invalid<
AN...>>;
39 using skip_last_invalid_t =
typename skip_last_invalid<
AN...>::type;
41 template<
class T,
class Observable,
class Count>
42 struct skip_last :
public operator_base<T>
44 typedef rxu::decay_t<Observable> source_type;
45 typedef rxu::decay_t<Count> count_type;
47 typedef std::queue<T> queue_type;
48 typedef typename queue_type::size_type queue_size_type;
52 values(source_type s, count_type t)
53 : source(std::move(s))
54 ,
count(static_cast<queue_size_type>(t))
58 queue_size_type
count;
63 : initial(std::move(s), std::move(t))
67 template<
class Subscriber>
68 void on_subscribe(
const Subscriber& s)
const {
70 typedef Subscriber output_type;
72 :
public std::enable_shared_from_this<state_type>
75 state_type(
const values& i,
const output_type& oarg)
84 auto state = std::make_shared<state_type>(initial, s);
86 composite_subscription source_lifetime;
88 s.add(source_lifetime);
90 state->source.subscribe(
95 if(state->count > 0) {
96 if (state->items.size() == state->count) {
97 state->out.on_next(std::move(state->items.front()));
100 state->items.push(t);
102 state->out.on_next(t);
106 [state](std::exception_ptr e) {
107 state->out.on_error(e);
111 state->out.on_completed();
121 template<
class...
AN>
124 return operator_factory<skip_last_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
132 template<
class Observable,
class Count,
136 class SkipLast = rxo::detail::skip_last<SourceValue, rxu::decay_t<Observable>,
rxu::decay_t<Count>>,
139 static Result
member(Observable&& o, Count&& c) {
140 return Result(SkipLast(std::forward<Observable>(o), std::forward<Count>(c)));
143 template<
class...
AN>
144 static operators::detail::skip_last_invalid_t<
AN...>
member(
AN...) {
147 static_assert(
sizeof...(
AN) == 10000,
"skip_last takes (Count)");
Definition: rx-operators.hpp:374
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
static Result member(Observable &&o, Count &&c)
Definition: rx-skip_last.hpp:139
static operators::detail::skip_last_invalid_t< AN... > member(AN...)
Definition: rx-skip_last.hpp:144
auto skip_last(AN &&...an) -> operator_factory< skip_last_tag, AN... >
Make new observable with skipped last count items from this observable.
Definition: rx-skip_last.hpp:122
Definition: rx-predef.hpp:177