RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-coordination.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_COORDINATION_HPP)
6 #define RXCPP_RX_COORDINATION_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
// Empty tag type. is_coordinator (below) reports true for a type T only when
// T::coordinator_tag* is convertible to tag_coordinator*.
12 struct tag_coordinator {};
14 
// Trait: does T expose a nested coordinator_tag compatible with tag_coordinator?
// Primary template: the answer is false.
15 template<class T, class C = rxu::types_checked>
16 struct is_coordinator : public std::false_type {};
17 
// Specialization keyed on T::coordinator_tag being a nameable type.
// NOTE(review): presumably rxu::types_checked_from<...>::type yields
// rxu::types_checked so that this specialization matches the default C - confirm in rx-util.hpp.
// The value is whether T::coordinator_tag* converts to tag_coordinator*.
18 template<class T>
19 struct is_coordinator<T, typename rxu::types_checked_from<typename T::coordinator_tag>::type>
20  : public std::is_convertible<typename T::coordinator_tag*, tag_coordinator*> {};
21 
// Empty tag type. detail::is_coordination (below) reports true for a type T only
// when T::coordination_tag* is convertible to tag_coordination*.
22 struct tag_coordination {};
24 
// Implementation detail for the public is_coordination trait.
25 namespace detail {
26 
// Primary template: T is not a coordination.
27 template<class T, class C = rxu::types_checked>
28 struct is_coordination : public std::false_type {};
29 
// Specialization keyed on T::coordination_tag being a nameable type.
// NOTE(review): presumably rxu::types_checked_from<...>::type yields
// rxu::types_checked so that this specialization matches the default C - confirm in rx-util.hpp.
// The value is whether T::coordination_tag* converts to tag_coordination*.
30 template<class T>
31 struct is_coordination<T, typename rxu::types_checked_from<typename T::coordination_tag>::type>
32  : public std::is_convertible<typename T::coordination_tag*, tag_coordination*> {};
33 
34 }
35 
// Public trait: applies detail::is_coordination to the decayed form of T
// (rxu::decay_t<T>), so the check is made on the decayed type rather than on T itself.
36 template<class T, class Decayed = rxu::decay_t<T>>
37 struct is_coordination : detail::is_coordination<Decayed>
38 {
39 };
40 
// Alias for the nested coordination_tag type of the decayed Coordination.
// Ill-formed when the decayed type has no coordination_tag member.
41 template<class Coordination, class DecayedCoordination = rxu::decay_t<Coordination>>
42 using coordination_tag_t = typename DecayedCoordination::coordination_tag;
43 
44 template<class Input>
46 {
47 public:
48  typedef Input input_type;
49 
50 private:
// Fallback selected by get<T> when T is neither an action function, an
// observable nor a subscriber; its nested type is not_supported itself.
51  struct not_supported {typedef not_supported type;};
52 
// Computes the type returned by input_type::in when called with an Observable lvalue.
// The null-pointer dereferences appear only inside decltype, so they are never evaluated.
53  template<class Observable>
54  struct get_observable
55  {
56  typedef decltype((*(input_type*)nullptr).in((*(Observable*)nullptr))) type;
57  };
58 
// Computes the type returned by input_type::out when called with a Subscriber lvalue.
// The null-pointer dereferences appear only inside decltype, so they are never evaluated.
59  template<class Subscriber>
60  struct get_subscriber
61  {
62  typedef decltype((*(input_type*)nullptr).out((*(Subscriber*)nullptr))) type;
63  };
64 
// Computes the type returned by input_type::act when called with an F lvalue.
// The null-pointer dereferences appear only inside decltype, so they are never evaluated.
65  template<class F>
66  struct get_action_function
67  {
68  typedef decltype((*(input_type*)nullptr).act((*(F*)nullptr))) type;
69  };
70 
71 public:
72  input_type input;
73 
// Maps T to the type this coordinator produces for it. The helper is chosen in
// this order: action function -> get_action_function<T>, observable ->
// get_observable<T>, subscriber -> get_subscriber<T>, otherwise not_supported.
// get<T>::type is the nested ::type of the chosen helper.
74  template<class T>
75  struct get
76  {
77  typedef typename std::conditional<
78  rxsc::detail::is_action_function<T>::value, get_action_function<T>, typename std::conditional<
79  is_observable<T>::value, get_observable<T>, typename std::conditional<
80  is_subscriber<T>::value, get_subscriber<T>, not_supported>::type>::type>::type::type type;
81  };
82 
// Constructs the coordinator by copying i into the public member input.
83  coordinator(Input i) : input(i) {}
84 
86  return input.get_worker();
87  }
89  return input.get_scheduler();
90  }
91 
// Forwards the observable o (moved) to input.in and returns whatever that returns.
// The static_assert is a declaration, so it is checked at compile time even
// though it is written after the return statement.
92  template<class Observable>
93  auto in(Observable o) const
94  -> typename get_observable<Observable>::type {
95  return input.in(std::move(o));
96  static_assert(is_observable<Observable>::value, "can only synchronize observables");
97  }
98 
// Forwards the subscriber s (moved) to input.out and returns whatever that returns.
// The static_assert is a declaration, so it is checked at compile time even
// though it is written after the return statement.
99  template<class Subscriber>
100  auto out(Subscriber s) const
101  -> typename get_subscriber<Subscriber>::type {
102  return input.out(std::move(s));
103  static_assert(is_subscriber<Subscriber>::value, "can only synchronize subscribers");
104  }
105 
// Forwards the action function f (moved) to input.act and returns whatever that returns.
// The static_assert is a declaration, so it is checked at compile time even
// though it is written after the return statement.
106  template<class F>
107  auto act(F f) const
108  -> typename get_action_function<F>::type {
109  return input.act(std::move(f));
110  static_assert(rxsc::detail::is_action_function<F>::value, "can only synchronize action functions");
111  }
112 };
113 
115 {
116  rxsc::scheduler factory;
117 
// Coordinator input that applies no transformation: in, out and act each
// return their argument unchanged.
118  class input_type
119  {
120  rxsc::worker controller;
121  rxsc::scheduler factory;
122  public:
// Keeps a copy of the worker w and a scheduler obtained from rxsc::make_same_worker(w).
// NOTE(review): presumably that scheduler always hands back w - confirm in rx-sameworker.hpp.
123  explicit input_type(rxsc::worker w)
124  : controller(w)
125  , factory(rxsc::make_same_worker(w))
126  {
127  }
// Returns a copy of the stored worker.
128  inline rxsc::worker get_worker() const {
129  return controller;
130  }
// Returns a copy of the scheduler built in the constructor.
131  inline rxsc::scheduler get_scheduler() const {
132  return factory;
133  }
// Returns factory.now().
134  inline rxsc::scheduler::clock_type::time_point now() const {
135  return factory.now();
136  }
// Returns the observable unchanged.
137  template<class Observable>
138  auto in(Observable o) const
139  -> Observable {
140  return o;
141  }
// Returns the subscriber unchanged.
142  template<class Subscriber>
143  auto out(Subscriber s) const
144  -> Subscriber {
145  return s;
146  }
// Returns the action function unchanged.
147  template<class F>
148  auto act(F f) const
149  -> F {
150  return f;
151  }
152  };
153 
154 public:
155 
// Stores a copy of the scheduler sc in factory; create_coordinator asks it for workers.
156  explicit identity_one_worker(rxsc::scheduler sc) : factory(sc) {}
157 
159 
// Returns factory.now(), i.e. the time reported by the stored scheduler.
160  inline rxsc::scheduler::clock_type::time_point now() const {
161  return factory.now();
162  }
163 
// Creates a worker from factory, passing cs (moved) to create_worker, and
// returns a coordinator_type wrapping an input_type built from that worker.
// cs defaults to a default-constructed composite_subscription.
164  inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
165  auto w = factory.create_worker(std::move(cs));
166  return coordinator_type(input_type(std::move(w)));
167  }
168 };
169 
172  return r;
173 }
174 
177  return r;
178 }
179 
182 }
183 
185 {
186  rxsc::scheduler factory;
187 
// Callable wrapper that holds a mutex while invoking the wrapped action function.
188  template<class F>
189  struct serialize_action
190  {
// dest is the wrapped callable; lock is the mutex acquired around each call.
191  F dest;
192  std::shared_ptr<std::mutex> lock;
// Takes ownership of d and m. A null mutex pointer is treated as fatal:
// the constructor calls std::terminate().
193  serialize_action(F d, std::shared_ptr<std::mutex> m)
194  : dest(std::move(d))
195  , lock(std::move(m))
196  {
197  if (!lock) {
198  std::terminate();
199  }
200  }
// Locks *lock for the duration of dest(scbl) and returns dest's result.
201  auto operator()(const rxsc::schedulable& scbl) const
202  -> decltype(dest(scbl)) {
203  std::unique_lock<std::mutex> guard(*lock);
204  return dest(scbl);
205  }
206  };
207 
// Observer wrapper that holds a mutex while forwarding each notification
// (on_next, on_error, on_completed) to the wrapped observer.
208  template<class Observer>
209  struct serialize_observer
210  {
211  typedef serialize_observer<Observer> this_type;
212  typedef rxu::decay_t<Observer> dest_type;
213  typedef typename dest_type::value_type value_type;
214  typedef observer<value_type, this_type> observer_type;
// dest is the wrapped observer; lock is the mutex acquired around each forwarded call.
215  dest_type dest;
216  std::shared_ptr<std::mutex> lock;
217 
// Takes ownership of d and m. A null mutex pointer is treated as fatal:
// the constructor calls std::terminate().
218  serialize_observer(dest_type d, std::shared_ptr<std::mutex> m)
219  : dest(std::move(d))
220  , lock(std::move(m))
221  {
222  if (!lock) {
223  std::terminate();
224  }
225  }
// Locks *lock, then forwards v to dest.on_next.
226  void on_next(value_type v) const {
227  std::unique_lock<std::mutex> guard(*lock);
228  dest.on_next(v);
229  }
// Locks *lock, then forwards e to dest.on_error.
230  void on_error(std::exception_ptr e) const {
231  std::unique_lock<std::mutex> guard(*lock);
232  dest.on_error(e);
233  }
// Locks *lock, then calls dest.on_completed.
234  void on_completed() const {
235  std::unique_lock<std::mutex> guard(*lock);
236  dest.on_completed();
237  }
238 
// Builds a subscriber by calling make_subscriber<value_type> with s and a new
// observer_type that wraps s.get_observer() together with the mutex m.
// NOTE(review): presumably the result reuses s's subscription - confirm in rx-subscriber.hpp.
239  template<class Subscriber>
240  static subscriber<value_type, observer_type> make(const Subscriber& s, std::shared_ptr<std::mutex> m) {
241  return make_subscriber<value_type>(s, observer_type(this_type(s.get_observer(), std::move(m))));
242  }
243  };
244 
// Coordinator input that routes subscribers and action functions through one
// shared mutex; observables are passed through unchanged.
245  class input_type
246  {
247  rxsc::worker controller;
248  rxsc::scheduler factory;
249  std::shared_ptr<std::mutex> lock;
250  public:
// Keeps a copy of the worker w, a scheduler obtained from
// rxsc::make_same_worker(w), and the shared mutex m.
251  explicit input_type(rxsc::worker w, std::shared_ptr<std::mutex> m)
252  : controller(w)
253  , factory(rxsc::make_same_worker(w))
254  , lock(std::move(m))
255  {
256  }
// Returns a copy of the stored worker.
257  inline rxsc::worker get_worker() const {
258  return controller;
259  }
// Returns a copy of the scheduler built in the constructor.
260  inline rxsc::scheduler get_scheduler() const {
261  return factory;
262  }
// Returns factory.now().
263  inline rxsc::scheduler::clock_type::time_point now() const {
264  return factory.now();
265  }
// Returns the observable unchanged.
266  template<class Observable>
267  auto in(Observable o) const
268  -> Observable {
269  return o;
270  }
// Wraps the subscriber's observer in a serialize_observer that shares this input's mutex.
271  template<class Subscriber>
272  auto out(const Subscriber& s) const
273  -> decltype(serialize_observer<decltype(s.get_observer())>::make(s, lock)) {
274  return serialize_observer<decltype(s.get_observer())>::make(s, lock);
275  }
// Wraps the action function in a serialize_action that shares this input's mutex.
276  template<class F>
277  auto act(F f) const
278  -> serialize_action<F> {
279  return serialize_action<F>(std::move(f), lock);
280  }
281  };
282 
283 public:
284 
// Stores a copy of the scheduler sc in factory; create_coordinator asks it for workers.
285  explicit serialize_one_worker(rxsc::scheduler sc) : factory(sc) {}
286 
288 
// Returns factory.now(), i.e. the time reported by the stored scheduler.
289  inline rxsc::scheduler::clock_type::time_point now() const {
290  return factory.now();
291  }
292 
// Creates a worker from factory, passing cs (moved) to create_worker, allocates
// a fresh std::mutex, and returns a coordinator_type wrapping an input_type
// built from both. Each call makes its own mutex, so separate coordinators do
// not share a lock.
293  inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
294  auto w = factory.create_worker(std::move(cs));
295  std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
296  return coordinator_type(input_type(std::move(w), std::move(lock)));
297  }
298 };
299 
302  return r;
303 }
304 
307  return r;
308 }
309 
312 }
313 
314 }
315 
316 #endif
auto in(Observable o) const -> typename get_observable< Observable >::type
Definition: rx-coordination.hpp:93
coordinator_type create_coordinator(composite_subscription cs=composite_subscription()) const
Definition: rx-coordination.hpp:164
Definition: rx-all.hpp:26
rxsc::scheduler::clock_type::time_point now() const
Definition: rx-coordination.hpp:289
serialize_one_worker serialize_same_worker(rxsc::worker w)
Definition: rx-coordination.hpp:310
coordinator< input_type > coordinator_type
Definition: rx-coordination.hpp:287
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
Input input_type
Definition: rx-coordination.hpp:48
Definition: rx-coordination.hpp:184
Definition: rx-coordination.hpp:12
identity_one_worker(rxsc::scheduler sc)
Definition: rx-coordination.hpp:156
rxsc::scheduler::clock_type::time_point now() const
Definition: rx-coordination.hpp:160
typename DecayedCoordination::coordination_tag coordination_tag_t
Definition: rx-coordination.hpp:42
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
identity_one_worker identity_same_worker(rxsc::worker w)
Definition: rx-coordination.hpp:180
worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-scheduler.hpp:412
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
auto act(F f) const -> typename get_action_function< F >::type
Definition: rx-coordination.hpp:107
scheduler make_same_worker(rxsc::worker w)
Definition: rx-sameworker.hpp:44
input_type input
Definition: rx-coordination.hpp:72
Definition: rx-coordination.hpp:16
identity_one_worker identity_immediate()
Definition: rx-coordination.hpp:170
rxsc::scheduler get_scheduler() const
Definition: rx-coordination.hpp:88
scheduler make_event_loop()
Definition: rx-eventloop.hpp:98
serialize_one_worker serialize_event_loop()
Definition: rx-coordination.hpp:300
Definition: rx-coordination.hpp:13
scheduler make_new_thread()
Definition: rx-newthread.hpp:170
serialize_one_worker(rxsc::scheduler sc)
Definition: rx-coordination.hpp:285
Definition: rx-coordination.hpp:23
clock_type::time_point now() const
return the current time for this scheduler
Definition: rx-scheduler.hpp:404
std::conditional< rxsc::detail::is_action_function< T >::value, get_action_function< T >, typename std::conditional< is_observable< T >::value, get_observable< T >, typename std::conditional< is_subscriber< T >::value, get_subscriber< T >, not_supported >::type >::type >::type::type type
Definition: rx-coordination.hpp:80
coordinator(Input i)
Definition: rx-coordination.hpp:83
coordinator_type create_coordinator(composite_subscription cs=composite_subscription()) const
Definition: rx-coordination.hpp:293
const scheduler & make_current_thread()
Definition: rx-currentthread.hpp:263
consumes values from an observable using State that may implement on_next, on_error and on_completed ...
Definition: rx-observer.hpp:179
coordinator< input_type > coordinator_type
Definition: rx-coordination.hpp:158
tag_coordinator coordinator_tag
Definition: rx-coordination.hpp:13
serialize_one_worker serialize_new_thread()
Definition: rx-coordination.hpp:305
Definition: rx-coordination.hpp:114
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
Definition: rx-coordination.hpp:75
auto out(Subscriber s) const -> typename get_subscriber< Subscriber >::type
Definition: rx-coordination.hpp:100
Definition: rx-scheduler.hpp:426
Definition: rx-predef.hpp:115
Definition: rx-coordination.hpp:22
tag_coordination coordination_tag
Definition: rx-coordination.hpp:23
rxsc::worker get_worker() const
Definition: rx-coordination.hpp:85
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
const scheduler & make_immediate()
Definition: rx-immediate.hpp:75
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37
Definition: rx-scheduler.hpp:200
Definition: rx-coordination.hpp:45