RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-timeout.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
22 #if !defined(RXCPP_OPERATORS_RX_TIMEOUT_HPP)
23 #define RXCPP_OPERATORS_RX_TIMEOUT_HPP
24 
25 #include "../rx-includes.hpp"
26 
27 namespace rxcpp {
28 
29 class timeout_error: public std::runtime_error
30 {
31  public:
32  explicit timeout_error(const std::string& msg):
33  std::runtime_error(msg)
34  {}
35 };
36 
37 namespace operators {
38 
39 namespace detail {
40 
41 template<class... AN>
42 struct timeout_invalid_arguments {};
43 
44 template<class... AN>
45 struct timeout_invalid : public rxo::operator_base<timeout_invalid_arguments<AN...>> {
46  using type = observable<timeout_invalid_arguments<AN...>, timeout_invalid<AN...>>;
47 };
48 template<class... AN>
49 using timeout_invalid_t = typename timeout_invalid<AN...>::type;
50 
51 template<class T, class Duration, class Coordination>
52 struct timeout
53 {
54  typedef rxu::decay_t<T> source_value_type;
55  typedef rxu::decay_t<Coordination> coordination_type;
56  typedef typename coordination_type::coordinator_type coordinator_type;
57  typedef rxu::decay_t<Duration> duration_type;
58 
59  struct timeout_values
60  {
61  timeout_values(duration_type p, coordination_type c)
62  : period(p)
63  , coordination(c)
64  {
65  }
66 
67  duration_type period;
68  coordination_type coordination;
69  };
70  timeout_values initial;
71 
72  timeout(duration_type period, coordination_type coordination)
73  : initial(period, coordination)
74  {
75  }
76 
77  template<class Subscriber>
78  struct timeout_observer
79  {
80  typedef timeout_observer<Subscriber> this_type;
81  typedef rxu::decay_t<T> value_type;
82  typedef rxu::decay_t<Subscriber> dest_type;
83  typedef observer<T, this_type> observer_type;
84 
85  struct timeout_subscriber_values : public timeout_values
86  {
87  timeout_subscriber_values(composite_subscription cs, dest_type d, timeout_values v, coordinator_type c)
88  : timeout_values(v)
89  , cs(std::move(cs))
90  , dest(std::move(d))
91  , coordinator(std::move(c))
92  , worker(coordinator.get_worker())
93  , index(0)
94  {
95  }
96 
98  dest_type dest;
99  coordinator_type coordinator;
100  rxsc::worker worker;
101  mutable std::size_t index;
102  };
103  typedef std::shared_ptr<timeout_subscriber_values> state_type;
104  state_type state;
105 
106  timeout_observer(composite_subscription cs, dest_type d, timeout_values v, coordinator_type c)
107  : state(std::make_shared<timeout_subscriber_values>(timeout_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
108  {
109  auto localState = state;
110 
111  auto disposer = [=](const rxsc::schedulable&){
112  localState->cs.unsubscribe();
113  localState->dest.unsubscribe();
114  localState->worker.unsubscribe();
115  };
116  auto selectedDisposer = on_exception(
117  [&](){ return localState->coordinator.act(disposer); },
118  localState->dest);
119  if (selectedDisposer.empty()) {
120  return;
121  }
122 
123  localState->dest.add([=](){
124  localState->worker.schedule(selectedDisposer.get());
125  });
126  localState->cs.add([=](){
127  localState->worker.schedule(selectedDisposer.get());
128  });
129  }
130 
131  static std::function<void(const rxsc::schedulable&)> produce_timeout(std::size_t id, state_type state) {
132  auto produce = [id, state](const rxsc::schedulable&) {
133  if(id != state->index)
134  return;
135 
136  state->dest.on_error(std::make_exception_ptr(rxcpp::timeout_error("timeout has occurred")));
137  };
138 
139  auto selectedProduce = on_exception(
140  [&](){ return state->coordinator.act(produce); },
141  state->dest);
142  if (selectedProduce.empty()) {
143  return std::function<void(const rxsc::schedulable&)>();
144  }
145 
146  return std::function<void(const rxsc::schedulable&)>(selectedProduce.get());
147  }
148 
149  void on_next(T v) const {
150  auto localState = state;
151  auto work = [v, localState](const rxsc::schedulable&) {
152  auto new_id = ++localState->index;
153  auto produce_time = localState->worker.now() + localState->period;
154 
155  localState->dest.on_next(v);
156  localState->worker.schedule(produce_time, produce_timeout(new_id, localState));
157  };
158  auto selectedWork = on_exception(
159  [&](){return localState->coordinator.act(work);},
160  localState->dest);
161  if (selectedWork.empty()) {
162  return;
163  }
164  localState->worker.schedule(selectedWork.get());
165  }
166 
167  void on_error(std::exception_ptr e) const {
168  auto localState = state;
169  auto work = [e, localState](const rxsc::schedulable&) {
170  localState->dest.on_error(e);
171  };
172  auto selectedWork = on_exception(
173  [&](){ return localState->coordinator.act(work); },
174  localState->dest);
175  if (selectedWork.empty()) {
176  return;
177  }
178  localState->worker.schedule(selectedWork.get());
179  }
180 
181  void on_completed() const {
182  auto localState = state;
183  auto work = [localState](const rxsc::schedulable&) {
184  localState->dest.on_completed();
185  };
186  auto selectedWork = on_exception(
187  [&](){ return localState->coordinator.act(work); },
188  localState->dest);
189  if (selectedWork.empty()) {
190  return;
191  }
192  localState->worker.schedule(selectedWork.get());
193  }
194 
195  static subscriber<T, observer_type> make(dest_type d, timeout_values v) {
196  auto cs = composite_subscription();
197  auto coordinator = v.coordination.create_coordinator();
198 
199  return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
200  }
201  };
202 
203  template<class Subscriber>
204  auto operator()(Subscriber dest) const
205  -> decltype(timeout_observer<Subscriber>::make(std::move(dest), initial)) {
206  return timeout_observer<Subscriber>::make(std::move(dest), initial);
207  }
208 };
209 
210 }
211 
214 template<class... AN>
215 auto timeout(AN&&... an)
217  return operator_factory<timeout_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
218 }
219 
220 }
221 
222 template<>
224 {
225  template<class Observable, class Duration,
226  class Enabled = rxu::enable_if_all_true_type_t<
229  class SourceValue = rxu::value_type_t<Observable>,
230  class Timeout = rxo::detail::timeout<SourceValue, rxu::decay_t<Duration>, identity_one_worker>>
231  static auto member(Observable&& o, Duration&& d)
232  -> decltype(o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), identity_current_thread()))) {
233  return o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), identity_current_thread()));
234  }
235 
236  template<class Observable, class Coordination, class Duration,
237  class Enabled = rxu::enable_if_all_true_type_t<
238  is_observable<Observable>,
240  rxu::is_duration<Duration>>,
241  class SourceValue = rxu::value_type_t<Observable>,
242  class Timeout = rxo::detail::timeout<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
243  static auto member(Observable&& o, Coordination&& cn, Duration&& d)
244  -> decltype(o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
245  return o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)));
246  }
247 
248  template<class Observable, class Coordination, class Duration,
249  class Enabled = rxu::enable_if_all_true_type_t<
250  is_observable<Observable>,
251  is_coordination<Coordination>,
252  rxu::is_duration<Duration>>,
253  class SourceValue = rxu::value_type_t<Observable>,
254  class Timeout = rxo::detail::timeout<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
255  static auto member(Observable&& o, Duration&& d, Coordination&& cn)
256  -> decltype(o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
257  return o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)));
258  }
259 
260  template<class... AN>
261  static operators::detail::timeout_invalid_t<AN...> member(const AN&...) {
262  std::terminate();
263  return {};
264  static_assert(sizeof...(AN) == 10000, "timeout takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
265  }
266 };
267 
268 }
269 
270 #endif
Definition: rx-util.hpp:791
timeout_error(const std::string &msg)
Definition: rx-timeout.hpp:32
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static auto member(Observable &&o, Coordination &&cn, Duration &&d) -> decltype(o.template lift< SourceValue >(Timeout(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-timeout.hpp:243
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
Definition: rx-timeout.hpp:29
static auto member(Observable &&o, Duration &&d, Coordination &&cn) -> decltype(o.template lift< SourceValue >(Timeout(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-timeout.hpp:255
Definition: rx-operators.hpp:452
consumes values from an observable using State that may implement on_next, on_error and on_completed ...
Definition: rx-observer.hpp:179
Definition: rx-operators.hpp:16
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
auto timeout(AN &&...an) -> operator_factory< timeout_tag, AN... >
Return an observable that terminates with timeout_error if a particular timespan has passed without e...
Definition: rx-timeout.hpp:215
static operators::detail::timeout_invalid_t< AN... > member(const AN &...)
Definition: rx-timeout.hpp:261
Definition: rx-coordination.hpp:114
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
Definition: rx-scheduler.hpp:426
rxsc::worker get_worker() const
Definition: rx-coordination.hpp:85
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37
Definition: rx-scheduler.hpp:200
static auto member(Observable &&o, Duration &&d) -> decltype(o.template lift< SourceValue >(Timeout(std::forward< Duration >(d), identity_current_thread())))
Definition: rx-timeout.hpp:231
Definition: rx-coordination.hpp:45