20 #if !defined(RXCPP_OPERATORS_RX_TAKE_LAST_HPP) 21 #define RXCPP_OPERATORS_RX_TAKE_LAST_HPP 23 #include "../rx-includes.hpp" 32 struct take_last_invalid_arguments {};
35 struct take_last_invalid :
public rxo::operator_base<take_last_invalid_arguments<AN...>> {
36 using type = observable<take_last_invalid_arguments<
AN...>, take_last_invalid<
AN...>>;
39 using take_last_invalid_t =
typename take_last_invalid<
AN...>::type;
41 template<
class T,
class Observable,
class Count>
42 struct take_last :
public operator_base<T>
44 typedef rxu::decay_t<Observable> source_type;
45 typedef rxu::decay_t<Count> count_type;
47 typedef std::queue<T> queue_type;
48 typedef typename queue_type::size_type queue_size_type;
52 values(source_type s, count_type t)
53 : source(std::move(s))
54 ,
count(static_cast<queue_size_type>(t))
58 queue_size_type
count;
63 : initial(std::move(s), std::move(t))
67 template<
class Subscriber>
68 void on_subscribe(
const Subscriber& s)
const {
70 typedef Subscriber output_type;
72 :
public std::enable_shared_from_this<state_type>
75 state_type(
const values& i,
const output_type& oarg)
84 auto state = std::make_shared<state_type>(initial, s);
86 composite_subscription source_lifetime;
88 s.add(source_lifetime);
90 state->source.subscribe(
94 [state, source_lifetime](T t) {
95 if(state->count > 0) {
96 if (state->items.size() == state->count) {
103 [state](std::exception_ptr e) {
104 state->out.on_error(e);
108 while(!state->items.empty()) {
109 state->out.on_next(std::move(state->items.front()));
112 state->out.on_completed();
122 template<
class...
AN>
125 return operator_factory<take_last_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
133 template<
class Observable,
138 class TakeLast = rxo::detail::take_last<SourceValue, rxu::decay_t<Observable>,
rxu::decay_t<Count>>,
141 static Result
member(Observable&& o, Count&& c) {
142 return Result(TakeLast(std::forward<Observable>(o), std::forward<Count>(c)));
145 template<
class...
AN>
146 static operators::detail::take_last_invalid_t<
AN...>
member(
AN...) {
149 static_assert(
sizeof...(
AN) == 10000,
"take_last takes (Count)");
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
auto take_last(AN &&...an) -> operator_factory< take_last_tag, AN... >
Emit only the final t items emitted by the source Observable.
Definition: rx-take_last.hpp:123
static Result member(Observable &&o, Count &&c)
Definition: rx-take_last.hpp:141
Definition: rx-operators.hpp:424
Definition: rx-predef.hpp:177
static operators::detail::take_last_invalid_t< AN... > member(AN...)
Definition: rx-take_last.hpp:146