RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-synchronize.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SYNCHRONIZE_HPP)
6 #define RXCPP_RX_SYNCHRONIZE_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace subjects {
13 
14 namespace detail {
15 
16 template<class T, class Coordination>
17 class synchronize_observer : public detail::multicast_observer<T>
18 {
19  typedef synchronize_observer<T, Coordination> this_type;
20  typedef detail::multicast_observer<T> base_type;
21 
22  typedef rxu::decay_t<Coordination> coordination_type;
23  typedef typename coordination_type::coordinator_type coordinator_type;
24  typedef typename coordinator_type::template get<subscriber<T>>::type output_type;
25 
26  struct synchronize_observer_state : public std::enable_shared_from_this<synchronize_observer_state>
27  {
28  typedef rxn::notification<T> notification_type;
29  typedef typename notification_type::type base_notification_type;
30  typedef std::deque<base_notification_type> queue_type;
31 
32  struct mode
33  {
34  enum type {
35  Invalid = 0,
36  Processing,
37  Empty,
38  Disposed
39  };
40  };
41 
42  mutable std::mutex lock;
43  mutable std::condition_variable wake;
44  mutable queue_type fill_queue;
45  composite_subscription lifetime;
46  mutable typename mode::type current;
47  coordinator_type coordinator;
48  output_type destination;
49 
50  void ensure_processing(std::unique_lock<std::mutex>& guard) const {
51  if (!guard.owns_lock()) {
52  std::terminate();
53  }
54  if (current == mode::Empty) {
55  current = mode::Processing;
56  auto keepAlive = this->shared_from_this();
57 
58  auto drain_queue = [keepAlive, this](const rxsc::schedulable& self){
59  try {
60  std::unique_lock<std::mutex> guard(lock);
61  if (!destination.is_subscribed()) {
62  current = mode::Disposed;
63  fill_queue.clear();
64  guard.unlock();
65  lifetime.unsubscribe();
66  return;
67  }
68  if (fill_queue.empty()) {
69  current = mode::Empty;
70  return;
71  }
72  auto notification = std::move(fill_queue.front());
73  fill_queue.pop_front();
74  guard.unlock();
75  notification->accept(destination);
76  self();
77  } catch(...) {
78  destination.on_error(std::current_exception());
79  std::unique_lock<std::mutex> guard(lock);
80  current = mode::Empty;
81  }
82  };
83 
84  auto selectedDrain = on_exception(
85  [&](){return coordinator.act(drain_queue);},
86  destination);
87  if (selectedDrain.empty()) {
88  return;
89  }
90 
91  auto processor = coordinator.get_worker();
92  processor.schedule(lifetime, selectedDrain.get());
93  }
94  }
95 
96  synchronize_observer_state(coordinator_type coor, composite_subscription cs, output_type scbr)
97  : lifetime(std::move(cs))
98  , current(mode::Empty)
99  , coordinator(std::move(coor))
100  , destination(std::move(scbr))
101  {
102  }
103 
104  template<class V>
105  void on_next(V v) const {
106  if (lifetime.is_subscribed()) {
107  std::unique_lock<std::mutex> guard(lock);
108  fill_queue.push_back(notification_type::on_next(std::move(v)));
109  ensure_processing(guard);
110  }
111  wake.notify_one();
112  }
113  void on_error(std::exception_ptr e) const {
114  if (lifetime.is_subscribed()) {
115  std::unique_lock<std::mutex> guard(lock);
116  fill_queue.push_back(notification_type::on_error(e));
117  ensure_processing(guard);
118  }
119  wake.notify_one();
120  }
121  void on_completed() const {
122  if (lifetime.is_subscribed()) {
123  std::unique_lock<std::mutex> guard(lock);
124  fill_queue.push_back(notification_type::on_completed());
125  ensure_processing(guard);
126  }
127  wake.notify_one();
128  }
129  };
130 
131  std::shared_ptr<synchronize_observer_state> state;
132 
133 public:
134  synchronize_observer(coordination_type cn, composite_subscription dl, composite_subscription il)
135  : base_type(dl)
136  {
137  auto o = make_subscriber<T>(dl, make_observer_dynamic<T>( *static_cast<base_type*>(this) ));
138 
139  // creates a worker whose lifetime is the same as the destination subscription
140  auto coordinator = cn.create_coordinator(dl);
141 
142  state = std::make_shared<synchronize_observer_state>(std::move(coordinator), std::move(il), std::move(o));
143  }
144 
145  subscriber<T> get_subscriber() const {
146  return make_subscriber<T>(this->get_id(), state->lifetime, observer<T, detail::synchronize_observer<T, Coordination>>(*this)).as_dynamic();
147  }
148 
149  template<class V>
150  void on_next(V v) const {
151  state->on_next(std::move(v));
152  }
153  void on_error(std::exception_ptr e) const {
154  state->on_error(e);
155  }
156  void on_completed() const {
157  state->on_completed();
158  }
159 };
160 
161 }
162 
163 template<class T, class Coordination>
165 {
166  detail::synchronize_observer<T, Coordination> s;
167 
168 public:
170  : s(std::move(cn), std::move(cs), composite_subscription())
171  {
172  }
173 
174  bool has_observers() const {
175  return s.has_observers();
176  }
177 
179  return s.get_subscriber();
180  }
181 
183  auto keepAlive = s;
184  return make_observable_dynamic<T>([=](subscriber<T> o){
185  keepAlive.add(keepAlive.get_subscriber(), std::move(o));
186  });
187  }
188 };
189 
190 }
191 
193 {
194  rxsc::scheduler factory;
195 
196  class input_type
197  {
198  rxsc::worker controller;
199  rxsc::scheduler factory;
200  identity_one_worker coordination;
201  public:
202  explicit input_type(rxsc::worker w)
203  : controller(w)
204  , factory(rxsc::make_same_worker(w))
205  , coordination(factory)
206  {
207  }
208  inline rxsc::worker get_worker() const {
209  return controller;
210  }
211  inline rxsc::scheduler get_scheduler() const {
212  return factory;
213  }
214  inline rxsc::scheduler::clock_type::time_point now() const {
215  return factory.now();
216  }
217  template<class Observable>
218  auto in(Observable o) const
219  -> decltype(o.publish_synchronized(coordination).ref_count()) {
220  return o.publish_synchronized(coordination).ref_count();
221  }
222  template<class Subscriber>
223  auto out(Subscriber s) const
224  -> Subscriber {
225  return s;
226  }
227  template<class F>
228  auto act(F f) const
229  -> F {
230  return f;
231  }
232  };
233 
234 public:
235 
236  explicit synchronize_in_one_worker(rxsc::scheduler sc) : factory(sc) {}
237 
239 
240  inline rxsc::scheduler::clock_type::time_point now() const {
241  return factory.now();
242  }
243 
244  inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
245  auto w = factory.create_worker(std::move(cs));
246  return coordinator_type(input_type(std::move(w)));
247  }
248 };
249 
252  return r;
253 }
254 
257  return r;
258 }
259 
260 }
261 
262 #endif
synchronize_in_one_worker(rxsc::scheduler sc)
Definition: rx-synchronize.hpp:236
Definition: rx-all.hpp:26
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
synchronize(Coordination cn, composite_subscription cs=composite_subscription())
Definition: rx-synchronize.hpp:169
static type on_completed()
Definition: rx-notification.hpp:225
rxsc::scheduler::clock_type::time_point now() const
Definition: rx-synchronize.hpp:240
worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-scheduler.hpp:412
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
synchronize_in_one_worker synchronize_event_loop()
Definition: rx-synchronize.hpp:250
scheduler make_same_worker(rxsc::worker w)
Definition: rx-sameworker.hpp:44
static type on_error(Exception &&e)
Definition: rx-notification.hpp:230
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
scheduler make_event_loop()
Definition: rx-eventloop.hpp:98
static type on_next(U value)
Definition: rx-notification.hpp:221
scheduler make_new_thread()
Definition: rx-newthread.hpp:170
detail::notification_base< T >::type type
Definition: rx-notification.hpp:118
Definition: rx-coordination.hpp:23
clock_type::time_point now() const
return the current time for this scheduler
Definition: rx-scheduler.hpp:404
observable< T > get_observable() const
Definition: rx-synchronize.hpp:182
subscriber< T > get_subscriber() const
Definition: rx-synchronize.hpp:178
coordinator_type create_coordinator(composite_subscription cs=composite_subscription()) const
Definition: rx-synchronize.hpp:244
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
auto as_dynamic() -> detail::dynamic_factory
Definition: rx-subscribe.hpp:117
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
bool has_observers() const
Definition: rx-synchronize.hpp:174
synchronize_in_one_worker synchronize_new_thread()
Definition: rx-synchronize.hpp:255
Definition: rx-synchronize.hpp:164
coordinator< input_type > coordinator_type
Definition: rx-synchronize.hpp:238
Definition: rx-synchronize.hpp:192
Definition: rx-scheduler.hpp:200
Definition: rx-coordination.hpp:45