5 #if !defined(RXCPP_RX_SCHEDULER_NEW_THREAD_HPP)     6 #define RXCPP_RX_SCHEDULER_NEW_THREAD_HPP     8 #include "../rx-includes.hpp"    12 namespace schedulers {
    25         typedef new_worker this_type;
    27         typedef detail::action_queue queue_type;
    29         new_worker(
const this_type&);
    31         struct new_worker_state : 
public std::enable_shared_from_this<new_worker_state>
    33             typedef detail::schedulable_queue<
    34                 typename clock_type::time_point> queue_item_time;
    36             typedef queue_item_time::item_type item_type;
    38             virtual ~new_worker_state()
    40                 std::unique_lock<std::mutex> guard(lock);
    41                 if (
worker.joinable() && 
worker.get_id() != std::this_thread::get_id()) {
    42                     lifetime.unsubscribe();
    58             mutable std::mutex lock;
    59             mutable std::condition_variable wake;
    60             mutable queue_item_time q;
    65         std::shared_ptr<new_worker_state> state;
    72         explicit new_worker(std::shared_ptr<new_worker_state> ws)
    78             : state(std::make_shared<new_worker_state>(cs))
    80             auto keepAlive = state;
    82             state->lifetime.add([keepAlive](){
    83                 std::unique_lock<std::mutex> guard(keepAlive->lock);
    84                 auto expired = std::move(keepAlive->q);
    85                 if (!keepAlive->q.empty()) std::terminate();
    86                 keepAlive->wake.notify_one();
    89             state->worker = tf([keepAlive](){
    92                 queue_type::ensure(std::make_shared<new_worker>(keepAlive));
    95                     queue_type::destroy();
    99                     std::unique_lock<std::mutex> guard(keepAlive->lock);
   100                     if (keepAlive->q.empty()) {
   101                         keepAlive->wake.wait(guard, [keepAlive](){
   102                             return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty();
   105                     if (!keepAlive->lifetime.is_subscribed()) {
   108                     auto& peek = keepAlive->q.top();
   109                     if (!peek.what.is_subscribed()) {
   113                     if (clock_type::now() < peek.when) {
   114                         keepAlive->wake.wait_until(guard, peek.when);
   117                     auto what = peek.what;
   119                     keepAlive->r.reset(keepAlive->q.empty());
   121                     what(keepAlive->r.get_recurse());
   126         virtual clock_type::time_point 
now()
 const {
   127             return clock_type::now();
   130         virtual void schedule(
const schedulable& scbl)
 const {
   131             schedule(
now(), scbl);
   134         virtual void schedule(clock_type::time_point when, 
const schedulable& scbl)
 const {
   136                 std::unique_lock<std::mutex> guard(state->lock);
   137                 state->q.push(new_worker_state::item_type(when, scbl));
   138                 state->r.reset(
false);
   140             state->wake.notify_one();
   148         : factory([](std::function<void()> start){
   149             return std::thread(std::move(start));
   161     virtual clock_type::time_point 
now()
 const {
   162         return clock_type::now();
   166         return worker(cs, std::make_shared<new_worker>(cs, factory));
   171     static scheduler instance = make_scheduler<new_thread>();
   175     return make_scheduler<new_thread>(tf);
 Definition: rx-scheduler.hpp:163
void unsubscribe() const 
Definition: rx-scheduler.hpp:246
Definition: rx-newthread.hpp:16
Definition: rx-all.hpp:26
bool is_subscribed() const 
Definition: rx-scheduler.hpp:585
Controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
new_thread()
Definition: rx-newthread.hpp:147
std::function< std::thread(std::function< void()>)> thread_factory
Definition: rx-newthread.hpp:14
Allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
virtual ~new_thread()
Definition: rx-newthread.hpp:157
scheduler make_new_thread()
Definition: rx-newthread.hpp:170
new_thread(thread_factory tf)
Definition: rx-newthread.hpp:153
Recursion is used by the scheduler to signal to each action whether tail recursion is allowed.
Definition: rx-scheduler.hpp:95
Definition: rx-scheduler.hpp:353
#define RXCPP_UNWIND_AUTO(Function)
Definition: rx-util.hpp:875
virtual clock_type::time_point now() const 
Definition: rx-newthread.hpp:161
virtual worker create_worker(composite_subscription cs) const 
Definition: rx-newthread.hpp:165
std::string what(std::exception_ptr ep)
Definition: rx-util.hpp:523
Definition: rx-scheduler.hpp:426
Definition: rx-scheduler.hpp:200