RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-skip.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
20 #if !defined(RXCPP_OPERATORS_RX_SKIP_HPP)
21 #define RXCPP_OPERATORS_RX_SKIP_HPP
22 
23 #include "../rx-includes.hpp"
24 
25 namespace rxcpp {
26 
27 namespace operators {
28 
29 namespace detail {
30 
// Empty tag type parameterised on the argument types of a skip() call.
// skip_invalid uses it as the value type of its operator_base and of the
// observable type it exposes.
31 template<class... AN>
32 struct skip_invalid_arguments {};
33 
// Placeholder operator for a skip() call whose arguments match no supported
// overload; the catch-all member(AN...) overload in this file returns its
// nested `type`.
// `type` is the observable built from the placeholder value type
// skip_invalid_arguments<AN...> and this placeholder operator.
34 template<class... AN>
35 struct skip_invalid : public rxo::operator_base<skip_invalid_arguments<AN...>> {
36  using type = observable<skip_invalid_arguments<AN...>, skip_invalid<AN...>>;
37 };
38 
// Shorthand for skip_invalid<AN...>::type.
39 template<class... AN>
40 using skip_invalid_t = typename skip_invalid<AN...>::type;
41 
// Operator that drops the first `count` items emitted by a source observable
// and forwards every later item, plus the error/completion signal, to the
// subscriber.
//
// T          - item type received from the source and passed on to the subscriber.
// Observable - source observable type (stored decayed as source_type).
// Count      - type of the skip counter (stored decayed as count_type); it is
//              compared with 0 and pre-decremented, so it must support both.
42 template<class T, class Observable, class Count>
43 struct skip : public operator_base<T>
44 {
45  typedef rxu::decay_t<Observable> source_type;
46  typedef rxu::decay_t<Count> count_type;
// The configuration captured when the operator is constructed: the source to
// subscribe to and how many items to drop.
47  struct values
48  {
49  values(source_type s, count_type t)
50  : source(std::move(s))
51  , count(std::move(t))
52  {
53  }
54  source_type source;
55  count_type count;
56  };
// Pristine configuration; on_subscribe copies it so each subscription counts
// independently.
57  values initial;
58 
// Stores the source observable `s` and the number of items to skip `t`.
59  skip(source_type s, count_type t)
60  : initial(std::move(s), std::move(t))
61  {
62  }
63 
// Per-subscription phase. Only `skipping` is tested in this block; the other
// values are assigned but not read here.
64  struct mode
65  {
66  enum type {
67  skipping, // ignore messages
68  triggered, // capture messages
69  errored, // error occurred
70  stopped // observable completed
71  };
72  };
73 
// Subscribes to the source on behalf of subscriber `s`, dropping items until
// the per-subscription counter reaches zero and forwarding everything after.
74  template<class Subscriber>
75  void on_subscribe(const Subscriber& s) const {
76 
77  typedef Subscriber output_type;
// Mutable state shared by the three callbacks below: a private copy of the
// configuration, the current phase, and the downstream subscriber.
78  struct state_type
79  : public std::enable_shared_from_this<state_type>
80  , public values
81  {
82  state_type(const values& i, const output_type& oarg)
83  : values(i)
// A count that is not greater than zero starts in `triggered`, so nothing is
// dropped and the counter is never decremented.
84  , mode_value(i.count > 0 ? mode::skipping : mode::triggered)
85  , out(oarg)
86  {
87  }
88  typename mode::type mode_value;
89  output_type out;
90  };
91  // take a copy of the values for each subscription
92  auto state = std::make_shared<state_type>(initial, s);
93 
94  composite_subscription source_lifetime;
95 
// The source subscription is added to the subscriber's own subscription.
// NOTE(review): presumably so that unsubscribing `s` also unsubscribes the
// source - confirm against composite_subscription semantics.
96  s.add(source_lifetime);
97 
98  state->source.subscribe(
99  // split subscription lifetime
100  source_lifetime,
101  // on_next
102  [state](T t) {
103  if (state->mode_value == mode::skipping) {
// The current item is dropped; once the counter hits zero the next item is
// the first one forwarded.
104  if (--state->count == 0) {
105  state->mode_value = mode::triggered;
106  }
107  } else {
108  state->out.on_next(t);
109  }
110  },
111  // on_error
112  [state](std::exception_ptr e) {
113  state->mode_value = mode::errored;
114  state->out.on_error(e);
115  },
116  // on_completed
117  [state]() {
118  state->mode_value = mode::stopped;
119  state->out.on_completed();
120  }
121  );
122  }
123 };
124 
125 }
126 
// Make new observable with skipped first count items from this observable.
//
// Forwards the arguments into a tuple (std::make_tuple stores decayed copies)
// and wraps it in an operator_factory tagged with skip_tag.
129 template<class... AN>
130 auto skip(AN&&... an)
131 -> operator_factory<skip_tag, AN...> {
132  return operator_factory<skip_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
133 }
134 
135 }
136 
// Specialisation that maps skip_tag onto the skip operator implementation.
// It offers two `member` overloads: one for a valid (Observable, Count) call
// and a catch-all that turns any other call into a readable compile error.
137 template<>
138 struct member_overload<skip_tag>
139 {
// Valid call: `o` is the source observable and `c` the number of items to drop.
// Enabled     - SFINAE guard; removes this overload unless Observable
//               satisfies is_observable.
// SourceValue - item type of the source observable.
// Skip        - the operator type instantiated with decayed source and count types.
// Value       - item type produced by Skip.
// Result      - the observable type returned to the caller.
140  template<class Observable,
141  class Count,
142  class Enabled = rxu::enable_if_all_true_type_t<
143  is_observable<Observable>>,
144  class SourceValue = rxu::value_type_t<Observable>,
145  class Skip = rxo::detail::skip<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Count>>,
146  class Value = rxu::value_type_t<Skip>,
147  class Result = observable<Value, Skip>>
148  static Result member(Observable&& o, Count&& c) {
149  return Result(Skip(std::forward<Observable>(o), std::forward<Count>(c)));
150  }
151 
// Catch-all for every other argument list. The static_assert condition depends
// on AN, so it only fires when this overload is actually instantiated, and it
// is false for any realistic argument count.
// The terminate/return statements precede it so the body still has a return
// matching the declared placeholder type.
// NOTE(review): the message says "optional Count", but the valid overload
// above requires a Count argument - confirm the intended wording.
152  template<class... AN>
153  static operators::detail::skip_invalid_t<AN...> member(AN...) {
154  std::terminate();
155  return {};
156  static_assert(sizeof...(AN) == 10000, "skip takes (optional Count)");
157  }
158 };
159 
160 }
161 
162 #endif
static Result member(Observable &&o, Count &&c)
Definition: rx-skip.hpp:148
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
Definition: rx-operators.hpp:367
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
static operators::detail::skip_invalid_t< AN... > member(AN...)
Definition: rx-skip.hpp:153
auto skip(AN &&...an) -> operator_factory< skip_tag, AN... >
Make new observable with skipped first count items from this observable.
Definition: rx-skip.hpp:130
Definition: rx-predef.hpp:177