RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-test.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_TEST_HPP)
6 #define RXCPP_RX_SCHEDULER_TEST_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
14 namespace detail {
15 
16 class test_type : public scheduler_interface
17 {
18 public:
19 
20  typedef scheduler_interface::clock_type clock_type;
21 
22  struct test_type_state : public virtual_time<long, long>
23  {
24  typedef virtual_time<long, long> base;
25 
27  using base::schedule_relative;
28 
29  clock_type::time_point now() const {
30  return to_time_point(clock_now);
31  }
32 
33  virtual void schedule_absolute(long when, const schedulable& a) const
34  {
35  if (when <= base::clock_now)
36  when = base::clock_now + 1;
37 
38  return base::schedule_absolute(when, a);
39  }
40 
41  virtual long add(long absolute, long relative) const
42  {
43  return absolute + relative;
44  }
45 
46  virtual clock_type::time_point to_time_point(long absolute) const
47  {
48  return clock_type::time_point(std::chrono::milliseconds(absolute));
49  }
50 
51  virtual long to_relative(clock_type::duration d) const
52  {
53  return static_cast<long>(std::chrono::duration_cast<std::chrono::milliseconds>(d).count());
54  }
55  };
56 
57 private:
58  mutable std::shared_ptr<test_type_state> state;
59 
60 public:
61  struct test_type_worker : public worker_interface
62  {
63  mutable std::shared_ptr<test_type_state> state;
64 
65  typedef test_type_state::absolute absolute;
66  typedef test_type_state::relative relative;
67 
68  test_type_worker(std::shared_ptr<test_type_state> st)
69  : state(std::move(st))
70  {
71  }
72 
73  virtual clock_type::time_point now() const {
74  return state->now();
75  }
76 
77  virtual void schedule(const schedulable& scbl) const {
78  state->schedule_absolute(state->clock(), scbl);
79  }
80 
81  virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
82  state->schedule_relative(state->to_relative(when - now()), scbl);
83  }
84 
85  void schedule_absolute(absolute when, const schedulable& scbl) const {
86  state->schedule_absolute(when, scbl);
87  }
88 
89  void schedule_relative(relative when, const schedulable& scbl) const {
90  state->schedule_relative(when, scbl);
91  }
92 
93  bool is_enabled() const {return state->is_enabled();}
94  absolute clock() const {return state->clock();}
95 
96  void start() const
97  {
98  state->start();
99  }
100 
101  void stop() const
102  {
103  state->stop();
104  }
105 
106  void advance_to(absolute time) const
107  {
108  state->advance_to(time);
109  }
110 
111  void advance_by(relative time) const
112  {
113  state->advance_by(time);
114  }
115 
116  void sleep(relative time) const
117  {
118  state->sleep(time);
119  }
120 
121  template<class T>
122  subscriber<T, rxt::testable_observer<T>> make_subscriber() const;
123  };
124 
125 public:
126  test_type()
127  : state(std::make_shared<test_type_state>())
128  {
129  }
130 
131  virtual clock_type::time_point now() const {
132  return state->now();
133  }
134 
135  virtual worker create_worker(composite_subscription cs) const {
136  return worker(cs, std::make_shared<test_type_worker>(state));
137  }
138 
139  bool is_enabled() const {return state->is_enabled();}
140  long clock() {
141  return state->clock();
142  }
143 
144  clock_type::time_point to_time_point(long absolute) const {
145  return state->to_time_point(absolute);
146  }
147 
148  std::shared_ptr<test_type_worker> create_test_type_worker_interface() const {
149  return std::make_shared<test_type_worker>(state);
150  }
151 
152  template<class T>
153  rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const;
154 
155  template<class T>
156  rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const;
157 };
158 
159 template<class T>
160 class mock_observer
161  : public rxt::detail::test_subject_base<T>
162 {
163  typedef typename rxn::notification<T> notification_type;
164  typedef rxn::recorded<typename notification_type::type> recorded_type;
165 
166 public:
167  explicit mock_observer(std::shared_ptr<test_type::test_type_state> sc)
168  : sc(sc)
169  {
170  }
171 
172  std::shared_ptr<test_type::test_type_state> sc;
173  std::vector<recorded_type> m;
174 
175  virtual void on_subscribe(subscriber<T>) const {
176  std::terminate();
177  }
178  virtual std::vector<rxn::subscription> subscriptions() const {
179  std::terminate();
180  }
181 
182  virtual std::vector<recorded_type> messages() const {
183  return m;
184  }
185 };
186 
187 template<class T>
188 subscriber<T, rxt::testable_observer<T>> test_type::test_type_worker::make_subscriber() const
189 {
190  typedef typename rxn::notification<T> notification_type;
191  typedef rxn::recorded<typename notification_type::type> recorded_type;
192 
193  auto ts = std::make_shared<mock_observer<T>>(state);
194 
195  return rxcpp::make_subscriber<T>(rxt::testable_observer<T>(ts, make_observer_dynamic<T>(
196  // on_next
197  [ts](T value)
198  {
199  ts->m.push_back(
200  recorded_type(ts->sc->clock(), notification_type::on_next(value)));
201  },
202  // on_error
203  [ts](std::exception_ptr e)
204  {
205  ts->m.push_back(
206  recorded_type(ts->sc->clock(), notification_type::on_error(e)));
207  },
208  // on_completed
209  [ts]()
210  {
211  ts->m.push_back(
212  recorded_type(ts->sc->clock(), notification_type::on_completed()));
213  })));
214 }
215 
216 template<class T>
217 class cold_observable
218  : public rxt::detail::test_subject_base<T>
219 {
220  typedef cold_observable<T> this_type;
221  std::shared_ptr<test_type::test_type_state> sc;
222  typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
223  mutable std::vector<recorded_type> mv;
224  mutable std::vector<rxn::subscription> sv;
225  mutable worker controller;
226 
227 public:
228 
229  cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
230  : sc(sc)
231  , mv(std::move(mv))
232  , controller(w)
233  {
234  }
235 
236  template<class Iterator>
237  cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, Iterator begin, Iterator end)
238  : sc(sc)
239  , mv(begin, end)
240  , controller(w)
241  {
242  }
243 
244  virtual void on_subscribe(subscriber<T> o) const {
245  sv.push_back(rxn::subscription(sc->clock()));
246  auto index = sv.size() - 1;
247 
248  for (auto& message : mv) {
249  auto n = message.value();
250  sc->schedule_relative(message.time(), make_schedulable(
251  controller,
252  [n, o](const schedulable&) {
253  if (o.is_subscribed()) {
254  n->accept(o);
255  }
256  }));
257  }
258 
259  auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
260  o.add([sharedThis, index]() {
261  sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
262  });
263  }
264 
265  virtual std::vector<rxn::subscription> subscriptions() const {
266  return sv;
267  }
268 
269  virtual std::vector<recorded_type> messages() const {
270  return mv;
271  }
272 };
273 
274 template<class T>
275 rxt::testable_observable<T> test_type::make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const
276 {
277  auto co = std::make_shared<cold_observable<T>>(state, create_worker(composite_subscription()), std::move(messages));
278  return rxt::testable_observable<T>(co);
279 }
280 
281 template<class T>
282 class hot_observable
283  : public rxt::detail::test_subject_base<T>
284 {
285  typedef hot_observable<T> this_type;
286  std::shared_ptr<test_type::test_type_state> sc;
287  typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
288  typedef subscriber<T> observer_type;
289  mutable std::vector<recorded_type> mv;
290  mutable std::vector<rxn::subscription> sv;
291  mutable std::list<observer_type> observers;
292  mutable worker controller;
293 
294 public:
295 
296  hot_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
297  : sc(sc)
298  , mv(mv)
299  , controller(w)
300  {
301  for (auto& message : mv) {
302  auto n = message.value();
303  sc->schedule_absolute(message.time(), make_schedulable(
304  controller,
305  [this, n](const schedulable&) {
306  auto local = this->observers;
307  for (auto& o : local) {
308  if (o.is_subscribed()) {
309  n->accept(o);
310  }
311  }
312  }));
313  }
314  }
315 
316  virtual ~hot_observable() {}
317 
318  virtual void on_subscribe(observer_type o) const {
319  auto olocation = observers.insert(observers.end(), o);
320 
321  sv.push_back(rxn::subscription(sc->clock()));
322  auto index = sv.size() - 1;
323 
324  auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
325  o.add([sharedThis, index, olocation]() {
326  sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
327  sharedThis->observers.erase(olocation);
328  });
329  }
330 
331  virtual std::vector<rxn::subscription> subscriptions() const {
332  return sv;
333  }
334 
335  virtual std::vector<recorded_type> messages() const {
336  return mv;
337  }
338 };
339 
340 template<class T>
341 rxt::testable_observable<T> test_type::make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const
342 {
343  auto worker = create_worker(composite_subscription());
344  auto shared = std::make_shared<hot_observable<T>>(state, worker, std::move(messages));
345  return rxt::testable_observable<T>(shared);
346 }
347 
348 template<class F>
349 struct is_create_source_function
350 {
351  struct not_void {};
352  template<class CF>
353  static auto check(int) -> decltype((*(CF*)nullptr)());
354  template<class CF>
355  static not_void check(...);
356 
357  static const bool value = is_observable<decltype(check<rxu::decay_t<F>>(0))>::value;
358 };
359 
360 }
361 
// Scheduler handle for tests: wraps a shared detail::test_type and forwards clock
// queries, worker creation and hot/cold observable factories to it.
362 class test : public scheduler
363 {
364  std::shared_ptr<detail::test_type> tester;
365 public:
366 
 // Keeps the concrete test_type and hands the same object to the scheduler base.
367  explicit test(std::shared_ptr<detail::test_type> t)
368  : scheduler(std::static_pointer_cast<scheduler_interface>(t))
369  , tester(t)
370  {
371  }
372 
373  typedef detail::test_type::clock_type clock_type;
374 
 // Default ticks used by the test_worker::start overloads that do not take explicit times.
375  static const long created_time = 100;
376  static const long subscribed_time = 200;
377  static const long unsubscribed_time = 1000;
378 
 // Static helpers that build recorded notifications (next/completed/error) and
 // subscription records stamped with the given ticks.
 // NOTE(review): listing lines 382-384 are absent from this text; the index at the
 // end of the page places notification_type, recorded_type and subscription_type there.
379  template<class T>
380  struct messages
381  {
385 
386  messages() {}
387 
388  template<typename U>
389  static recorded_type next(long ticks, U value) {
390  return recorded_type(ticks, notification_type::on_next(std::move(value)));
391  }
392 
393  static recorded_type completed(long ticks) {
394  return recorded_type(ticks, notification_type::on_completed());
395  }
396 
397  template<typename Exception>
398  static recorded_type error(long ticks, Exception&& e) {
399  return recorded_type(ticks, notification_type::on_error(std::forward<Exception>(e)));
400  }
401 
402  static rxn::subscription subscribe(long subscribe, long unsubscribe) {
403  return rxn::subscription(subscribe, unsubscribe);
404  }
405  };
406 
 // Worker for tests: forwards scheduling and clock control to a shared
 // detail::test_type::test_type_worker and adds start() helpers that run a source
 // against a recording subscriber.
407  class test_worker : public worker
408  {
409  std::shared_ptr<detail::test_type::test_type_worker> tester;
410  public:
411 
 // NOTE(review): listing line 412 is absent; the index at the end of the page lists
 // ~test_worker() there, so the brace below presumably closes that destructor.
413  }
414 
 // Keeps the concrete worker and hands the same object to the worker base.
415  explicit test_worker(composite_subscription cs, std::shared_ptr<detail::test_type::test_type_worker> t)
416  : worker(cs, std::static_pointer_cast<worker_interface>(t))
417  , tester(t)
418  {
419  }
420 
421  bool is_enabled() const {return tester->is_enabled();}
422  long clock() const {return tester->clock();}
423 
 // Schedule an already-built schedulable at an absolute / relative tick.
424  void schedule_absolute(long when, const schedulable& a) const {
425  tester->schedule_absolute(when, a);
426  }
427 
428  void schedule_relative(long when, const schedulable& a) const {
429  tester->schedule_relative(when, a);
430  }
431 
 // Overloads for arguments that are not yet a schedulable: enabled when Arg0 is an
 // action function or a subscription; the arguments are wrapped by make_schedulable
 // bound to this worker.
432  template<class Arg0, class... ArgN>
433  auto schedule_absolute(long when, Arg0&& a0, ArgN&&... an) const
434  -> typename std::enable_if<
435  (detail::is_action_function<Arg0>::value ||
436  is_subscription<Arg0>::value) &&
437  !is_schedulable<Arg0>::value>::type {
438  tester->schedule_absolute(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
439  }
440 
441  template<class Arg0, class... ArgN>
442  auto schedule_relative(long when, Arg0&& a0, ArgN&&... an) const
443  -> typename std::enable_if<
444  (detail::is_action_function<Arg0>::value ||
445  is_subscription<Arg0>::value) &&
446  !is_schedulable<Arg0>::value>::type {
447  tester->schedule_relative(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
448  }
449 
 // Virtual-clock controls, forwarded unchanged.
450  void advance_to(long time) const
451  {
452  tester->advance_to(time);
453  }
454 
455  void advance_by(long time) const
456  {
457  tester->advance_by(time);
458  }
459 
460  void sleep(long time) const
461  {
462  tester->sleep(time);
463  }
464 
 // Schedules three actions at absolute ticks - build the source at `created`,
 // subscribe the recording subscriber at `subscribed`, unsubscribe it at
 // `unsubscribed` - then calls tester->start() and returns the subscriber.
 // NOTE(review): listing lines 467 and 475 are absent; the index at the end of the
 // page gives the return type as subscriber<T, rxt::testable_observer<T>>, and the
 // member `o` initialised below is presumably declared on line 475.
465  template<class T, class F>
466  auto start(F createSource, long created, long subscribed, long unsubscribed) const
468  {
469  struct state_type
470  : public std::enable_shared_from_this<state_type>
471  {
472  typedef decltype(createSource()) source_type;
473 
 // Held by pointer because the source is only created when the `created` action runs.
474  std::unique_ptr<source_type> source;
476 
477  explicit state_type(subscriber<T, rxt::testable_observer<T>> o)
478  : source()
479  , o(o)
480  {
481  }
482  };
483  auto state = std::make_shared<state_type>(this->make_subscriber<T>());
484 
485  schedule_absolute(created, [createSource, state](const schedulable&) {
486  state->source.reset(new typename state_type::source_type(createSource()));
487  });
488  schedule_absolute(subscribed, [state](const schedulable&) {
489  state->source->subscribe(state->o);
490  });
491  schedule_absolute(unsubscribed, [state](const schedulable&) {
492  state->o.unsubscribe();
493  });
494 
495  tester->start();
496 
497  return state->o;
498  }
499 
 // As above with created_time and subscribed_time as the first two ticks.
 // NOTE(review): listing line 502 (the trailing return type) is absent.
500  template<class T, class F>
501  auto start(F&& createSource, long unsubscribed) const
503  {
504  return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed);
505  }
506 
 // As above with all three default ticks.
507  template<class T, class F>
508  auto start(F&& createSource) const
509  -> subscriber<T, rxt::testable_observer<T>>
510  {
511  return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed_time);
512  }
513 
 // Derives the type returned by calling F, its value_type, and the matching
 // recording subscriber type.
 // NOTE(review): listing line 515 is absent; judging by the uses below, it holds
 // the `start_traits` struct header.
514  template<class F>
516  {
517  typedef decltype((*(F*)nullptr)()) source_type;
518  typedef typename source_type::value_type value_type;
519  typedef subscriber<value_type, rxt::testable_observer<value_type>> subscriber_type;
520  };
521 
 // Overloads that take T from start_traits<F> instead of an explicit template
 // argument; enabled only when detail::is_create_source_function<F> holds.
522  template<class F>
523  auto start(F createSource, long created, long subscribed, long unsubscribed) const
524  -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
525  {
526  return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created, subscribed, unsubscribed);
527  }
528 
529  template<class F>
530  auto start(F createSource, long unsubscribed) const
531  -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
532  {
533  return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created_time, subscribed_time, unsubscribed);
534  }
535 
536  template<class F>
537  auto start(F createSource) const
538  -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
539  {
540  return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created_time, subscribed_time, unsubscribed_time);
541  }
542 
 // Forwards to tester->start() without scheduling anything first.
543  void start() const {
544  tester->start();
545  }
546 
 // NOTE(review): listing line 548 is absent; the index at the end of the page lists
 // `subscriber<T, rxt::testable_observer<T>> make_subscriber() const` there.
547  template<class T>
549  return tester->make_subscriber<T>();
550  }
551  };
552 
 // Current virtual time as a time_point, taken from the wrapped test_type.
553  clock_type::time_point now() const {
554  return tester->now();
555  }
556 
 // NOTE(review): listing line 557 is absent; the index at the end of the page lists
 // `test_worker create_worker(composite_subscription cs=composite_subscription()) const` there.
558  return test_worker(cs, tester->create_test_type_worker_interface());
559  }
560 
561  bool is_enabled() const {return tester->is_enabled();}
562  long clock() const {return tester->clock();}
563 
564  clock_type::time_point to_time_point(long absolute) const {
565  return tester->to_time_point(absolute);
566  }
567 
 // Hot observable factories: from a vector, a C array, or an initializer list of
 // recorded notifications. All forward to the wrapped test_type.
568  template<class T>
569  rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const{
570  return tester->make_hot_observable(std::move(messages));
571  }
572 
573  template<class T, std::size_t size>
574  auto make_hot_observable(const T (&arr) [size]) const
575  -> decltype(tester->make_hot_observable(std::vector<T>())) {
576  return tester->make_hot_observable(rxu::to_vector(arr));
577  }
578 
579  template<class T>
580  auto make_hot_observable(std::initializer_list<T> il) const
581  -> decltype(tester->make_hot_observable(std::vector<T>())) {
582  return tester->make_hot_observable(std::vector<T>(il));
583  }
584 
 // Cold observable factories, with the same three input forms.
585  template<class T>
586  rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const {
587  return tester->make_cold_observable(std::move(messages));
588  }
589 
590  template<class T, std::size_t size>
591  auto make_cold_observable(const T (&arr) [size]) const
592  -> decltype(tester->make_cold_observable(std::vector<T>())) {
593  return tester->make_cold_observable(rxu::to_vector(arr));
594  }
595 
596  template<class T>
597  auto make_cold_observable(std::initializer_list<T> il) const
598  -> decltype(tester->make_cold_observable(std::vector<T>())) {
599  return tester->make_cold_observable(std::vector<T>(il));
600  }
601 };
602 
603 
604 inline test make_test() {
605  return test(std::make_shared<detail::test_type>());
606 }
607 
608 }
609 
612  return r;
613 }
614 
615 }
616 
617 #endif
Definition: rx-notification.hpp:253
Definition: rx-scheduler.hpp:163
static recorded_type completed(long ticks)
Definition: rx-test.hpp:393
auto start(F createSource, long unsubscribed) const -> typename std::enable_if< detail::is_create_source_function< F >::value, start_traits< F >>::type::subscriber_type
Definition: rx-test.hpp:530
detail::test_type::clock_type clock_type
Definition: rx-test.hpp:373
clock_type::time_point to_time_point(long absolute) const
Definition: rx-test.hpp:564
void schedule_absolute(long when, const schedulable &a) const
Definition: rx-test.hpp:424
auto make_cold_observable(const T(&arr)[size]) const -> decltype(tester->make_cold_observable(std::vector< T >()))
Definition: rx-test.hpp:591
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
auto start(F createSource) const -> typename std::enable_if< detail::is_create_source_function< F >::value, start_traits< F >>::type::subscriber_type
Definition: rx-test.hpp:537
~test_worker()
Definition: rx-test.hpp:412
long clock() const
Definition: rx-test.hpp:562
test(std::shared_ptr< detail::test_type > t)
Definition: rx-test.hpp:367
static recorded_type next(long ticks, U value)
Definition: rx-test.hpp:389
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
auto make_schedulable(const schedulable &scbl) -> schedulable
Definition: rx-scheduler.hpp:735
auto make_subscriber(subscriber< T, Observer > o) -> subscriber< T, Observer >
Definition: rx-subscriber.hpp:224
auto start(F createSource, long created, long subscribed, long unsubscribed) const -> subscriber< T, rxt::testable_observer< T >>
Definition: rx-test.hpp:466
static rxn::subscription subscribe(long subscribe, long unsubscribe)
Definition: rx-test.hpp:402
rxn::subscription subscription_type
Definition: rx-test.hpp:384
Definition: rx-notification.hpp:116
rxn::notification< T > notification_type
Definition: rx-test.hpp:382
Definition: rx-subscription.hpp:31
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
rxt::testable_observable< T > make_cold_observable(std::vector< rxn::recorded< std::shared_ptr< rxn::detail::notification_base< T >>>> messages) const
Definition: rx-test.hpp:586
source_type::value_type value_type
Definition: rx-test.hpp:518
Definition: rx-predef.hpp:58
void advance_by(long time) const
Definition: rx-test.hpp:455
clock_type::time_point now() const
Definition: rx-test.hpp:553
subscriber< T, rxt::testable_observer< T > > make_subscriber() const
Definition: rx-test.hpp:548
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:359
auto make_hot_observable(std::initializer_list< T > il) const -> decltype(tester->make_hot_observable(std::vector< T >()))
Definition: rx-test.hpp:580
bool is_enabled() const
Definition: rx-test.hpp:421
Definition: rx-notification.hpp:14
Definition: rx-test.hpp:407
void sleep(long time) const
Definition: rx-test.hpp:460
static recorded_type error(long ticks, Exception &&e)
Definition: rx-test.hpp:398
auto schedule_absolute(long when, Arg0 &&a0, ArgN &&...an) const -> typename std::enable_if< (detail::is_action_function< Arg0 >::value|| is_subscription< Arg0 >::value)&& !is_schedulable< Arg0 >::value >::type
Definition: rx-test.hpp:433
auto start(F &&createSource) const -> subscriber< T, rxt::testable_observer< T >>
Definition: rx-test.hpp:508
auto schedule_relative(long when, Arg0 &&a0, ArgN &&...an) const -> typename std::enable_if< (detail::is_action_function< Arg0 >::value|| is_subscription< Arg0 >::value)&& !is_schedulable< Arg0 >::value >::type
Definition: rx-test.hpp:442
void schedule_relative(long when, const schedulable &a) const
Definition: rx-test.hpp:428
auto make_cold_observable(std::initializer_list< T > il) const -> decltype(tester->make_cold_observable(std::vector< T >()))
Definition: rx-test.hpp:597
auto start(F &&createSource, long unsubscribed) const -> subscriber< T, rxt::testable_observer< T >>
Definition: rx-test.hpp:501
test_worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-test.hpp:557
rxn::recorded< typename notification_type::type > recorded_type
Definition: rx-test.hpp:383
void start() const
Definition: rx-test.hpp:543
Definition: rx-scheduler.hpp:353
long clock() const
Definition: rx-test.hpp:422
bool is_enabled() const
Definition: rx-test.hpp:561
identity_one_worker identity_test()
Definition: rx-test.hpp:610
messages()
Definition: rx-test.hpp:386
a source of values that records the time of each subscription/unsubscription and all the values and t...
Definition: rx-test.hpp:83
rxt::testable_observable< T > make_hot_observable(std::vector< rxn::recorded< std::shared_ptr< rxn::detail::notification_base< T >>>> messages) const
Definition: rx-test.hpp:569
Definition: rx-coordination.hpp:114
Definition: rx-test.hpp:362
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
virtual void schedule_absolute(typename base::absolute when, const schedulable &a) const
Definition: rx-virtualtime.hpp:206
test_worker(composite_subscription cs, std::shared_ptr< detail::test_type::test_type_worker > t)
Definition: rx-test.hpp:415
Definition: rx-test.hpp:380
test make_test()
Definition: rx-test.hpp:604
Definition: rx-scheduler.hpp:426
std::vector< T > to_vector(const T(&arr)[size])
Definition: rx-util.hpp:40
Definition: rx-test.hpp:53
auto make_hot_observable(const T(&arr)[size]) const -> decltype(tester->make_hot_observable(std::vector< T >()))
Definition: rx-test.hpp:574
auto subscribe(ArgN &&...an) -> detail::subscribe_factory< decltype(make_subscriber< T >(std::forward< ArgN >(an)...))>
Subscribe will cause the source observable to emit values to the provided subscriber.
Definition: rx-subscribe.hpp:87
void advance_to(long time) const
Definition: rx-test.hpp:450
Definition: rx-scheduler.hpp:200