RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-test.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #if !defined(RXCPP_RX_TEST_HPP)
4 #define RXCPP_RX_TEST_HPP
5 
6 #include "rx-includes.hpp"
7 
8 namespace rxcpp {
9 
10 namespace test {
11 
12 namespace detail {
13 
14 template<class T>
15 struct test_subject_base
16  : public std::enable_shared_from_this<test_subject_base<T>>
17 {
19  typedef std::shared_ptr<test_subject_base<T>> type;
20 
21  virtual ~test_subject_base() {}
22  virtual void on_subscribe(subscriber<T>) const =0;
23  virtual std::vector<recorded_type> messages() const =0;
24  virtual std::vector<rxn::subscription> subscriptions() const =0;
25 };
26 
27 template<class T>
28 struct test_source
29  : public rxs::source_base<T>
30 {
31  explicit test_source(typename test_subject_base<T>::type ts)
32  : ts(std::move(ts))
33  {
34  if (!this->ts) std::terminate();
35  }
36  typename test_subject_base<T>::type ts;
37  void on_subscribe(subscriber<T> o) const {
38  ts->on_subscribe(std::move(o));
39  }
40  template<class Subscriber>
41  typename std::enable_if<!std::is_same<Subscriber, subscriber<T>>::value, void>::type
42  on_subscribe(Subscriber o) const {
43 
44  static_assert(is_subscriber<Subscriber>::value, "on_subscribe must be passed a subscriber.");
45 
46  ts->on_subscribe(o.as_dynamic());
47  }
48 };
49 
50 }
51 
52 template<class T>
54  : public observer<T>
55 {
56  typedef observer<T> observer_base;
57  typedef typename detail::test_subject_base<T>::type test_subject;
58  test_subject ts;
59 
60 public:
62 
63  testable_observer(test_subject ts, observer_base ob)
64  : observer_base(std::move(ob))
65  , ts(std::move(ts))
66  {
67  }
68 
69  std::vector<recorded_type> messages() const {
70  return ts->messages();
71  }
72 };
73 
74 //struct tag_test_observable : public tag_observable {};
75 
82 template<class T>
84  : public observable<T, typename detail::test_source<T>>
85 {
87  typedef typename detail::test_subject_base<T>::type test_subject;
88  test_subject ts;
89 
90  //typedef tag_test_observable observable_tag;
91 
92 public:
94 
95  explicit testable_observable(test_subject ts)
96  : observable_base(detail::test_source<T>(ts))
97  , ts(ts)
98  {
99  }
100 
101  std::vector<rxn::subscription> subscriptions() const {
102  return ts->subscriptions();
103  }
104 
105  std::vector<recorded_type> messages() const {
106  return ts->messages();
107  }
108 };
109 
110 }
111 namespace rxt=test;
112 
113 }
114 
115 //
116 // support range() >> filter() >> subscribe() syntax
117 // '>>' is spelled 'stream'
118 //
119 template<class T, class OperatorFactory>
120 auto operator >> (const rxcpp::test::testable_observable<T>& source, OperatorFactory&& of)
121  -> decltype(source.op(std::forward<OperatorFactory>(of))) {
122  return source.op(std::forward<OperatorFactory>(of));
123 }
124 
125 //
126 // support range() | filter() | subscribe() syntax
127 // '|' is spelled 'pipe'
128 //
129 template<class T, class OperatorFactory>
130 auto operator | (const rxcpp::test::testable_observable<T>& source, OperatorFactory&& of)
131  -> decltype(source.op(std::forward<OperatorFactory>(of))) {
132  return source.op(std::forward<OperatorFactory>(of));
133 }
134 
135 #include "schedulers/rx-test.hpp"
136 
137 #endif
Definition: rx-notification.hpp:253
Definition: rx-all.hpp:26
auto operator>>(const rxcpp::test::testable_observable< T > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-test.hpp:120
std::vector< recorded_type > messages() const
Definition: rx-test.hpp:69
auto operator|(const rxcpp::test::testable_observable< T > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-test.hpp:130
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
detail::test_subject_base< T >::recorded_type recorded_type
Definition: rx-test.hpp:93
testable_observer(test_subject ts, observer_base ob)
Definition: rx-test.hpp:63
consumes values from an observable using State that may implement on_next, on_error and on_completed ...
Definition: rx-observer.hpp:179
testable_observable(test_subject ts)
Definition: rx-test.hpp:95
a source of values that records the time of each subscription/unsubscription and all the values and t...
Definition: rx-test.hpp:83
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
std::vector< recorded_type > messages() const
Definition: rx-test.hpp:105
Definition: rx-predef.hpp:115
Definition: rx-sources.hpp:17
Definition: rx-test.hpp:53
detail::test_subject_base< T >::recorded_type recorded_type
Definition: rx-test.hpp:61
std::vector< rxn::subscription > subscriptions() const
Definition: rx-test.hpp:101