5 #if !defined(RXCPP_RX_SCHEDULER_RUN_LOOP_HPP) 6 #define RXCPP_RX_SCHEDULER_RUN_LOOP_HPP 8 #include "../rx-includes.hpp" 12 namespace schedulers {
// State object for the run loop: a time-ordered queue of schedulable items
// together with the mutex, lifetime subscription and wakeup callback declared below.
// NOTE(review): the embedded listing numbers have gaps (e.g. 17-19, 27-33, 37), so
// the braces and some members of this struct are not visible in this listing.
16 struct run_loop_state :
public std::enable_shared_from_this<run_loop_state>
// Queue type whose items are keyed by clock_type::time_point.
20 typedef detail::schedulable_queue<
21 clock_type::time_point> queue_item_time;
// Element type stored in the queue, and the const-reference form of it.
23 typedef queue_item_time::item_type item_type;
24 typedef queue_item_time::const_reference const_reference_item_type;
// Virtual destructor (body not visible in this listing).
26 virtual ~run_loop_state()
// Subscription that worker subscriptions are added to and that is
// unsubscribed elsewhere in this listing.
34 composite_subscription lifetime;
// Mutex that the visible scheduling code acquires before touching q and
// notify_earlier_wakeup.
35 mutable std::mutex lock;
// Pending scheduled items; mutable, so it can be modified through a const state.
36 mutable queue_item_time q;
// Optional callback; when set, it is invoked with the new time point if an item is
// pushed while the queue is empty or is due before the current top item.
38 std::function<void(clock_type::time_point)> notify_earlier_wakeup;
// Alias for the worker's own type.
// NOTE(review): the struct header for run_loop_worker is not visible in this listing.
53 typedef run_loop_worker this_type;
// Copy constructor: declared only; no definition is visible in this listing.
55 run_loop_worker(
const this_type&);
// Weak (non-owning) reference to the run-loop state.
58 std::weak_ptr<detail::run_loop_state> state;
// Virtual destructor (body not visible in this listing).
60 virtual ~run_loop_worker()
// Constructs a worker from a weak reference to the run-loop state. The initializer
// list and body are not visible -- presumably ws is stored in state (TODO confirm).
64 explicit run_loop_worker(std::weak_ptr<detail::run_loop_state> ws)
// Returns the current time as reported by clock_type::now().
69 virtual clock_type::time_point now()
const {
70 return clock_type::now();
// Schedules scbl for the current time by forwarding to the
// (time_point, schedulable) overload with now() as the due time.
73 virtual void schedule(
const schedulable& scbl)
const {
74 schedule(now(), scbl);
// Queues scbl to run at time `when` on the shared run-loop state and, when a
// wakeup callback is installed, notifies it if this item becomes the earliest one.
// NOTE(review): embedded listing lines 78, 84 and 86+ are not visible here, so
// additional statements (guards, unlocks) may exist in the real method.
77 virtual void schedule(clock_type::time_point when,
const schedulable& scbl)
const {
// NOTE(review): state is a weak_ptr; in the visible lines st is dereferenced without
// a null check, which would be a null dereference if the state has already been
// destroyed -- confirm the lifetime guarantee or add a check.
79 auto st = state.lock();
// Hold the state mutex while inspecting and modifying the queue.
80 std::unique_lock<std::mutex> guard(st->lock);
// Decide before pushing: a notification is needed only if a callback is set and the
// queue is empty or the new item is due before the current top item.
81 const bool need_earlier_wakeup_notification = st->notify_earlier_wakeup &&
82 (st->q.empty() || when < st->q.top().when);
83 st->q.push(detail::run_loop_state::item_type(when, scbl));
// In the visible lines the callback is invoked while guard is still held.
85 if (need_earlier_wakeup_notification) st->notify_earlier_wakeup(when);
// Weak (non-owning) reference to the run-loop state.
// NOTE(review): presumably a member of run_loop_scheduler, whose header is not
// visible in this listing -- confirm.
91 std::weak_ptr<detail::run_loop_state> state;
// Returns the current time as reported by clock_type::now().
102 virtual clock_type::time_point
now()
const {
103 return clock_type::now();
// Body fragment; the signature line is not visible here. Per the cross-reference list
// in this listing it appears to be
// `virtual worker create_worker(composite_subscription cs) const` -- confirm.
// NOTE(review): state.lock() is dereferenced without a null check; this would be a
// null dereference if the run-loop state has already been destroyed.
107 auto lifetime = state.lock()->lifetime;
// Register the worker's subscription with the run-loop lifetime and keep the token.
108 auto token = lifetime.
add(cs);
// When cs is unsubscribed, remove its registration from the lifetime again.
// The lambda captures lifetime and token by copy.
109 cs.
add([=](){lifetime.remove(token);});
// Build the worker from cs and a fresh worker interface.
110 return worker(cs, create_worker_interface());
// Body fragment; the signature line is not visible here (the cross-reference list
// names `create_worker_interface() const` at the preceding listing line -- confirm).
// Creates a new run_loop_worker that shares this object's weak state reference.
114 return std::make_shared<run_loop_worker>(state);
// NOTE(review): these appear to be members of run_loop, whose class header is not
// visible in this listing -- confirm.
// Alias for the action queue type used via its static ensure()/destroy() calls below.
127 typedef detail::action_queue queue_type;
// Item types re-exported from run_loop_state.
129 typedef detail::run_loop_state::item_type item_type;
130 typedef detail::run_loop_state::const_reference_item_type const_reference_item_type;
// Owning reference to the run-loop state.
132 std::shared_ptr<detail::run_loop_state> state;
// Scheduler object whose create_worker_interface() is used in the constructor.
133 std::shared_ptr<run_loop_scheduler> sc;
// Constructor fragment (signature and embedded lines 139-142 not visible):
// allocates the state, then calls queue_type::ensure with a new worker interface.
138 : state(std::make_shared<detail::run_loop_state>())
143 queue_type::ensure(sc->create_worker_interface());
// Body fragment; per the cross-reference list this appears to be ~run_loop() -- confirm.
// Unsubscribe the lifetime subscription first.
147 state->lifetime.unsubscribe();
// Take the state mutex for the remaining visible teardown steps.
149 std::unique_lock<std::mutex> guard(state->lock);
152 queue_type::destroy();
// Move the pending items out of the state into a local; if the moved-from queue
// still reports items afterwards, abort via std::terminate().
154 auto expired = std::move(state->q);
155 if (!state->q.empty()) std::terminate();
// Returns the current time as reported by clock_type::now().
158 clock_type::time_point
now()
const {
159 return clock_type::now();
// Body fragment; per the cross-reference list this appears to be
// `composite_subscription get_subscription() const` -- confirm.
// Returns the state's lifetime subscription.
163 return state->lifetime;
// Body fragment; per the cross-reference list this appears to be `bool empty() const`
// -- confirm. Reports whether the queue has no items; no lock is taken on this line.
167 return state->q.empty();
// Returns a const reference to the top item of the queue.
// The visible lines do not acquire state->lock and do not check for an empty queue.
170 const_reference_item_type
peek()
const {
171 return state->q.top();
// Body fragment; per the cross-reference list this appears to be
// `void dispatch() const` -- confirm.
// NOTE(review): the bodies of the three `if` statements and embedded lines 188 and 190
// are not visible; presumably they return early, pop the item, or release the lock
// -- TODO confirm against the full source.
175 std::unique_lock<std::mutex> guard(state->lock);
// Case: nothing is queued.
176 if (state->q.empty()) {
179 auto& peek = state->q.top();
// Case: the top item's schedulable is no longer subscribed.
180 if (!peek.what.is_subscribed()) {
// Case: the top item is not due yet.
184 if (clock_type::now() < peek.when) {
// Copy the schedulable out of the queue item before it is invoked.
187 auto what = peek.what;
// state->r is not declared in the visible part of run_loop_state; it is reset with
// the queue's emptiness flag and then supplies the argument for the call below.
189 state->r.reset(state->q.empty());
191 what(state->r.get_recurse());
// Body fragment; per the cross-reference list this appears to be
// `void set_notify_earlier_wakeup(std::function<void(clock_type::time_point)> const &f)`
// -- confirm.
// Installs f as the wakeup callback while holding the state mutex.
199 std::unique_lock<std::mutex> guard(state->lock);
200 state->notify_earlier_wakeup = f;
Definition: rx-scheduler.hpp:163
composite_subscription get_subscription() const
Definition: rx-runloop.hpp:162
virtual ~run_loop_scheduler()
Definition: rx-runloop.hpp:98
Definition: rx-all.hpp:26
scheduler get_scheduler() const
Definition: rx-runloop.hpp:194
bool is_subscribed() const
Definition: rx-scheduler.hpp:585
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
virtual clock_type::time_point now() const
Definition: rx-runloop.hpp:102
Definition: rx-runloop.hpp:44
void dispatch() const
Definition: rx-runloop.hpp:174
scheduler::clock_type clock_type
Definition: rx-runloop.hpp:136
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
scheduler make_run_loop(const run_loop &r)
Definition: rx-runloop.hpp:204
const_reference_item_type peek() const
Definition: rx-runloop.hpp:170
std::shared_ptr< worker_interface > create_worker_interface() const
Definition: rx-runloop.hpp:113
~run_loop()
Definition: rx-runloop.hpp:145
scheduler make_scheduler(ArgN &&...an)
Definition: rx-scheduler.hpp:418
clock_type::time_point now() const
Definition: rx-runloop.hpp:158
Definition: rx-runloop.hpp:118
run_loop_scheduler(std::weak_ptr< detail::run_loop_state > ws)
Definition: rx-runloop.hpp:94
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:413
run_loop()
Definition: rx-runloop.hpp:137
Definition: rx-scheduler.hpp:353
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:389
bool empty() const
Definition: rx-runloop.hpp:166
void set_notify_earlier_wakeup(std::function< void(clock_type::time_point)> const &f)
Definition: rx-runloop.hpp:198
std::string what(std::exception_ptr ep)
Definition: rx-util.hpp:523
Definition: rx-scheduler.hpp:426
virtual worker create_worker(composite_subscription cs) const
Definition: rx-runloop.hpp:106
Definition: rx-scheduler.hpp:200