RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-runloop.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_RUN_LOOP_HPP)
6 #define RXCPP_RX_SCHEDULER_RUN_LOOP_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
14 namespace detail {
15 
16 struct run_loop_state : public std::enable_shared_from_this<run_loop_state>
17 {
18  typedef scheduler::clock_type clock_type;
19 
20  typedef detail::schedulable_queue<
21  clock_type::time_point> queue_item_time;
22 
23  typedef queue_item_time::item_type item_type;
24  typedef queue_item_time::const_reference const_reference_item_type;
25 
26  virtual ~run_loop_state()
27  {
28  }
29 
30  run_loop_state()
31  {
32  }
33 
34  composite_subscription lifetime;
35  mutable std::mutex lock;
36  mutable queue_item_time q;
37  recursion r;
38  std::function<void(clock_type::time_point)> notify_earlier_wakeup;
39 };
40 
41 }
42 
43 
45 {
46 private:
48  run_loop_scheduler(const this_type&);
49 
50  struct run_loop_worker : public worker_interface
51  {
52  private:
53  typedef run_loop_worker this_type;
54 
55  run_loop_worker(const this_type&);
56 
57  public:
58  std::weak_ptr<detail::run_loop_state> state;
59 
60  virtual ~run_loop_worker()
61  {
62  }
63 
64  explicit run_loop_worker(std::weak_ptr<detail::run_loop_state> ws)
65  : state(ws)
66  {
67  }
68 
69  virtual clock_type::time_point now() const {
70  return clock_type::now();
71  }
72 
73  virtual void schedule(const schedulable& scbl) const {
74  schedule(now(), scbl);
75  }
76 
77  virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
78  if (scbl.is_subscribed()) {
79  auto st = state.lock();
80  std::unique_lock<std::mutex> guard(st->lock);
81  const bool need_earlier_wakeup_notification = st->notify_earlier_wakeup &&
82  (st->q.empty() || when < st->q.top().when);
83  st->q.push(detail::run_loop_state::item_type(when, scbl));
84  st->r.reset(false);
85  if (need_earlier_wakeup_notification) st->notify_earlier_wakeup(when);
86  guard.unlock(); // So we can't get attempt to recursively lock the state
87  }
88  }
89  };
90 
91  std::weak_ptr<detail::run_loop_state> state;
92 
93 public:
94  explicit run_loop_scheduler(std::weak_ptr<detail::run_loop_state> ws)
95  : state(ws)
96  {
97  }
99  {
100  }
101 
102  virtual clock_type::time_point now() const {
103  return clock_type::now();
104  }
105 
107  auto lifetime = state.lock()->lifetime;
108  auto token = lifetime.add(cs);
109  cs.add([=](){lifetime.remove(token);});
110  return worker(cs, create_worker_interface());
111  }
112 
113  std::shared_ptr<worker_interface> create_worker_interface() const {
114  return std::make_shared<run_loop_worker>(state);
115  }
116 };
117 
118 class run_loop
119 {
120 private:
121  typedef run_loop this_type;
122  // don't allow this instance to copy/move since it owns current_thread queue
123  // for the thread it is constructed on.
124  run_loop(const this_type&);
125  run_loop(this_type&&);
126 
127  typedef detail::action_queue queue_type;
128 
129  typedef detail::run_loop_state::item_type item_type;
130  typedef detail::run_loop_state::const_reference_item_type const_reference_item_type;
131 
132  std::shared_ptr<detail::run_loop_state> state;
133  std::shared_ptr<run_loop_scheduler> sc;
134 
135 public:
138  : state(std::make_shared<detail::run_loop_state>())
139  , sc(std::make_shared<run_loop_scheduler>(state))
140  {
141  // take ownership so that the current_thread scheduler
142  // uses the same queue on this thread
143  queue_type::ensure(sc->create_worker_interface());
144  }
146  {
147  state->lifetime.unsubscribe();
148 
149  std::unique_lock<std::mutex> guard(state->lock);
150 
151  // release ownership
152  queue_type::destroy();
153 
154  auto expired = std::move(state->q);
155  if (!state->q.empty()) std::terminate();
156  }
157 
158  clock_type::time_point now() const {
159  return clock_type::now();
160  }
161 
163  return state->lifetime;
164  }
165 
166  bool empty() const {
167  return state->q.empty();
168  }
169 
170  const_reference_item_type peek() const {
171  return state->q.top();
172  }
173 
174  void dispatch() const {
175  std::unique_lock<std::mutex> guard(state->lock);
176  if (state->q.empty()) {
177  return;
178  }
179  auto& peek = state->q.top();
180  if (!peek.what.is_subscribed()) {
181  state->q.pop();
182  return;
183  }
184  if (clock_type::now() < peek.when) {
185  return;
186  }
187  auto what = peek.what;
188  state->q.pop();
189  state->r.reset(state->q.empty());
190  guard.unlock();
191  what(state->r.get_recurse());
192  }
193 
195  return make_scheduler(sc);
196  }
197 
198  void set_notify_earlier_wakeup(std::function<void(clock_type::time_point)> const& f) {
199  std::unique_lock<std::mutex> guard(state->lock);
200  state->notify_earlier_wakeup = f;
201  }
202 };
203 
204 inline scheduler make_run_loop(const run_loop& r) {
205  return r.get_scheduler();
206 }
207 
208 }
209 
210 }
211 
212 #endif
Definition: rx-scheduler.hpp:163
composite_subscription get_subscription() const
Definition: rx-runloop.hpp:162
virtual ~run_loop_scheduler()
Definition: rx-runloop.hpp:98
Definition: rx-all.hpp:26
scheduler get_scheduler() const
Definition: rx-runloop.hpp:194
bool is_subscribed() const
Definition: rx-scheduler.hpp:585
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
virtual clock_type::time_point now() const
Definition: rx-runloop.hpp:102
Definition: rx-runloop.hpp:44
void dispatch() const
Definition: rx-runloop.hpp:174
scheduler::clock_type clock_type
Definition: rx-runloop.hpp:136
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
scheduler make_run_loop(const run_loop &r)
Definition: rx-runloop.hpp:204
const_reference_item_type peek() const
Definition: rx-runloop.hpp:170
std::shared_ptr< worker_interface > create_worker_interface() const
Definition: rx-runloop.hpp:113
~run_loop()
Definition: rx-runloop.hpp:145
scheduler make_scheduler(ArgN &&...an)
Definition: rx-scheduler.hpp:418
clock_type::time_point now() const
Definition: rx-runloop.hpp:158
Definition: rx-runloop.hpp:118
run_loop_scheduler(std::weak_ptr< detail::run_loop_state > ws)
Definition: rx-runloop.hpp:94
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:413
run_loop()
Definition: rx-runloop.hpp:137
Definition: rx-scheduler.hpp:353
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:389
bool empty() const
Definition: rx-runloop.hpp:166
void set_notify_earlier_wakeup(std::function< void(clock_type::time_point)> const &f)
Definition: rx-runloop.hpp:198
std::string what(std::exception_ptr ep)
Definition: rx-util.hpp:523
Definition: rx-scheduler.hpp:426
virtual worker create_worker(composite_subscription cs) const
Definition: rx-runloop.hpp:106
Definition: rx-scheduler.hpp:200