RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-observe_on.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
23 #if !defined(RXCPP_OPERATORS_RX_OBSERVE_ON_HPP)
24 #define RXCPP_OPERATORS_RX_OBSERVE_ON_HPP
25 
26 #include "../rx-includes.hpp"
27 
28 namespace rxcpp {
29 
30 namespace operators {
31 
32 namespace detail {
33 
34 template<class... AN>
35 struct observe_on_invalid_arguments {};
36 
37 template<class... AN>
38 struct observe_on_invalid : public rxo::operator_base<observe_on_invalid_arguments<AN...>> {
39  using type = observable<observe_on_invalid_arguments<AN...>, observe_on_invalid<AN...>>;
40 };
41 template<class... AN>
42 using observe_on_invalid_t = typename observe_on_invalid<AN...>::type;
43 
44 template<class T, class Coordination>
45 struct observe_on
46 {
47  typedef rxu::decay_t<T> source_value_type;
48 
49  typedef rxu::decay_t<Coordination> coordination_type;
50  typedef typename coordination_type::coordinator_type coordinator_type;
51 
52  coordination_type coordination;
53 
54  observe_on(coordination_type cn)
55  : coordination(std::move(cn))
56  {
57  }
58 
59  template<class Subscriber>
60  struct observe_on_observer
61  {
62  typedef observe_on_observer<Subscriber> this_type;
63  typedef source_value_type value_type;
64  typedef rxu::decay_t<Subscriber> dest_type;
65  typedef observer<value_type, this_type> observer_type;
66 
67  typedef rxn::notification<T> notification_type;
68  typedef typename notification_type::type base_notification_type;
69  typedef std::deque<base_notification_type> queue_type;
70 
71  struct mode
72  {
73  enum type {
74  Invalid = 0,
75  Processing,
76  Empty,
77  Disposed,
78  Errored
79  };
80  };
81  struct observe_on_state : std::enable_shared_from_this<observe_on_state>
82  {
83  mutable std::mutex lock;
84  mutable queue_type fill_queue;
85  mutable queue_type drain_queue;
86  composite_subscription lifetime;
87  mutable typename mode::type current;
88  coordinator_type coordinator;
89  dest_type destination;
90 
91  observe_on_state(dest_type d, coordinator_type coor, composite_subscription cs)
92  : lifetime(std::move(cs))
93  , current(mode::Empty)
94  , coordinator(std::move(coor))
95  , destination(std::move(d))
96  {
97  }
98 
99  void finish(std::unique_lock<std::mutex>& guard, typename mode::type end) const {
100  if (!guard.owns_lock()) {
101  std::terminate();
102  }
103  if (current == mode::Errored || current == mode::Disposed) {return;}
104  current = end;
105  queue_type fill_expired;
106  swap(fill_expired, fill_queue);
107  queue_type drain_expired;
108  swap(drain_expired, drain_queue);
109  RXCPP_UNWIND_AUTO([&](){guard.lock();});
110  guard.unlock();
111  lifetime.unsubscribe();
112  destination.unsubscribe();
113  }
114 
115  void ensure_processing(std::unique_lock<std::mutex>& guard) const {
116  if (!guard.owns_lock()) {
117  std::terminate();
118  }
119  if (current == mode::Empty) {
120  current = mode::Processing;
121 
122  if (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty()) {
123  finish(guard, mode::Disposed);
124  }
125 
126  auto keepAlive = this->shared_from_this();
127 
128  auto drain = [keepAlive, this](const rxsc::schedulable& self){
129  using std::swap;
130  try {
131  for (;;) {
132  if (drain_queue.empty() || !destination.is_subscribed()) {
133  std::unique_lock<std::mutex> guard(lock);
134  if (!destination.is_subscribed() ||
135  (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) {
136  finish(guard, mode::Disposed);
137  return;
138  }
139  if (drain_queue.empty()) {
140  if (fill_queue.empty()) {
141  current = mode::Empty;
142  return;
143  }
144  swap(fill_queue, drain_queue);
145  }
146  }
147  auto notification = std::move(drain_queue.front());
148  drain_queue.pop_front();
149  notification->accept(destination);
150  std::unique_lock<std::mutex> guard(lock);
151  self();
152  if (lifetime.is_subscribed()) break;
153  }
154  } catch(...) {
155  destination.on_error(std::current_exception());
156  std::unique_lock<std::mutex> guard(lock);
157  finish(guard, mode::Errored);
158  }
159  };
160 
161  auto selectedDrain = on_exception(
162  [&](){return coordinator.act(drain);},
163  destination);
164  if (selectedDrain.empty()) {
165  finish(guard, mode::Errored);
166  return;
167  }
168 
169  auto processor = coordinator.get_worker();
170 
171  RXCPP_UNWIND_AUTO([&](){guard.lock();});
172  guard.unlock();
173 
174  processor.schedule(selectedDrain.get());
175  }
176  }
177  };
178  std::shared_ptr<observe_on_state> state;
179 
180  observe_on_observer(dest_type d, coordinator_type coor, composite_subscription cs)
181  : state(std::make_shared<observe_on_state>(std::move(d), std::move(coor), std::move(cs)))
182  {
183  }
184 
185  void on_next(source_value_type v) const {
186  std::unique_lock<std::mutex> guard(state->lock);
187  if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
188  state->fill_queue.push_back(notification_type::on_next(std::move(v)));
189  state->ensure_processing(guard);
190  }
191  void on_error(std::exception_ptr e) const {
192  std::unique_lock<std::mutex> guard(state->lock);
193  if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
194  state->fill_queue.push_back(notification_type::on_error(e));
195  state->ensure_processing(guard);
196  }
197  void on_completed() const {
198  std::unique_lock<std::mutex> guard(state->lock);
199  if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
200  state->fill_queue.push_back(notification_type::on_completed());
201  state->ensure_processing(guard);
202  }
203 
204  static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, coordination_type cn, composite_subscription cs = composite_subscription()) {
205  auto coor = cn.create_coordinator(d.get_subscription());
206  d.add(cs);
207 
208  this_type o(d, std::move(coor), cs);
209  auto keepAlive = o.state;
210  cs.add([=](){
211  std::unique_lock<std::mutex> guard(keepAlive->lock);
212  keepAlive->ensure_processing(guard);
213  });
214 
215  return make_subscriber<value_type>(d, cs, make_observer<value_type>(std::move(o)));
216  }
217  };
218 
219  template<class Subscriber>
220  auto operator()(Subscriber dest) const
221  -> decltype(observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination)) {
222  return observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination);
223  }
224 };
225 
226 }
227 
230 template<class... AN>
231 auto observe_on(AN&&... an)
233  return operator_factory<observe_on_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
234 }
235 
236 }
237 
238 template<>
240 {
241  template<class Observable, class Coordination,
242  class Enabled = rxu::enable_if_all_true_type_t<
245  class SourceValue = rxu::value_type_t<Observable>,
246  class ObserveOn = rxo::detail::observe_on<SourceValue, rxu::decay_t<Coordination>>>
247  static auto member(Observable&& o, Coordination&& cn)
248  -> decltype(o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)))) {
249  return o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)));
250  }
251 
252  template<class... AN>
253  static operators::detail::observe_on_invalid_t<AN...> member(AN...) {
254  std::terminate();
255  return {};
256  static_assert(sizeof...(AN) == 10000, "observe_on takes (Coordination)");
257  }
258 };
259 
261 {
262  rxsc::scheduler factory;
263 
264  class input_type
265  {
266  rxsc::worker controller;
267  rxsc::scheduler factory;
268  identity_one_worker coordination;
269  public:
270  explicit input_type(rxsc::worker w)
271  : controller(w)
272  , factory(rxsc::make_same_worker(w))
273  , coordination(factory)
274  {
275  }
276  inline rxsc::worker get_worker() const {
277  return controller;
278  }
279  inline rxsc::scheduler get_scheduler() const {
280  return factory;
281  }
282  inline rxsc::scheduler::clock_type::time_point now() const {
283  return factory.now();
284  }
285  template<class Observable>
286  auto in(Observable o) const
287  -> decltype(o.observe_on(coordination)) {
288  return o.observe_on(coordination);
289  }
290  template<class Subscriber>
291  auto out(Subscriber s) const
292  -> Subscriber {
293  return s;
294  }
295  template<class F>
296  auto act(F f) const
297  -> F {
298  return f;
299  }
300  };
301 
302 public:
303 
304  explicit observe_on_one_worker(rxsc::scheduler sc) : factory(sc) {}
305 
307 
308  inline rxsc::scheduler::clock_type::time_point now() const {
309  return factory.now();
310  }
311 
312  inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
313  auto w = factory.create_worker(std::move(cs));
314  return coordinator_type(input_type(std::move(w)));
315  }
316 };
317 
320  return r;
321 }
322 
325  return r;
326 }
327 
330  return r;
331 }
332 
333 }
334 
335 #endif
Definition: rx-all.hpp:26
Definition: rx-observe_on.hpp:260
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
observe_on_one_worker(rxsc::scheduler sc)
Definition: rx-observe_on.hpp:304
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
Definition: rx-operators.hpp:47
observe_on_one_worker observe_on_event_loop()
Definition: rx-observe_on.hpp:323
observe_on_one_worker observe_on_new_thread()
Definition: rx-observe_on.hpp:328
worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-scheduler.hpp:412
coordinator_type create_coordinator(composite_subscription cs=composite_subscription()) const
Definition: rx-observe_on.hpp:312
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
scheduler make_same_worker(rxsc::worker w)
Definition: rx-sameworker.hpp:44
scheduler make_run_loop(const run_loop &r)
Definition: rx-runloop.hpp:204
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
scheduler make_event_loop()
Definition: rx-eventloop.hpp:98
scheduler make_new_thread()
Definition: rx-newthread.hpp:170
static auto member(Observable &&o, Coordination &&cn) -> decltype(o.template lift< SourceValue >(ObserveOn(std::forward< Coordination >(cn))))
Definition: rx-observe_on.hpp:247
Definition: rx-coordination.hpp:23
clock_type::time_point now() const
return the current time for this scheduler
Definition: rx-scheduler.hpp:404
Definition: rx-runloop.hpp:118
auto observe_on(AN &&...an) -> operator_factory< observe_on_tag, AN... >
All values are queued and delivered using the scheduler from the supplied coordination.
Definition: rx-observe_on.hpp:231
Definition: rx-operators.hpp:269
observe_on_one_worker observe_on_run_loop(const rxsc::run_loop &rl)
Definition: rx-observe_on.hpp:318
#define RXCPP_UNWIND_AUTO(Function)
Definition: rx-util.hpp:875
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
coordinator< input_type > coordinator_type
Definition: rx-observe_on.hpp:306
Definition: rx-coordination.hpp:114
static operators::detail::observe_on_invalid_t< AN... > member(AN...)
Definition: rx-observe_on.hpp:253
rxsc::scheduler::clock_type::time_point now() const
Definition: rx-observe_on.hpp:308
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37
Definition: rx-scheduler.hpp:200
Definition: rx-coordination.hpp:45