RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-newthread.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_NEW_THREAD_HPP)
6 #define RXCPP_RX_SCHEDULER_NEW_THREAD_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
14 typedef std::function<std::thread(std::function<void()>)> thread_factory;
15 
17 {
18 private:
19  typedef new_thread this_type;
20  new_thread(const this_type&);
21 
22  struct new_worker : public worker_interface
23  {
24  private:
25  typedef new_worker this_type;
26 
27  typedef detail::action_queue queue_type;
28 
29  new_worker(const this_type&);
30 
31  struct new_worker_state : public std::enable_shared_from_this<new_worker_state>
32  {
33  typedef detail::schedulable_queue<
34  typename clock_type::time_point> queue_item_time;
35 
36  typedef queue_item_time::item_type item_type;
37 
38  virtual ~new_worker_state()
39  {
40  std::unique_lock<std::mutex> guard(lock);
41  if (worker.joinable() && worker.get_id() != std::this_thread::get_id()) {
42  lifetime.unsubscribe();
43  guard.unlock();
44  worker.join();
45  }
46  else {
47  lifetime.unsubscribe();
48  worker.detach();
49  }
50  }
51 
52  explicit new_worker_state(composite_subscription cs)
53  : lifetime(cs)
54  {
55  }
56 
57  composite_subscription lifetime;
58  mutable std::mutex lock;
59  mutable std::condition_variable wake;
60  mutable queue_item_time q;
61  std::thread worker;
62  recursion r;
63  };
64 
65  std::shared_ptr<new_worker_state> state;
66 
67  public:
68  virtual ~new_worker()
69  {
70  }
71 
72  explicit new_worker(std::shared_ptr<new_worker_state> ws)
73  : state(ws)
74  {
75  }
76 
77  new_worker(composite_subscription cs, thread_factory& tf)
78  : state(std::make_shared<new_worker_state>(cs))
79  {
80  auto keepAlive = state;
81 
82  state->lifetime.add([keepAlive](){
83  std::unique_lock<std::mutex> guard(keepAlive->lock);
84  auto expired = std::move(keepAlive->q);
85  if (!keepAlive->q.empty()) std::terminate();
86  keepAlive->wake.notify_one();
87  });
88 
89  state->worker = tf([keepAlive](){
90 
91  // take ownership
92  queue_type::ensure(std::make_shared<new_worker>(keepAlive));
93  // release ownership
95  queue_type::destroy();
96  });
97 
98  for(;;) {
99  std::unique_lock<std::mutex> guard(keepAlive->lock);
100  if (keepAlive->q.empty()) {
101  keepAlive->wake.wait(guard, [keepAlive](){
102  return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty();
103  });
104  }
105  if (!keepAlive->lifetime.is_subscribed()) {
106  break;
107  }
108  auto& peek = keepAlive->q.top();
109  if (!peek.what.is_subscribed()) {
110  keepAlive->q.pop();
111  continue;
112  }
113  if (clock_type::now() < peek.when) {
114  keepAlive->wake.wait_until(guard, peek.when);
115  continue;
116  }
117  auto what = peek.what;
118  keepAlive->q.pop();
119  keepAlive->r.reset(keepAlive->q.empty());
120  guard.unlock();
121  what(keepAlive->r.get_recurse());
122  }
123  });
124  }
125 
126  virtual clock_type::time_point now() const {
127  return clock_type::now();
128  }
129 
130  virtual void schedule(const schedulable& scbl) const {
131  schedule(now(), scbl);
132  }
133 
134  virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
135  if (scbl.is_subscribed()) {
136  std::unique_lock<std::mutex> guard(state->lock);
137  state->q.push(new_worker_state::item_type(when, scbl));
138  state->r.reset(false);
139  }
140  state->wake.notify_one();
141  }
142  };
143 
144  mutable thread_factory factory;
145 
146 public:
148  : factory([](std::function<void()> start){
149  return std::thread(std::move(start));
150  })
151  {
152  }
154  : factory(tf)
155  {
156  }
157  virtual ~new_thread()
158  {
159  }
160 
161  virtual clock_type::time_point now() const {
162  return clock_type::now();
163  }
164 
166  return worker(cs, std::make_shared<new_worker>(cs, factory));
167  }
168 };
169 
171  static scheduler instance = make_scheduler<new_thread>();
172  return instance;
173 }
175  return make_scheduler<new_thread>(tf);
176 }
177 
178 }
179 
180 }
181 
182 #endif
Definition: rx-scheduler.hpp:163
void unsubscribe() const
Definition: rx-scheduler.hpp:246
Definition: rx-newthread.hpp:16
Definition: rx-all.hpp:26
bool is_subscribed() const
Definition: rx-scheduler.hpp:585
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
new_thread()
Definition: rx-newthread.hpp:147
std::function< std::thread(std::function< void()>)> thread_factory
Definition: rx-newthread.hpp:14
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
virtual ~new_thread()
Definition: rx-newthread.hpp:157
scheduler make_new_thread()
Definition: rx-newthread.hpp:170
new_thread(thread_factory tf)
Definition: rx-newthread.hpp:153
recursion is used by the scheduler to signal to each action whether tail recursion is allowed...
Definition: rx-scheduler.hpp:95
Definition: rx-scheduler.hpp:353
#define RXCPP_UNWIND_AUTO(Function)
Definition: rx-util.hpp:875
virtual clock_type::time_point now() const
Definition: rx-newthread.hpp:161
virtual worker create_worker(composite_subscription cs) const
Definition: rx-newthread.hpp:165
std::string what(std::exception_ptr ep)
Definition: rx-util.hpp:523
Definition: rx-scheduler.hpp:426
Definition: rx-scheduler.hpp:200