5 #if !defined(RXCPP_RX_SYNCHRONIZE_HPP) 6 #define RXCPP_RX_SYNCHRONIZE_HPP 8 #include "../rx-includes.hpp" 16 template<
class T,
class Coordination>
17 class synchronize_observer :
public detail::multicast_observer<T>
19 typedef synchronize_observer<T, Coordination> this_type;
20 typedef detail::multicast_observer<T> base_type;
22 typedef rxu::decay_t<Coordination> coordination_type;
23 typedef typename coordination_type::coordinator_type coordinator_type;
24 typedef typename coordinator_type::template get<subscriber<T>>::type output_type;
26 struct synchronize_observer_state :
public std::enable_shared_from_this<synchronize_observer_state>
28 typedef rxn::notification<T> notification_type;
30 typedef std::deque<base_notification_type> queue_type;
42 mutable std::mutex lock;
43 mutable std::condition_variable wake;
44 mutable queue_type fill_queue;
45 composite_subscription lifetime;
46 mutable typename mode::type current;
47 coordinator_type coordinator;
48 output_type destination;
50 void ensure_processing(std::unique_lock<std::mutex>& guard)
const {
51 if (!guard.owns_lock()) {
54 if (current == mode::Empty) {
55 current = mode::Processing;
56 auto keepAlive = this->shared_from_this();
58 auto drain_queue = [keepAlive,
this](
const rxsc::schedulable&
self){
60 std::unique_lock<std::mutex> guard(lock);
61 if (!destination.is_subscribed()) {
62 current = mode::Disposed;
65 lifetime.unsubscribe();
68 if (fill_queue.empty()) {
69 current = mode::Empty;
72 auto notification = std::move(fill_queue.front());
73 fill_queue.pop_front();
75 notification->accept(destination);
78 destination.on_error(std::current_exception());
79 std::unique_lock<std::mutex> guard(lock);
80 current = mode::Empty;
85 [&](){
return coordinator.act(drain_queue);},
87 if (selectedDrain.empty()) {
91 auto processor = coordinator.get_worker();
92 processor.schedule(lifetime, selectedDrain.get());
96 synchronize_observer_state(coordinator_type coor, composite_subscription cs, output_type scbr)
97 : lifetime(std::move(cs))
98 , current(mode::Empty)
99 , coordinator(std::move(coor))
100 , destination(std::move(scbr))
105 void on_next(V v)
const {
106 if (lifetime.is_subscribed()) {
107 std::unique_lock<std::mutex> guard(lock);
109 ensure_processing(guard);
113 void on_error(std::exception_ptr e)
const {
114 if (lifetime.is_subscribed()) {
115 std::unique_lock<std::mutex> guard(lock);
117 ensure_processing(guard);
121 void on_completed()
const {
122 if (lifetime.is_subscribed()) {
123 std::unique_lock<std::mutex> guard(lock);
125 ensure_processing(guard);
131 std::shared_ptr<synchronize_observer_state> state;
134 synchronize_observer(coordination_type cn, composite_subscription dl, composite_subscription il)
137 auto o = make_subscriber<T>(dl, make_observer_dynamic<T>( *
static_cast<base_type*
>(
this) ));
140 auto coordinator = cn.create_coordinator(dl);
142 state = std::make_shared<synchronize_observer_state>(std::move(coordinator), std::move(il), std::move(o));
145 subscriber<T> get_subscriber()
const {
146 return make_subscriber<T>(this->get_id(), state->lifetime, observer<T, detail::synchronize_observer<T, Coordination>>(*this)).
as_dynamic();
150 void on_next(V v)
const {
151 state->on_next(std::move(v));
153 void on_error(std::exception_ptr e)
const {
156 void on_completed()
const {
157 state->on_completed();
163 template<
class T,
class Coordination>
166 detail::synchronize_observer<T, Coordination> s;
175 return s.has_observers();
179 return s.get_subscriber();
185 keepAlive.add(keepAlive.get_subscriber(), std::move(o));
205 , coordination(factory)
214 inline rxsc::scheduler::clock_type::time_point now()
const {
215 return factory.
now();
217 template<
class Observable>
218 auto in(Observable o)
const 219 -> decltype(o.publish_synchronized(coordination).ref_count()) {
220 return o.publish_synchronized(coordination).ref_count();
222 template<
class Subscriber>
223 auto out(Subscriber s)
const 240 inline rxsc::scheduler::clock_type::time_point
now()
const {
241 return factory.
now();
246 return coordinator_type(input_type(std::move(w)));
synchronize_in_one_worker(rxsc::scheduler sc)
Definition: rx-synchronize.hpp:236
Definition: rx-all.hpp:26
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
synchronize(Coordination cn, composite_subscription cs=composite_subscription())
Definition: rx-synchronize.hpp:169
static type on_completed()
Definition: rx-notification.hpp:225
rxsc::scheduler::clock_type::time_point now() const
Definition: rx-synchronize.hpp:240
worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-scheduler.hpp:412
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
synchronize_in_one_worker synchronize_event_loop()
Definition: rx-synchronize.hpp:250
scheduler make_same_worker(rxsc::worker w)
Definition: rx-sameworker.hpp:44
static type on_error(Exception &&e)
Definition: rx-notification.hpp:230
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
scheduler make_event_loop()
Definition: rx-eventloop.hpp:98
static type on_next(U value)
Definition: rx-notification.hpp:221
scheduler make_new_thread()
Definition: rx-newthread.hpp:170
detail::notification_base< T >::type type
Definition: rx-notification.hpp:118
Definition: rx-coordination.hpp:23
clock_type::time_point now() const
return the current time for this scheduler
Definition: rx-scheduler.hpp:404
observable< T > get_observable() const
Definition: rx-synchronize.hpp:182
subscriber< T > get_subscriber() const
Definition: rx-synchronize.hpp:178
coordinator_type create_coordinator(composite_subscription cs=composite_subscription()) const
Definition: rx-synchronize.hpp:244
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
auto as_dynamic() -> detail::dynamic_factory
Definition: rx-subscribe.hpp:117
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
bool has_observers() const
Definition: rx-synchronize.hpp:174
synchronize_in_one_worker synchronize_new_thread()
Definition: rx-synchronize.hpp:255
Definition: rx-synchronize.hpp:164
coordinator< input_type > coordinator_type
Definition: rx-synchronize.hpp:238
Definition: rx-synchronize.hpp:192
Definition: rx-scheduler.hpp:200
Definition: rx-coordination.hpp:45