23 #if !defined(RXCPP_OPERATORS_RX_OBSERVE_ON_HPP) 24 #define RXCPP_OPERATORS_RX_OBSERVE_ON_HPP 26 #include "../rx-includes.hpp" 35 struct observe_on_invalid_arguments {};
38 struct observe_on_invalid :
public rxo::operator_base<observe_on_invalid_arguments<AN...>> {
39 using type = observable<observe_on_invalid_arguments<
AN...>, observe_on_invalid<
AN...>>;
42 using observe_on_invalid_t =
typename observe_on_invalid<
AN...>::type;
44 template<
class T,
class Coordination>
47 typedef rxu::decay_t<T> source_value_type;
49 typedef rxu::decay_t<Coordination> coordination_type;
50 typedef typename coordination_type::coordinator_type coordinator_type;
52 coordination_type coordination;
55 : coordination(std::move(cn))
59 template<
class Subscriber>
60 struct observe_on_observer
62 typedef observe_on_observer<Subscriber> this_type;
63 typedef source_value_type value_type;
64 typedef rxu::decay_t<Subscriber> dest_type;
65 typedef observer<value_type, this_type> observer_type;
67 typedef rxn::notification<T> notification_type;
68 typedef typename notification_type::type base_notification_type;
69 typedef std::deque<base_notification_type> queue_type;
// Shared state behind an observe_on_observer. It derives from
// enable_shared_from_this so that ensure_processing() can obtain a shared_ptr
// to itself and capture it in the drain lambda.
81 struct observe_on_state : std::enable_shared_from_this<observe_on_state>
// The members below are `mutable` because finish() and ensure_processing()
// are declared const yet modify them.
// `lock` is the mutex taken by on_next/on_error/on_completed and by the drain
// lambda before they touch the queues or `current`.
83 mutable std::mutex lock;
// Notifications are appended to fill_queue by on_next/on_error/on_completed.
84 mutable queue_type fill_queue;
// The drain lambda pops notifications from drain_queue and swaps fill_queue
// into it when drain_queue runs empty.
85 mutable queue_type drain_queue;
86 composite_subscription lifetime;
// Current mode; the values visible in this listing are Empty, Processing,
// Errored and Disposed.
87 mutable typename mode::type current;
88 coordinator_type coordinator;
89 dest_type destination;
// Stores the destination, coordinator and lifetime subscription and starts in
// mode::Empty. The constructor body is not visible in this listing.
91 observe_on_state(dest_type d, coordinator_type coor, composite_subscription cs)
92 : lifetime(std::move(cs))
93 , current(mode::Empty)
94 , coordinator(std::move(coor))
95 , destination(std::move(d))
// Shuts the state down: empties both queues and unsubscribes `lifetime` and
// `destination`. Does nothing if the state is already Errored or Disposed.
//
// guard: lock on this state's mutex; the function checks that it is owned.
// end:   the terminal mode requested by the caller (callers in this file pass
//        mode::Disposed or mode::Errored).
// NOTE(review): this listing has gaps in its embedded line numbers (101-102,
// 104, 109-110). Presumably `end` is stored into `current` and the guard is
// released before the unsubscribe calls on those lines - confirm against the
// real header.
99 void finish(std::unique_lock<std::mutex>& guard,
typename mode::type end)
const {
// Precondition check; the action taken when the lock is not owned is on a
// line that is not shown.
100 if (!guard.owns_lock()) {
// Already terminal: nothing left to tear down.
103 if (current == mode::Errored || current == mode::Disposed) {
return;}
// Swapping each queue with an empty local leaves the member queues empty and
// hands the pending notifications to locals that are destroyed with this scope.
105 queue_type fill_expired;
106 swap(fill_expired, fill_queue);
107 queue_type drain_expired;
108 swap(drain_expired, drain_queue);
111 lifetime.unsubscribe();
112 destination.unsubscribe();
// Moves the state from Empty to Processing and schedules a drain action on the
// coordinator's worker. The drain action delivers queued notifications to
// `destination`.
//
// guard: lock on this state's mutex; the function checks that it is owned.
// NOTE(review): this listing dropped many lines (see the gaps in the embedded
// numbers), including closing braces, return statements, and the loop and
// exception-handling constructs implied by `break` and
// std::current_exception(). The comments below describe only the statements
// that are visible.
115 void ensure_processing(std::unique_lock<std::mutex>& guard)
const {
// Precondition check; the failure action is on a line that is not shown.
116 if (!guard.owns_lock()) {
119 if (current == mode::Empty) {
120 current = mode::Processing;
// Nothing queued and the lifetime already ended: dispose instead of scheduling.
122 if (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty()) {
123 finish(guard, mode::Disposed);
// The lambda captures a shared_ptr to this state, so the state stays alive for
// as long as the lambda (and anything holding it) exists.
126 auto keepAlive = this->shared_from_this();
128 auto drain = [keepAlive,
this](
const rxsc::schedulable&
self){
// Refill / termination check. Note the first test reads drain_queue before
// the lock below is taken.
132 if (drain_queue.empty() || !destination.is_subscribed()) {
133 std::unique_lock<std::mutex> guard(lock);
// Destination gone, or lifetime ended with nothing left to deliver: dispose.
134 if (!destination.is_subscribed() ||
135 (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) {
136 finish(guard, mode::Disposed);
139 if (drain_queue.empty()) {
// Both queues empty: go back to Empty so a later ensure_processing() call
// takes the Empty -> Processing branch again.
140 if (fill_queue.empty()) {
141 current = mode::Empty;
// Otherwise take everything produced so far as the next batch to drain.
144 swap(fill_queue, drain_queue);
// Deliver one notification to the destination.
147 auto notification = std::move(drain_queue.front());
148 drain_queue.pop_front();
149 notification->accept(destination);
// The lock is taken again after delivery. NOTE(review): listing line 151 is
// missing; presumably it re-schedules `self` - confirm.
150 std::unique_lock<std::mutex> guard(lock);
152 if (lifetime.is_subscribed())
break;
// Error path: forward the in-flight exception to the destination, then finish
// in the Errored mode. The enclosing handler is not shown.
155 destination.on_error(std::current_exception());
156 std::unique_lock<std::mutex> guard(lock);
157 finish(guard, mode::Errored);
// `drain` is passed through coordinator.act(). NOTE(review): the call that
// receives this lambda and produces `selectedDrain` is not shown.
162 [&](){
return coordinator.act(drain);},
// An empty result is treated as failure: finish in the Errored mode.
164 if (selectedDrain.empty()) {
165 finish(guard, mode::Errored);
// Schedule the selected drain action on the coordinator's worker.
169 auto processor = coordinator.get_worker();
174 processor.schedule(selectedDrain.get());
178 std::shared_ptr<observe_on_state> state;
180 observe_on_observer(dest_type d, coordinator_type coor, composite_subscription cs)
181 : state(std::make_shared<observe_on_state>(std::move(d), std::move(coor), std::move(cs)))
// Queues a value for later delivery instead of forwarding it directly.
// Takes the state lock, drops the value if the state is already Errored or
// Disposed, otherwise appends an on_next notification to fill_queue and calls
// ensure_processing() with the lock still held.
185 void on_next(source_value_type v)
const {
186 std::unique_lock<std::mutex> guard(state->lock);
187 if (state->current == mode::Errored || state->current == mode::Disposed) {
return; }
// `v` was taken by value, so it can be moved into the notification.
188 state->fill_queue.push_back(notification_type::on_next(std::move(v)));
189 state->ensure_processing(guard);
// Queues an error notification for later delivery.
// Takes the state lock, ignores the error if the state is already Errored or
// Disposed, otherwise appends an on_error notification to fill_queue (behind
// anything queued earlier) and calls ensure_processing() with the lock held.
191 void on_error(std::exception_ptr e)
const {
192 std::unique_lock<std::mutex> guard(state->lock);
193 if (state->current == mode::Errored || state->current == mode::Disposed) {
return; }
194 state->fill_queue.push_back(notification_type::on_error(e));
195 state->ensure_processing(guard);
// Queues a completion notification for later delivery.
// Takes the state lock, ignores the call if the state is already Errored or
// Disposed, otherwise appends an on_completed notification to fill_queue
// (behind anything queued earlier) and calls ensure_processing() with the
// lock held.
197 void on_completed()
const {
198 std::unique_lock<std::mutex> guard(state->lock);
199 if (state->current == mode::Errored || state->current == mode::Disposed) {
return; }
200 state->fill_queue.push_back(notification_type::on_completed());
201 state->ensure_processing(guard);
// Builds the subscriber that sits between the source and destination `d`.
//
// d:  destination subscriber; its subscription is used to create the coordinator.
// cn: coordination from which the coordinator is created.
// cs: subscription given to both the observer state and the returned
//     subscriber (a default-constructed one if omitted).
// Returns a subscriber made from `d`, `cs` and the new observe_on_observer.
// NOTE(review): listing lines 206-207 and 210 are missing, so the context of
// the lock / ensure_processing pair below is not visible. It may run inside a
// callback rather than inline - confirm against the real header.
204 static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, coordination_type cn, composite_subscription cs = composite_subscription()) {
205 auto coor = cn.create_coordinator(d.get_subscription());
208 this_type o(d, std::move(coor), cs);
// Copy the shared state pointer before `o` is moved into the returned
// subscriber.
209 auto keepAlive = o.state;
211 std::unique_lock<std::mutex> guard(keepAlive->lock);
212 keepAlive->ensure_processing(guard);
215 return make_subscriber<value_type>(d, cs, make_observer<value_type>(std::move(o)));
// Applies the operator to a destination subscriber. `dest` is converted with
// as_dynamic() and passed, together with the stored coordination, to
// observe_on_observer<...>::make. The return type is whatever make returns
// for that dynamic subscriber type.
219 template<
class Subscriber>
220 auto operator()(Subscriber dest)
const 221 -> decltype(observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination)) {
222 return observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination);
230 template<
class...
AN>
233 return operator_factory<observe_on_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
// Overload taken for a valid (Observable, Coordination) call: lifts the source
// observable with an ObserveOn operator constructed from the forwarded
// coordination.
// NOTE(review): the template parameter lines between `Coordination` and
// `ObserveOn` (listing lines 242-245) are missing; `SourceValue` is presumably
// declared there along with any enabling constraint - confirm.
241 template<
class Observable,
class Coordination,
246 class ObserveOn = rxo::detail::observe_on<SourceValue, rxu::decay_t<Coordination>>>
247 static auto member(Observable&& o, Coordination&& cn)
248 -> decltype(o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)))) {
249 return o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)));
// Variadic overload that accepts any argument list. Its static_assert
// condition depends on the parameter pack, so it is only evaluated when this
// overload is instantiated, and it then fails with the message
// "observe_on takes (Coordination)" for any pack that does not hold exactly
// 10000 arguments. Presumably this exists to turn a call with unsupported
// arguments into a readable diagnostic - confirm intent.
252 template<
class...
AN>
253 static operators::detail::observe_on_invalid_t<
AN...>
member(
AN...) {
256 static_assert(
sizeof...(
AN) == 10000,
"observe_on takes (Coordination)");
273 , coordination(factory)
// Returns the current time as reported by `factory` (declared outside this
// listing) by forwarding to factory.now().
282 inline rxsc::scheduler::clock_type::time_point now()
const {
283 return factory.
now();
// Applies observe_on with the stored coordination to the observable `o` and
// returns the resulting observable.
285 template<
class Observable>
286 auto in(Observable o)
const 287 -> decltype(o.observe_on(coordination)) {
288 return o.observe_on(coordination);
290 template<
class Subscriber>
291 auto out(Subscriber s)
const 308 inline rxsc::scheduler::clock_type::time_point
now()
const {
309 return factory.
now();
314 return coordinator_type(input_type(std::move(w)));
Definition: rx-all.hpp:26
Definition: rx-observe_on.hpp:260
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
observe_on_one_worker(rxsc::scheduler sc)
Definition: rx-observe_on.hpp:304
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
Definition: rx-operators.hpp:47
observe_on_one_worker observe_on_event_loop()
Definition: rx-observe_on.hpp:323
observe_on_one_worker observe_on_new_thread()
Definition: rx-observe_on.hpp:328
worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-scheduler.hpp:412
coordinator_type create_coordinator(composite_subscription cs=composite_subscription()) const
Definition: rx-observe_on.hpp:312
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
scheduler make_same_worker(rxsc::worker w)
Definition: rx-sameworker.hpp:44
scheduler make_run_loop(const run_loop &r)
Definition: rx-runloop.hpp:204
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
scheduler make_event_loop()
Definition: rx-eventloop.hpp:98
scheduler make_new_thread()
Definition: rx-newthread.hpp:170
static auto member(Observable &&o, Coordination &&cn) -> decltype(o.template lift< SourceValue >(ObserveOn(std::forward< Coordination >(cn))))
Definition: rx-observe_on.hpp:247
Definition: rx-coordination.hpp:23
clock_type::time_point now() const
return the current time for this scheduler
Definition: rx-scheduler.hpp:404
Definition: rx-runloop.hpp:118
auto observe_on(AN &&...an) -> operator_factory< observe_on_tag, AN... >
All values are queued and delivered using the scheduler from the supplied coordination.
Definition: rx-observe_on.hpp:231
Definition: rx-operators.hpp:269
observe_on_one_worker observe_on_run_loop(const rxsc::run_loop &rl)
Definition: rx-observe_on.hpp:318
#define RXCPP_UNWIND_AUTO(Function)
Definition: rx-util.hpp:875
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
coordinator< input_type > coordinator_type
Definition: rx-observe_on.hpp:306
Definition: rx-coordination.hpp:114
static operators::detail::observe_on_invalid_t< AN... > member(AN...)
Definition: rx-observe_on.hpp:253
rxsc::scheduler::clock_type::time_point now() const
Definition: rx-observe_on.hpp:308
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37
Definition: rx-scheduler.hpp:200
Definition: rx-coordination.hpp:45