RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-currentthread.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_CURRENT_THREAD_HPP)
6 #define RXCPP_RX_SCHEDULER_CURRENT_THREAD_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
14 namespace detail {
15 
16 struct action_queue
17 {
18  typedef action_queue this_type;
19 
20  typedef scheduler_base::clock_type clock;
21  typedef time_schedulable<clock::time_point> item_type;
22 
23 private:
24  typedef schedulable_queue<item_type::time_point_type> queue_item_time;
25 
26 public:
27  struct current_thread_queue_type {
28  std::shared_ptr<worker_interface> w;
29  recursion r;
30  queue_item_time q;
31  };
32 
33 private:
34 #if defined(RXCPP_THREAD_LOCAL)
35  static current_thread_queue_type*& current_thread_queue() {
36  static RXCPP_THREAD_LOCAL current_thread_queue_type* q;
37  return q;
38  }
39 #else
40  static rxu::thread_local_storage<current_thread_queue_type>& current_thread_queue() {
41  static rxu::thread_local_storage<current_thread_queue_type> q;
42  return q;
43  }
44 #endif
45 
46 public:
47 
48  static bool owned() {
49  return !!current_thread_queue();
50  }
51  static const std::shared_ptr<worker_interface>& get_worker_interface() {
52  return current_thread_queue()->w;
53  }
54  static recursion& get_recursion() {
55  return current_thread_queue()->r;
56  }
57  static bool empty() {
58  if (!current_thread_queue()) {
59  std::terminate();
60  }
61  return current_thread_queue()->q.empty();
62  }
63  static queue_item_time::const_reference top() {
64  if (!current_thread_queue()) {
65  std::terminate();
66  }
67  return current_thread_queue()->q.top();
68  }
69  static void pop() {
70  auto& state = current_thread_queue();
71  if (!state) {
72  std::terminate();
73  }
74  state->q.pop();
75  if (state->q.empty()) {
76  // allow recursion
77  state->r.reset(true);
78  }
79  }
80  static void push(item_type item) {
81  auto& state = current_thread_queue();
82  if (!state) {
83  std::terminate();
84  }
85  if (!item.what.is_subscribed()) {
86  return;
87  }
88  state->q.push(std::move(item));
89  // disallow recursion
90  state->r.reset(false);
91  }
92  static std::shared_ptr<worker_interface> ensure(std::shared_ptr<worker_interface> w) {
93  if (!!current_thread_queue()) {
94  std::terminate();
95  }
96  // create and publish new queue
97  current_thread_queue() = new current_thread_queue_type();
98  current_thread_queue()->w = w;
99  return w;
100  }
101  static std::unique_ptr<current_thread_queue_type> create(std::shared_ptr<worker_interface> w) {
102  std::unique_ptr<current_thread_queue_type> result(new current_thread_queue_type());
103  result->w = std::move(w);
104  return result;
105  }
106  static void set(current_thread_queue_type* q) {
107  if (!!current_thread_queue()) {
108  std::terminate();
109  }
110  // publish new queue
111  current_thread_queue() = q;
112  }
113  static void destroy(current_thread_queue_type* q) {
114  delete q;
115  }
116  static void destroy() {
117  if (!current_thread_queue()) {
118  std::terminate();
119  }
120 #if defined(RXCPP_THREAD_LOCAL)
121  destroy(current_thread_queue());
122 #else
123  destroy(current_thread_queue().get());
124 #endif
125  current_thread_queue() = nullptr;
126  }
127 };
128 
129 }
130 
132 {
133 private:
134  typedef current_thread this_type;
135  current_thread(const this_type&);
136 
137  typedef detail::action_queue queue_type;
138 
139  struct derecurser : public worker_interface
140  {
141  private:
142  typedef current_thread this_type;
143  derecurser(const this_type&);
144  public:
145  derecurser()
146  {
147  }
148  virtual ~derecurser()
149  {
150  }
151 
152  virtual clock_type::time_point now() const {
153  return clock_type::now();
154  }
155 
156  virtual void schedule(const schedulable& scbl) const {
157  queue_type::push(queue_type::item_type(now(), scbl));
158  }
159 
160  virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
161  queue_type::push(queue_type::item_type(when, scbl));
162  }
163  };
164 
165  struct current_worker : public worker_interface
166  {
167  private:
168  typedef current_thread this_type;
169  current_worker(const this_type&);
170  public:
171  current_worker()
172  {
173  }
174  virtual ~current_worker()
175  {
176  }
177 
178  virtual clock_type::time_point now() const {
179  return clock_type::now();
180  }
181 
182  virtual void schedule(const schedulable& scbl) const {
183  schedule(now(), scbl);
184  }
185 
186  virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
187  if (!scbl.is_subscribed()) {
188  return;
189  }
190 
191  {
192  // check ownership
193  if (queue_type::owned()) {
194  // already has an owner - delegate
195  queue_type::get_worker_interface()->schedule(when, scbl);
196  return;
197  }
198 
199  // take ownership
200  queue_type::ensure(std::make_shared<derecurser>());
201  }
202  // release ownership
204  queue_type::destroy();
205  });
206 
207  const auto& recursor = queue_type::get_recursion().get_recurse();
208  std::this_thread::sleep_until(when);
209  if (scbl.is_subscribed()) {
210  scbl(recursor);
211  }
212  if (queue_type::empty()) {
213  return;
214  }
215 
216  // loop until queue is empty
217  for (
218  auto next = queue_type::top().when;
219  (std::this_thread::sleep_until(next), true);
220  next = queue_type::top().when
221  ) {
222  auto what = queue_type::top().what;
223 
224  queue_type::pop();
225 
226  if (what.is_subscribed()) {
227  what(recursor);
228  }
229 
230  if (queue_type::empty()) {
231  break;
232  }
233  }
234  }
235  };
236 
237  std::shared_ptr<current_worker> wi;
238 
239 public:
241  : wi(std::make_shared<current_worker>())
242  {
243  }
244  virtual ~current_thread()
245  {
246  }
247 
248  static bool is_schedule_required() { return !queue_type::owned(); }
249 
250  inline bool is_tail_recursion_allowed() const {
251  return queue_type::empty();
252  }
253 
254  virtual clock_type::time_point now() const {
255  return clock_type::now();
256  }
257 
259  return worker(std::move(cs), wi);
260  }
261 };
262 
264  static scheduler instance = make_scheduler<current_thread>();
265  return instance;
266 }
267 
268 }
269 
270 }
271 
272 #endif
Definition: rx-scheduler.hpp:163
auto create(OnSubscribe os) -> observable< T, detail::create< T, OnSubscribe >>
Returns an observable that executes the specified function when a subscriber subscribes to it...
Definition: rx-create.hpp:82
Definition: rx-all.hpp:26
bool is_subscribed() const
Definition: rx-scheduler.hpp:585
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
auto empty() -> decltype(from< T >())
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-empty.hpp:37
bool is_tail_recursion_allowed() const
Definition: rx-currentthread.hpp:250
virtual ~current_thread()
Definition: rx-currentthread.hpp:244
Definition: rx-currentthread.hpp:131
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
virtual clock_type::time_point now() const
Definition: rx-currentthread.hpp:254
virtual worker create_worker(composite_subscription cs) const
Definition: rx-currentthread.hpp:258
std::chrono::steady_clock clock_type
Definition: rx-scheduler.hpp:154
static bool is_schedule_required()
Definition: rx-currentthread.hpp:248
const scheduler & make_current_thread()
Definition: rx-currentthread.hpp:263
current_thread()
Definition: rx-currentthread.hpp:240
Definition: rx-scheduler.hpp:353
#define RXCPP_UNWIND_AUTO(Function)
Definition: rx-util.hpp:875
std::string what(std::exception_ptr ep)
Definition: rx-util.hpp:523
Definition: rx-scheduler.hpp:426
Definition: rx-scheduler.hpp:200