RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-skip_until.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
27 #if !defined(RXCPP_OPERATORS_RX_SKIP_UNTIL_HPP)
28 #define RXCPP_OPERATORS_RX_SKIP_UNTIL_HPP
29 
30 #include "../rx-includes.hpp"
31 
32 namespace rxcpp {
33 
34 namespace operators {
35 
36 namespace detail {
37 
// Empty tag type parameterised on an argument pack AN. It is used below as the
// value type of skip_until_invalid, the result of the catch-all member(AN...)
// overload that rejects unsupported skip_until argument lists.
38 template<class... AN>
39 struct skip_until_invalid_arguments {};
40 
// Placeholder operator for rejected skip_until argument lists. It derives from
// rxo::operator_base over skip_until_invalid_arguments<AN...> and exposes, as
// `type`, the observable formed from that value type and this operator.
41 template<class... AN>
42 struct skip_until_invalid : public rxo::operator_base<skip_until_invalid_arguments<AN...>> {
43  using type = observable<skip_until_invalid_arguments<AN...>, skip_until_invalid<AN...>>;
44 };
// Shorthand for skip_until_invalid<AN...>::type.
45 template<class... AN>
46 using skip_until_invalid_t = typename skip_until_invalid<AN...>::type;
47 
// Operator implementation: values from the source observable are dropped until
// the trigger observable delivers its first on_next; from then on they are
// passed to the subscriber.
//
// T                 - value type of the source (and of the output).
// Observable        - source observable type (stored decayed).
// TriggerObservable - observable whose first value opens the gate (stored decayed).
// Coordination      - supplies the coordinator that both subscriptions are routed through.
48 template<class T, class Observable, class TriggerObservable, class Coordination>
49 struct skip_until : public operator_base<T>
50 {
51  typedef rxu::decay_t<Observable> source_type;
52  typedef rxu::decay_t<TriggerObservable> trigger_source_type;
53  typedef rxu::decay_t<Coordination> coordination_type;
54  typedef typename coordination_type::coordinator_type coordinator_type;
 // Construction-time inputs; copied into the per-subscription state in on_subscribe.
55  struct values
56  {
57  values(source_type s, trigger_source_type t, coordination_type sf)
58  : source(std::move(s))
59  , trigger(std::move(t))
60  , coordination(std::move(sf))
61  {
62  }
63  source_type source;
64  trigger_source_type trigger;
65  coordination_type coordination;
66  };
67  values initial;
68 
 // Stores the source, the trigger and the coordination for later subscriptions.
69  skip_until(source_type s, trigger_source_type t, coordination_type sf)
70  : initial(std::move(s), std::move(t), std::move(sf))
71  {
72  }
73 
 // Gate state shared by the trigger and source callbacks.
 // The enumerator ORDER matters: the source on_error handler below uses
 // `mode_value > mode::triggered` to mean "errored or stopped".
74  struct mode
75  {
76  enum type {
77  skipping, // no messages from trigger
78  clear, // trigger completed
79  triggered, // trigger sent on_next
80  errored, // error either on trigger or on observable
81  stopped // observable completed
82  };
83  };
84 
 // Subscribes `s` to the gated sequence: the trigger is subscribed first, then
 // the source. Both share one state object held through a shared_ptr captured
 // by every callback.
85  template<class Subscriber>
86  void on_subscribe(Subscriber s) const {
87 
88  typedef Subscriber output_type;
 // Per-subscription state: a copy of `values`, the current gate mode, one
 // lifetime per upstream subscription, the coordinator and the subscriber.
89  struct state_type
90  : public std::enable_shared_from_this<state_type>
91  , public values
92  {
93  state_type(const values& i, coordinator_type coor, const output_type& oarg)
94  : values(i)
95  , mode_value(mode::skipping)
96  , coordinator(std::move(coor))
97  , out(oarg)
98  {
 // Both upstream lifetimes are registered with the subscriber.
99  out.add(trigger_lifetime);
100  out.add(source_lifetime);
101  }
102  typename mode::type mode_value;
103  composite_subscription trigger_lifetime;
104  composite_subscription source_lifetime;
105  coordinator_type coordinator;
106  output_type out;
107  };
108 
109  auto coordinator = initial.coordination.create_coordinator();
110 
111  // take a copy of the values for each subscription
112  auto state = std::make_shared<state_type>(initial, std::move(coordinator), std::move(s));
113 
 // Route the trigger through the coordinator. An empty result aborts the
 // subscription (presumably on_exception has already reported the failure
 // to state->out - confirm against rx-observer.hpp).
114  auto trigger = on_exception(
115  [&](){return state->coordinator.in(state->trigger);},
116  state->out);
117  if (trigger.empty()) {
118  return;
119  }
120 
 // Same treatment for the source observable.
121  auto source = on_exception(
122  [&](){return state->coordinator.in(state->source);},
123  state->out);
124  if (source.empty()) {
125  return;
126  }
127 
 // Trigger observer: each handler acts only while still in `skipping` mode.
128  auto sinkTrigger = make_subscriber<typename trigger_source_type::value_type>(
129  // share parts of subscription
130  state->out,
131  // new lifetime
132  state->trigger_lifetime,
133  // on_next
 // first trigger value: open the gate and drop the trigger subscription
134  [state](const typename trigger_source_type::value_type&) {
135  if (state->mode_value != mode::skipping) {
136  return;
137  }
138  state->mode_value = mode::triggered;
139  state->trigger_lifetime.unsubscribe();
140  },
141  // on_error
 // trigger failed before opening the gate: forward the error downstream
142  [state](std::exception_ptr e) {
143  if (state->mode_value != mode::skipping) {
144  return;
145  }
146  state->mode_value = mode::errored;
147  state->out.on_error(e);
148  },
149  // on_completed
 // trigger completed without a value: mode becomes `clear`, which the
 // source on_next/on_completed handlers below do not treat as open
150  [state]() {
151  if (state->mode_value != mode::skipping) {
152  return;
153  }
154  state->mode_value = mode::clear;
155  state->trigger_lifetime.unsubscribe();
156  }
157  );
158  auto selectedSinkTrigger = on_exception(
159  [&](){return state->coordinator.out(sinkTrigger);},
160  state->out);
161  if (selectedSinkTrigger.empty()) {
162  return;
163  }
164  trigger->subscribe(std::move(selectedSinkTrigger.get()));
165 
 // Source observer, subscribed after the trigger.
166  source.get().subscribe(
167  // split subscription lifetime
168  state->source_lifetime,
169  // on_next
 // values pass only once the gate is open (mode == triggered)
170  [state](T t) {
171  if (state->mode_value != mode::triggered) {
172  return;
173  }
174  state->out.on_next(t);
175  },
176  // on_error
 // forwarded in skipping, clear and triggered modes; ignored once the
 // mode is errored or stopped (the enumerators after `triggered`)
177  [state](std::exception_ptr e) {
178  if (state->mode_value > mode::triggered) {
179  return;
180  }
181  state->mode_value = mode::errored;
182  state->out.on_error(e);
183  },
184  // on_completed
 // forwarded only when the gate is open; a completion that arrives in
 // any other mode is not passed to state->out
185  [state]() {
186  if (state->mode_value != mode::triggered) {
187  return;
188  }
189  state->mode_value = mode::stopped;
190  state->out.on_completed();
191  }
192  );
193  }
194 };
195 
196 }
197 
// Factory for the skip_until operator: forwards the arguments into
// std::make_tuple and wraps the tuple in an operator_factory tagged with
// skip_until_tag. Per the static_assert message further down, the accepted
// argument lists are (TriggerObservable, optional Coordination) or
// (TimePoint, optional Coordination).
//
// The trailing return type on listing line 202 was missing from this
// rendering (the numbering jumped from 201 to 203), which left the function
// without an opening brace; it is restored here to match the signature shown
// in the page's cross-reference index.
200 template<class... AN>
201 auto skip_until(AN&&... an)
202  -> operator_factory<skip_until_tag, AN...> {
203  return operator_factory<skip_until_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
204 }
205 
206 }
207 
// Overload set whose static member() functions build the skip_until observable
// from the supported argument lists; the final variadic overload rejects
// everything else.
// NOTE(review): listing line 209 is absent from this rendering (numbering
// jumps 208 -> 210). It should hold the header of the struct that this
// `template<>` specializes - presumably the member-overload specialization for
// skip_until_tag; confirm against the real header.
208 template<>
210 {
 // (source, time point): the trigger is rxs::timer(when, cn), with
 // cn = identity_current_thread() also used as the coordination.
 // NOTE(review): listing lines 213 and 216 are absent here; `Timer` is used
 // below without a visible definition and the Enabled condition list is
 // incomplete - restore from the real header.
211  template<class Observable, class TimePoint,
212  class Enabled = rxu::enable_if_all_true_type_t<
214  std::is_convertible<TimePoint, rxsc::scheduler::clock_type::time_point>>,
215  class SourceValue = rxu::value_type_t<Observable>,
217  class TimerValue = rxu::value_type_t<Timer>,
218  class TriggerObservable = observable<TimerValue, Timer>,
219  class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, TriggerObservable, identity_one_worker>,
220  class Value = rxu::value_type_t<SkipUntil>,
221  class Result = observable<Value, SkipUntil>>
222  static Result member(Observable&& o, TimePoint&& when) {
223  auto cn = identity_current_thread();
224  return Result(SkipUntil(std::forward<Observable>(o), rxs::timer(std::forward<TimePoint>(when), cn), cn));
225  }
226 
 // (source, time point, coordination): as above, but the caller-supplied
 // coordination `cn` drives both the timer and the operator.
 // NOTE(review): listing lines 230 and 233 are absent here (one Enabled
 // condition and the `Timer` definition) - restore from the real header.
227  template<class Observable, class TimePoint, class Coordination,
228  class Enabled = rxu::enable_if_all_true_type_t<
229  is_observable<Observable>,
231  std::is_convertible<TimePoint, rxsc::scheduler::clock_type::time_point>>,
232  class SourceValue = rxu::value_type_t<Observable>,
234  class TimerValue = rxu::value_type_t<Timer>,
235  class TriggerObservable = observable<TimerValue, Timer>,
236  class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, TriggerObservable, rxu::decay_t<Coordination>>,
237  class Value = rxu::value_type_t<SkipUntil>,
238  class Result = observable<Value, SkipUntil>>
239  static Result member(Observable&& o, TimePoint&& when, Coordination cn) {
240  return Result(SkipUntil(std::forward<Observable>(o), rxs::timer(std::forward<TimePoint>(when), cn), cn));
241  }
242 
 // (source, trigger observable): coordination is identity_current_thread().
 // NOTE(review): listing line 245 is absent here, so the Enabled condition
 // list is left unterminated - restore from the real header.
243  template<class Observable, class TriggerObservable,
244  class Enabled = rxu::enable_if_all_true_type_t<
246  class SourceValue = rxu::value_type_t<Observable>,
247  class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<TriggerObservable>, identity_one_worker>,
248  class Value = rxu::value_type_t<SkipUntil>,
249  class Result = observable<Value, SkipUntil>>
250  static Result member(Observable&& o, TriggerObservable&& t) {
251  return Result(SkipUntil(std::forward<Observable>(o), std::forward<TriggerObservable>(t), identity_current_thread()));
252  }
253 
 // (source, trigger observable, coordination): enabled when both observables
 // satisfy all_observables and `cn` satisfies is_coordination.
254  template<class Observable, class TriggerObservable, class Coordination,
255  class Enabled = rxu::enable_if_all_true_type_t<
256  all_observables<Observable, TriggerObservable>,
257  is_coordination<Coordination>>,
258  class SourceValue = rxu::value_type_t<Observable>,
259  class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<TriggerObservable>, rxu::decay_t<Coordination>>,
260  class Value = rxu::value_type_t<SkipUntil>,
261  class Result = observable<Value, SkipUntil>>
262  static Result member(Observable&& o, TriggerObservable&& t, Coordination&& cn) {
263  return Result(SkipUntil(std::forward<Observable>(o), std::forward<TriggerObservable>(t), std::forward<Coordination>(cn)));
264  }
265 
 // Catch-all for unsupported argument lists. The static_assert condition
 // depends on AN, so it is only evaluated when this overload is instantiated,
 // where it fails with the usage message for any pack that is not 10000
 // arguments long. std::terminate() precedes the `return {}` statement.
266  template<class... AN>
267  static operators::detail::skip_until_invalid_t<AN...> member(AN...) {
268  std::terminate();
269  return {};
270  static_assert(sizeof...(AN) == 10000, "skip_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination)");
271  }
272 };
273 
274 }
275 
276 #endif
Definition: rx-util.hpp:100
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
auto skip_until(AN &&...an) -> operator_factory< skip_until_tag, AN... >
Make new observable with items skipped until on_next occurs on the trigger observable or until the specified time.
Definition: rx-skip_until.hpp:201
Definition: rx-operators.hpp:69
Definition: rx-operators.hpp:381
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
static Result member(Observable &&o, TimePoint &&when)
Definition: rx-skip_until.hpp:222
Definition: rx-util.hpp:325
auto timer(TimePointOrDuration when) -> typename std::enable_if< detail::defer_timer< TimePointOrDuration, identity_one_worker >::value, typename detail::defer_timer< TimePointOrDuration, identity_one_worker >::observable_type >::type
Returns an observable that emits an integer at the specified time point.
Definition: rx-timer.hpp:114
static Result member(Observable &&o, TimePoint &&when, Coordination cn)
Definition: rx-skip_until.hpp:239
static Result member(Observable &&o, TriggerObservable &&t, Coordination &&cn)
Definition: rx-skip_until.hpp:262
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
static Result member(Observable &&o, TriggerObservable &&t)
Definition: rx-skip_until.hpp:250
static operators::detail::skip_until_invalid_t< AN... > member(AN...)
Definition: rx-skip_until.hpp:267
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37