5 #if !defined(RXCPP_RX_COORDINATION_HPP) 6 #define RXCPP_RX_COORDINATION_HPP 15 template<
class T,
class C = rxu::types_checked>
19 struct is_coordinator<T, typename rxu::types_checked_from<typename T::coordinator_tag>::type>
20 :
public std::is_convertible<typename T::coordinator_tag*, tag_coordinator*> {};
27 template<
class T,
class C = rxu::types_checked>
31 struct is_coordination<T, typename rxu::types_checked_from<typename T::coordination_tag>::type>
32 :
public std::is_convertible<typename T::coordination_tag*, tag_coordination*> {};
36 template<
class T,
class Decayed = rxu::decay_t<T>>
41 template<
class Coordination,
class DecayedCoordination = rxu::decay_t<Coordination>>
51 struct not_supported {
typedef not_supported type;};
53 template<
class Observable>
56 typedef decltype((*(input_type*)
nullptr).in((*(Observable*)
nullptr))) type;
59 template<
class Subscriber>
62 typedef decltype((*(input_type*)
nullptr).out((*(Subscriber*)
nullptr))) type;
66 struct get_action_function
68 typedef decltype((*(input_type*)
nullptr).act((*(F*)
nullptr))) type;
77 typedef typename std::conditional<
78 rxsc::detail::is_action_function<T>::value, get_action_function<T>,
typename std::conditional<
86 return input.get_worker();
89 return input.get_scheduler();
92 template<
class Observable>
93 auto in(Observable o)
const 94 ->
typename get_observable<Observable>::type {
95 return input.in(std::move(o));
99 template<
class Subscriber>
100 auto out(Subscriber s)
const 101 ->
typename get_subscriber<Subscriber>::type {
102 return input.out(std::move(s));
108 ->
typename get_action_function<F>::type {
109 return input.act(std::move(f));
110 static_assert(rxsc::detail::is_action_function<F>::value,
"can only synchronize action functions");
134 inline rxsc::scheduler::clock_type::time_point now()
const {
135 return factory.
now();
137 template<
class Observable>
138 auto in(Observable o)
const 142 template<
class Subscriber>
143 auto out(Subscriber s)
const 160 inline rxsc::scheduler::clock_type::time_point
now()
const {
161 return factory.
now();
166 return coordinator_type(input_type(std::move(w)));
189 struct serialize_action
192 std::shared_ptr<std::mutex> lock;
193 serialize_action(F d, std::shared_ptr<std::mutex> m)
202 -> decltype(dest(scbl)) {
203 std::unique_lock<std::mutex> guard(*lock);
208 template<
class Observer>
209 struct serialize_observer
211 typedef serialize_observer<Observer> this_type;
213 typedef typename dest_type::value_type value_type;
216 std::shared_ptr<std::mutex> lock;
218 serialize_observer(dest_type d, std::shared_ptr<std::mutex> m)
226 void on_next(value_type v)
const {
227 std::unique_lock<std::mutex> guard(*lock);
230 void on_error(std::exception_ptr e)
const {
231 std::unique_lock<std::mutex> guard(*lock);
234 void on_completed()
const {
235 std::unique_lock<std::mutex> guard(*lock);
239 template<
class Subscriber>
241 return make_subscriber<value_type>(s, observer_type(this_type(s.get_observer(), std::move(m))));
249 std::shared_ptr<std::mutex> lock;
251 explicit input_type(
rxsc::worker w, std::shared_ptr<std::mutex> m)
263 inline rxsc::scheduler::clock_type::time_point now()
const {
264 return factory.
now();
266 template<
class Observable>
267 auto in(Observable o)
const 271 template<
class Subscriber>
272 auto out(
const Subscriber& s)
const 273 -> decltype(serialize_observer<decltype(s.get_observer())>::make(s, lock)) {
274 return serialize_observer<decltype(s.get_observer())>::make(s, lock);
278 -> serialize_action<F> {
279 return serialize_action<F>(std::move(f), lock);
289 inline rxsc::scheduler::clock_type::time_point
now()
const {
290 return factory.
now();
295 std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
296 return coordinator_type(input_type(std::move(w), std::move(lock)));
auto in(Observable o) const -> typename get_observable< Observable >::type
Definition: rx-coordination.hpp:93
coordinator_type create_coordinator(composite_subscription cs=composite_subscription()) const
Definition: rx-coordination.hpp:164
Definition: rx-all.hpp:26
rxsc::scheduler::clock_type::time_point now() const
Definition: rx-coordination.hpp:289
serialize_one_worker serialize_same_worker(rxsc::worker w)
Definition: rx-coordination.hpp:310
coordinator< input_type > coordinator_type
Definition: rx-coordination.hpp:287
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
Input input_type
Definition: rx-coordination.hpp:48
Definition: rx-coordination.hpp:184
Definition: rx-coordination.hpp:12
identity_one_worker(rxsc::scheduler sc)
Definition: rx-coordination.hpp:156
rxsc::scheduler::clock_type::time_point now() const
Definition: rx-coordination.hpp:160
typename DecayedCoordination::coordination_tag coordination_tag_t
Definition: rx-coordination.hpp:42
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
identity_one_worker identity_same_worker(rxsc::worker w)
Definition: rx-coordination.hpp:180
worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-scheduler.hpp:412
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
auto act(F f) const -> typename get_action_function< F >::type
Definition: rx-coordination.hpp:107
scheduler make_same_worker(rxsc::worker w)
Definition: rx-sameworker.hpp:44
input_type input
Definition: rx-coordination.hpp:72
Definition: rx-coordination.hpp:16
identity_one_worker identity_immediate()
Definition: rx-coordination.hpp:170
rxsc::scheduler get_scheduler() const
Definition: rx-coordination.hpp:88
scheduler make_event_loop()
Definition: rx-eventloop.hpp:98
serialize_one_worker serialize_event_loop()
Definition: rx-coordination.hpp:300
Definition: rx-coordination.hpp:13
scheduler make_new_thread()
Definition: rx-newthread.hpp:170
serialize_one_worker(rxsc::scheduler sc)
Definition: rx-coordination.hpp:285
Definition: rx-coordination.hpp:23
clock_type::time_point now() const
return the current time for this scheduler
Definition: rx-scheduler.hpp:404
std::conditional< rxsc::detail::is_action_function< T >::value, get_action_function< T >, typename std::conditional< is_observable< T >::value, get_observable< T >, typename std::conditional< is_subscriber< T >::value, get_subscriber< T >, not_supported >::type >::type >::type::type type
Definition: rx-coordination.hpp:80
coordinator(Input i)
Definition: rx-coordination.hpp:83
coordinator_type create_coordinator(composite_subscription cs=composite_subscription()) const
Definition: rx-coordination.hpp:293
const scheduler & make_current_thread()
Definition: rx-currentthread.hpp:263
consumes values from an observable using State that may implement on_next, on_error and on_completed ...
Definition: rx-observer.hpp:179
coordinator< input_type > coordinator_type
Definition: rx-coordination.hpp:158
tag_coordinator coordinator_tag
Definition: rx-coordination.hpp:13
serialize_one_worker serialize_new_thread()
Definition: rx-coordination.hpp:305
Definition: rx-coordination.hpp:114
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
Definition: rx-coordination.hpp:75
auto out(Subscriber s) const -> typename get_subscriber< Subscriber >::type
Definition: rx-coordination.hpp:100
Definition: rx-scheduler.hpp:426
Definition: rx-predef.hpp:115
Definition: rx-coordination.hpp:22
tag_coordination coordination_tag
Definition: rx-coordination.hpp:23
rxsc::worker get_worker() const
Definition: rx-coordination.hpp:85
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
const scheduler & make_immediate()
Definition: rx-immediate.hpp:75
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37
Definition: rx-scheduler.hpp:200
Definition: rx-coordination.hpp:45