5 #if !defined(RXCPP_RX_SCHEDULER_NEW_THREAD_HPP) 6 #define RXCPP_RX_SCHEDULER_NEW_THREAD_HPP 8 #include "../rx-includes.hpp" 12 namespace schedulers {
25 typedef new_worker this_type;
27 typedef detail::action_queue queue_type;
29 new_worker(
const this_type&);
31 struct new_worker_state :
public std::enable_shared_from_this<new_worker_state>
33 typedef detail::schedulable_queue<
34 typename clock_type::time_point> queue_item_time;
36 typedef queue_item_time::item_type item_type;
38 virtual ~new_worker_state()
40 std::unique_lock<std::mutex> guard(lock);
41 if (
worker.joinable() &&
worker.get_id() != std::this_thread::get_id()) {
42 lifetime.unsubscribe();
58 mutable std::mutex lock;
59 mutable std::condition_variable wake;
60 mutable queue_item_time q;
65 std::shared_ptr<new_worker_state> state;
72 explicit new_worker(std::shared_ptr<new_worker_state> ws)
78 : state(std::make_shared<new_worker_state>(cs))
80 auto keepAlive = state;
82 state->lifetime.add([keepAlive](){
83 std::unique_lock<std::mutex> guard(keepAlive->lock);
84 auto expired = std::move(keepAlive->q);
85 if (!keepAlive->q.empty()) std::terminate();
86 keepAlive->wake.notify_one();
89 state->worker = tf([keepAlive](){
92 queue_type::ensure(std::make_shared<new_worker>(keepAlive));
95 queue_type::destroy();
99 std::unique_lock<std::mutex> guard(keepAlive->lock);
100 if (keepAlive->q.empty()) {
101 keepAlive->wake.wait(guard, [keepAlive](){
102 return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty();
105 if (!keepAlive->lifetime.is_subscribed()) {
108 auto& peek = keepAlive->q.top();
109 if (!peek.what.is_subscribed()) {
113 if (clock_type::now() < peek.when) {
114 keepAlive->wake.wait_until(guard, peek.when);
117 auto what = peek.what;
119 keepAlive->r.reset(keepAlive->q.empty());
121 what(keepAlive->r.get_recurse());
126 virtual clock_type::time_point
now()
const {
127 return clock_type::now();
130 virtual void schedule(
const schedulable& scbl)
const {
131 schedule(
now(), scbl);
134 virtual void schedule(clock_type::time_point when,
const schedulable& scbl)
const {
136 std::unique_lock<std::mutex> guard(state->lock);
137 state->q.push(new_worker_state::item_type(when, scbl));
138 state->r.reset(
false);
140 state->wake.notify_one();
148 : factory([](std::function<void()> start){
149 return std::thread(std::move(start));
161 virtual clock_type::time_point
now()
const {
162 return clock_type::now();
166 return worker(cs, std::make_shared<new_worker>(cs, factory));
171 static scheduler instance = make_scheduler<new_thread>();
175 return make_scheduler<new_thread>(tf);
Definition: rx-scheduler.hpp:163
void unsubscribe() const
Definition: rx-scheduler.hpp:246
Definition: rx-newthread.hpp:16
Definition: rx-all.hpp:26
bool is_subscribed() const
Definition: rx-scheduler.hpp:585
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
new_thread()
Definition: rx-newthread.hpp:147
std::function< std::thread(std::function< void()>)> thread_factory
Definition: rx-newthread.hpp:14
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
virtual ~new_thread()
Definition: rx-newthread.hpp:157
scheduler make_new_thread()
Definition: rx-newthread.hpp:170
new_thread(thread_factory tf)
Definition: rx-newthread.hpp:153
recursion is used by the scheduler to signal to each action whether tail recursion is allowed...
Definition: rx-scheduler.hpp:95
Definition: rx-scheduler.hpp:353
#define RXCPP_UNWIND_AUTO(Function)
Definition: rx-util.hpp:875
virtual clock_type::time_point now() const
Definition: rx-newthread.hpp:161
virtual worker create_worker(composite_subscription cs) const
Definition: rx-newthread.hpp:165
std::string what(std::exception_ptr ep)
Definition: rx-util.hpp:523
Definition: rx-scheduler.hpp:426
Definition: rx-scheduler.hpp:200