RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-delay.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
22 #if !defined(RXCPP_OPERATORS_RX_DELAY_HPP)
23 #define RXCPP_OPERATORS_RX_DELAY_HPP
24 
25 #include "../rx-includes.hpp"
26 
27 namespace rxcpp {
28 
29 namespace operators {
30 
31 namespace detail {
32 
// Empty tag type parameterized on an argument pack. In this file it is used
// only as the value type of the observable named by delay_invalid<AN...>::type.
33 template<class... AN>
34 struct delay_invalid_arguments {};
35 
// Placeholder operator type for an argument pack AN... that matches none of
// the real delay overloads. It derives from operator_base over the tag type
// delay_invalid_arguments<AN...> and names, as `type`, an observable built
// from that tag and this operator.
36 template<class... AN>
37 struct delay_invalid : public rxo::operator_base<delay_invalid_arguments<AN...>> {
38  using type = observable<delay_invalid_arguments<AN...>, delay_invalid<AN...>>;
39 };
// Shorthand for delay_invalid<AN...>::type; this is the declared return type
// of the catch-all `member(const AN&...)` overload further down the file.
40 template<class... AN>
41 using delay_invalid_t = typename delay_invalid<AN...>::type;
42 
// Implementation of the delay operator. An instance stores a period and a
// coordination; applying it to a downstream subscriber (operator()) returns a
// new subscriber whose observer re-schedules each notification on a worker
// obtained from the coordinator.
//   T            - value type of the source (decayed into source_value_type)
//   Duration     - type of the delay period (decayed into duration_type)
//   Coordination - type providing create_coordinator() and coordinator_type
43 template<class T, class Duration, class Coordination>
44 struct delay
45 {
46  typedef rxu::decay_t<T> source_value_type;
47  typedef rxu::decay_t<Coordination> coordination_type;
48  typedef typename coordination_type::coordinator_type coordinator_type;
49  typedef rxu::decay_t<Duration> duration_type;
50 
    // The operator's configuration: the delay period and the coordination.
51  struct delay_values
52  {
53  delay_values(duration_type p, coordination_type c)
54  : period(p)
55  , coordination(c)
56  {
57  }
58  duration_type period;
59  coordination_type coordination;
60  };
    // Configuration captured at construction; copied into every observer
    // created by operator().
61  delay_values initial;
62 
63  delay(duration_type period, coordination_type coordination)
64  : initial(period, coordination)
65  {
66  }
67 
    // Observer placed between the source and the downstream subscriber
    // `dest`. All mutable state lives in a shared_ptr so that the lambdas
    // handed to the worker (which capture a copy of that pointer) keep it
    // alive until they have run.
68  template<class Subscriber>
69  struct delay_observer
70  {
71  typedef delay_observer<Subscriber> this_type;
72  typedef rxu::decay_t<T> value_type;
73  typedef rxu::decay_t<Subscriber> dest_type;
74  typedef observer<T, this_type> observer_type;
75 
    // Per-subscription state: the inherited period/coordination plus the
    // source-side subscription `cs`, the downstream subscriber `dest`, the
    // coordinator and the worker obtained from it. The initializer list
    // relies on the member declaration order below: `coordinator` is
    // initialized before `worker`, and `worker` before `expected`.
76  struct delay_subscriber_values : public delay_values
77  {
78  delay_subscriber_values(composite_subscription cs, dest_type d, delay_values v, coordinator_type c)
79  : delay_values(v)
80  , cs(std::move(cs))
81  , dest(std::move(d))
82  , coordinator(std::move(c))
83  , worker(coordinator.get_worker())
84  , expected(worker.now())
85  {
86  }
87  composite_subscription cs;
88  dest_type dest;
89  coordinator_type coordinator;
90  rxsc::worker worker;
    // Set to worker.now() at construction; not read anywhere in this listing.
91  rxsc::scheduler::clock_type::time_point expected;
92  };
93  std::shared_ptr<delay_subscriber_values> state;
94 
    // Builds the shared state and wires up teardown:
    //  - `disposer` unsubscribes cs, dest and the worker;
    //  - it is passed through coordinator.act() inside on_exception();
    //    if that yields an empty result the constructor returns without
    //    registering anything (presumably on_exception has already reported
    //    the failure to dest - confirm against rx-observer.hpp);
    //  - when dest is unsubscribed, the disposer is scheduled with no time
    //    offset;
    //  - when cs is unsubscribed, the disposer is scheduled at
    //    worker.now() + period (presumably so notifications that are already
    //    queued with a delay can still be delivered - TODO confirm).
95  delay_observer(composite_subscription cs, dest_type d, delay_values v, coordinator_type c)
96  : state(std::make_shared<delay_subscriber_values>(delay_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
97  {
98  auto localState = state;
99 
100  auto disposer = [=](const rxsc::schedulable&){
101  localState->cs.unsubscribe();
102  localState->dest.unsubscribe();
103  localState->worker.unsubscribe();
104  };
105  auto selectedDisposer = on_exception(
106  [&](){return localState->coordinator.act(disposer);},
107  localState->dest);
108  if (selectedDisposer.empty()) {
109  return;
110  }
111 
112  localState->dest.add([=](){
113  localState->worker.schedule(selectedDisposer.get());
114  });
115  localState->cs.add([=](){
116  localState->worker.schedule(localState->worker.now() + localState->period, selectedDisposer.get());
117  });
118  }
119 
    // Copies `v` into a work item that calls dest.on_next(v), and schedules
    // that item on the worker at worker.now() + period. Nothing is scheduled
    // if on_exception() returns an empty result.
120  void on_next(T v) const {
121  auto localState = state;
122  auto work = [v, localState](const rxsc::schedulable&){
123  localState->dest.on_next(v);
124  };
125  auto selectedWork = on_exception(
126  [&](){return localState->coordinator.act(work);},
127  localState->dest);
128  if (selectedWork.empty()) {
129  return;
130  }
131  localState->worker.schedule(localState->worker.now() + localState->period, selectedWork.get());
132  }
133 
    // Forwards the error to dest through the worker. Unlike on_next and
    // on_completed, the work is scheduled WITHOUT adding `period`, so the
    // configured delay is not applied to errors.
134  void on_error(std::exception_ptr e) const {
135  auto localState = state;
136  auto work = [e, localState](const rxsc::schedulable&){
137  localState->dest.on_error(e);
138  };
139  auto selectedWork = on_exception(
140  [&](){return localState->coordinator.act(work);},
141  localState->dest);
142  if (selectedWork.empty()) {
143  return;
144  }
145  localState->worker.schedule(selectedWork.get());
146  }
147 
    // Schedules dest.on_completed() at worker.now() + period, i.e. with the
    // same offset that on_next applies to values.
148  void on_completed() const {
149  auto localState = state;
150  auto work = [localState](const rxsc::schedulable&){
151  localState->dest.on_completed();
152  };
153  auto selectedWork = on_exception(
154  [&](){return localState->coordinator.act(work);},
155  localState->dest);
156  if (selectedWork.empty()) {
157  return;
158  }
159  localState->worker.schedule(localState->worker.now() + localState->period, selectedWork.get());
160  }
161 
    // Factory used by delay::operator(): creates a fresh composite_subscription
    // and a coordinator from v.coordination, then returns a subscriber built
    // from that subscription and a delay_observer. `cs` is copied (not moved)
    // into the observer because the same object is also passed to
    // make_subscriber. The coordinator is created before `v` is moved from.
162  static subscriber<T, observer_type> make(dest_type d, delay_values v) {
163  auto cs = composite_subscription();
164  auto coordinator = v.coordination.create_coordinator();
165 
166  return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
167  }
168  };
169 
    // Applies the operator to a downstream subscriber: returns the subscriber
    // produced by delay_observer<Subscriber>::make from `dest` and a copy of
    // the stored configuration `initial`.
170  template<class Subscriber>
171  auto operator()(Subscriber dest) const
172  -> decltype(delay_observer<Subscriber>::make(std::move(dest), initial)) {
173  return delay_observer<Subscriber>::make(std::move(dest), initial);
174  }
175 };
176 
177 }
178 
// Free-function entry point for the delay operator. It performs no work
// itself: the forwarded arguments are packed with std::make_tuple and wrapped
// in an operator_factory tagged with delay_tag. Which arguments are valid is
// decided later by the `member` overloads declared below in this file.
181 template<class... AN>
182 auto delay(AN&&... an)
183  -> operator_factory<delay_tag, AN...> {
184  return operator_factory<delay_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
185 }
186 
187 }
188 
// Overload set that maps the arguments given to delay() onto the operator
// implementation rxo::detail::delay. Each viable overload lifts the source
// observable `o` with a detail::delay built from the duration and a
// coordination.
// Listing lines 190, 194, 195 and 206 were missing from this rendering and
// are restored here: the specialization head, and the enable_if conditions
// matching the pattern of the fully visible (Duration, Coordination) overload.
189 template<>
190 struct member_overload<delay_tag>
191 {
    // (Duration): no coordination supplied, so identity_current_thread() is
    // used; the operator is instantiated with identity_one_worker accordingly.
192  template<class Observable, class Duration,
193  class Enabled = rxu::enable_if_all_true_type_t<
194  is_observable<Observable>,
195  rxu::is_duration<Duration>>,
196  class SourceValue = rxu::value_type_t<Observable>,
197  class delay = rxo::detail::delay<SourceValue, rxu::decay_t<Duration>, identity_one_worker>>
198  static auto member(Observable&& o, Duration&& d)
199  -> decltype(o.template lift<SourceValue>(delay(std::forward<Duration>(d), identity_current_thread()))) {
200  return o.template lift<SourceValue>(delay(std::forward<Duration>(d), identity_current_thread()));
201  }
202 
    // (Coordination, Duration): same as the next overload with the two
    // arguments in the opposite order; the operator is always constructed as
    // delay(duration, coordination).
203  template<class Observable, class Coordination, class Duration,
204  class Enabled = rxu::enable_if_all_true_type_t<
205  is_observable<Observable>,
206  is_coordination<Coordination>,
207  rxu::is_duration<Duration>>,
208  class SourceValue = rxu::value_type_t<Observable>,
209  class delay = rxo::detail::delay<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
210  static auto member(Observable&& o, Coordination&& cn, Duration&& d)
211  -> decltype(o.template lift<SourceValue>(delay(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
212  return o.template lift<SourceValue>(delay(std::forward<Duration>(d), std::forward<Coordination>(cn)));
213  }
214 
    // (Duration, Coordination).
215  template<class Observable, class Coordination, class Duration,
216  class Enabled = rxu::enable_if_all_true_type_t<
217  is_observable<Observable>,
218  is_coordination<Coordination>,
219  rxu::is_duration<Duration>>,
220  class SourceValue = rxu::value_type_t<Observable>,
221  class delay = rxo::detail::delay<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
222  static auto member(Observable&& o, Duration&& d, Coordination&& cn)
223  -> decltype(o.template lift<SourceValue>(delay(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
224  return o.template lift<SourceValue>(delay(std::forward<Duration>(d), std::forward<Coordination>(cn)));
225  }
226 
    // Catch-all for any other argument list. The static_assert condition
    // depends on sizeof...(AN), so it is evaluated only when this overload is
    // instantiated and then fails with the usage message. The terminate()
    // call and `return {}` precede it only to give the function a well-formed
    // body of the declared return type.
227  template<class... AN>
228  static operators::detail::delay_invalid_t<AN...> member(const AN&...) {
229  std::terminate();
230  return {};
231  static_assert(sizeof...(AN) == 10000, "delay takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
232  }
233 };
234 
235 }
236 
237 #endif
Definition: rx-util.hpp:791
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
static auto member(Observable &&o, Coordination &&cn, Duration &&d) -> decltype(o.template lift< SourceValue >(delay(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-delay.hpp:210
Definition: rx-operators.hpp:69
static auto member(Observable &&o, Duration &&d) -> decltype(o.template lift< SourceValue >(delay(std::forward< Duration >(d), identity_current_thread())))
Definition: rx-delay.hpp:198
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static auto member(Observable &&o, Duration &&d, Coordination &&cn) -> decltype(o.template lift< SourceValue >(delay(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-delay.hpp:222
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
auto delay(AN &&...an) -> operator_factory< delay_tag, AN... >
Return an observable that emits each item emitted by the source observable after the specified delay...
Definition: rx-delay.hpp:182
Definition: rx-operators.hpp:185
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
static operators::detail::delay_invalid_t< AN... > member(const AN &...)
Definition: rx-delay.hpp:228
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37