RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-virtualtime.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_VIRTUAL_TIME_HPP)
6 #define RXCPP_RX_SCHEDULER_VIRTUAL_TIME_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
14 namespace detail {
15 
16 template<class Absolute, class Relative>
17 struct virtual_time_base : std::enable_shared_from_this<virtual_time_base<Absolute, Relative>>
18 {
19 private:
20  typedef virtual_time_base<Absolute, Relative> this_type;
21  virtual_time_base(const virtual_time_base&);
22 
23  mutable bool isenabled;
24 
25 public:
26  typedef Absolute absolute;
27  typedef Relative relative;
28 
29  virtual ~virtual_time_base()
30  {
31  }
32 
33 protected:
34  virtual_time_base()
35  : isenabled(false)
36  , clock_now(0)
37  {
38  }
39  explicit virtual_time_base(absolute initialClock)
40  : isenabled(false)
41  , clock_now(initialClock)
42  {
43  }
44 
45  mutable absolute clock_now;
46 
47  typedef time_schedulable<long> item_type;
48 
49  virtual absolute add(absolute, relative) const =0;
50 
51  virtual typename scheduler_base::clock_type::time_point to_time_point(absolute) const =0;
52  virtual relative to_relative(typename scheduler_base::clock_type::duration) const =0;
53 
54  virtual item_type top() const =0;
55  virtual void pop() const =0;
56  virtual bool empty() const =0;
57 
58 public:
59 
60  virtual void schedule_absolute(absolute, const schedulable&) const =0;
61 
62  virtual void schedule_relative(relative when, const schedulable& a) const {
63  auto at = add(clock_now, when);
64  return schedule_absolute(at, a);
65  }
66 
67  bool is_enabled() const {return isenabled;}
68  absolute clock() const {return clock_now;}
69 
70  void start() const
71  {
72  if (!isenabled) {
73  isenabled = true;
74  rxsc::recursion r;
75  r.reset(false);
76  while (!empty() && isenabled) {
77  auto next = top();
78  pop();
79  if (next.what.is_subscribed()) {
80  if (next.when > clock_now) {
81  clock_now = next.when;
82  }
83  next.what(r.get_recurse());
84  }
85  }
86  isenabled = false;
87  }
88  }
89 
90  void stop() const
91  {
92  isenabled = false;
93  }
94 
95  void advance_to(absolute time) const
96  {
97  if (time < clock_now) {
98  std::terminate();
99  }
100 
101  if (time == clock_now) {
102  return;
103  }
104 
105  if (!isenabled) {
106  isenabled = true;
107  rxsc::recursion r;
108  while (!empty() && isenabled) {
109  auto next = top();
110  if (next.when <= time) {
111  pop();
112  if (!next.what.is_subscribed()) {
113  continue;
114  }
115  if (next.when > clock_now) {
116  clock_now = next.when;
117  }
118  next.what(r.get_recurse());
119  }
120  else {
121  break;
122  }
123  }
124  isenabled = false;
125  clock_now = time;
126  }
127  else {
128  std::terminate();
129  }
130  }
131 
132  void advance_by(relative time) const
133  {
134  auto dt = add(clock_now, time);
135 
136  if (dt < clock_now) {
137  std::terminate();
138  }
139 
140  if (dt == clock_now) {
141  return;
142  }
143 
144  if (!isenabled) {
145  advance_to(dt);
146  }
147  else {
148  std::terminate();
149  }
150  }
151 
152  void sleep(relative time) const
153  {
154  auto dt = add(clock_now, time);
155 
156  if (dt < clock_now) {
157  std::terminate();
158  }
159 
160  clock_now = dt;
161  }
162 
163 };
164 
165 }
166 
167 template<class Absolute, class Relative>
168 struct virtual_time : public detail::virtual_time_base<Absolute, Relative>
169 {
170  typedef detail::virtual_time_base<Absolute, Relative> base;
171 
172  typedef typename base::item_type item_type;
173 
174  typedef detail::schedulable_queue<
175  typename item_type::time_point_type> queue_item_time;
176 
177  mutable queue_item_time q;
178 
179 public:
180  virtual ~virtual_time()
181  {
182  }
183 
184 protected:
186  {
187  }
188  explicit virtual_time(typename base::absolute initialClock)
189  : base(initialClock)
190  {
191  }
192 
193  virtual item_type top() const {
194  return q.top();
195  }
196  virtual void pop() const {
197  q.pop();
198  }
199  virtual bool empty() const {
200  return q.empty();
201  }
202 
203  using base::schedule_absolute;
204  using base::schedule_relative;
205 
206  virtual void schedule_absolute(typename base::absolute when, const schedulable& a) const
207  {
208  // use a separate subscription here so that a's subscription is not affected
209  auto run = make_schedulable(
210  a.get_worker(),
212  [a](const schedulable& scbl) {
213  rxsc::recursion r;
214  r.reset(false);
215  if (scbl.is_subscribed()) {
216  scbl.unsubscribe(); // unsubscribe() run, not a;
217  a(r.get_recurse());
218  }
219  });
220  q.push(item_type(when, run));
221  }
222 };
223 
224 
225 
226 }
227 
228 }
229 
230 #endif
void reset(bool b=true) const
set whether tail-recursion is allowed
Definition: rx-scheduler.hpp:112
Definition: rx-all.hpp:26
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
virtual ~virtual_time()
Definition: rx-virtualtime.hpp:180
auto make_schedulable(const schedulable &scbl) -> schedulable
Definition: rx-scheduler.hpp:735
virtual_time()
Definition: rx-virtualtime.hpp:185
base::item_type item_type
Definition: rx-virtualtime.hpp:172
auto empty() -> decltype(from< T >())
Returns an observable that sends no items to observer and immediately completes, on the specified scheduler.
Definition: rx-empty.hpp:37
virtual void pop() const
Definition: rx-virtualtime.hpp:196
virtual bool empty() const
Definition: rx-virtualtime.hpp:199
detail::virtual_time_base< Absolute, Relative > base
Definition: rx-virtualtime.hpp:170
queue_item_time q
Definition: rx-virtualtime.hpp:177
recursion is used by the scheduler to signal to each action whether tail recursion is allowed.
Definition: rx-scheduler.hpp:95
virtual_time(typename base::absolute initialClock)
Definition: rx-virtualtime.hpp:188
virtual item_type top() const
Definition: rx-virtualtime.hpp:193
const recurse & get_recurse() const
get the recurse to pass into each action being called
Definition: rx-scheduler.hpp:116
detail::schedulable_queue< typename item_type::time_point_type > queue_item_time
Definition: rx-virtualtime.hpp:175
const worker get_worker() const
Definition: rx-scheduler.hpp:544
virtual void schedule_absolute(typename base::absolute when, const schedulable &a) const
Definition: rx-virtualtime.hpp:206
Definition: rx-virtualtime.hpp:168
Definition: rx-scheduler.hpp:426