RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-notification.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_NOTIFICATION_HPP)
6 #define RXCPP_RX_NOTIFICATION_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace notifications {
13 
15 {
16  long s;
17  long u;
18 
19 public:
20  explicit inline subscription(long s)
21  : s(s), u(std::numeric_limits<long>::max()) {
22  }
23  inline subscription(long s, long u)
24  : s(s), u(u) {
25  }
26  inline long subscribe() const {
27  return s;
28  }
29  inline long unsubscribe() const {
30  return u;
31  }
32 };
33 
34 inline bool operator == (subscription lhs, subscription rhs) {
35  return lhs.subscribe() == rhs.subscribe() && lhs.unsubscribe() == rhs.unsubscribe();
36 }
37 
38 inline std::ostream& operator<< (std::ostream& out, const subscription& s) {
39  out << s.subscribe() << "-" << s.unsubscribe();
40  return out;
41 }
42 
43 namespace detail {
44 
45 template<typename T>
46 struct notification_base
47  : public std::enable_shared_from_this<notification_base<T>>
48 {
49  typedef subscriber<T> observer_type;
50  typedef std::shared_ptr<notification_base<T>> type;
51 
52  virtual ~notification_base() {}
53 
54  virtual void out(std::ostream& out) const =0;
55  virtual bool equals(const type& other) const = 0;
56  virtual void accept(const observer_type& o) const =0;
57 };
58 
// Forward declaration of the std::vector inserter, so that the streamable
// to_stream() overload below can already see it when testing `os << t`.
template<class T>
std::ostream& operator<< (std::ostream& out, const std::vector<T>& v);

// Preferred overload for a call of the form to_stream(os, t, 0, 0).
// It only takes part in overload resolution when `os << t` is well-formed
// (expression SFINAE in the trailing return type).
template<class T>
auto to_stream(std::ostream& os, const T& t, int, int)
    -> decltype(os << t) {
    return os << t;
}

#if RXCPP_USE_RTTI
// Fallback when T cannot be streamed and RTTI is enabled: names the type.
template<class T>
std::ostream& to_stream(std::ostream& os, const T&, int, ...) {
    os << "< " << typeid(T).name() << " does not support ostream>";
    return os;
}
#endif

// Last-resort fallback: emits a fixed placeholder instead of the value.
template<class T>
std::ostream& to_stream(std::ostream& os, const T&, ...) {
    os << "<the value does not support ostream>";
    return os;
}
79 
// Writes the elements of `v` to `os` as "[e0, e1, ...]"; an empty vector
// yields "[]". Each element goes through to_stream(), so elements that
// cannot be streamed are replaced by to_stream's placeholder text.
template<class T>
inline std::ostream& ostreamvector (std::ostream& os, const std::vector<T>& v) {
    os << "[";
    auto it = v.begin();
    const auto last = v.end();
    if (it != last) {
        // The first element is written without a leading separator.
        to_stream(os, *it, 0, 0);
        for (++it; it != last; ++it) {
            os << ", ";
            to_stream(os, *it, 0, 0);
        }
    }
    os << "]";
    return os;
}
95 
// Stream inserter for std::vector; the formatting is done by ostreamvector().
template<class T>
inline std::ostream& operator<< (std::ostream& os, const std::vector<T>& v) {
    ostreamvector(os, v);
    return os;
}
100 
// Compares two values with operator== when T provides a usable one.
//
// This overload only takes part in overload resolution when
// `bool(lhs == rhs)` is well-formed (expression SFINAE in the trailing
// return type). The int parameter makes it a better match than the
// variadic fallback below for a call such as equals(a, b, 0).
template<class T>
auto equals(const T& lhs, const T& rhs, int)
    -> decltype(bool(lhs == rhs)) {
    // Convert explicitly, exactly as the return-type expression does, so a
    // comparison result that is only explicitly convertible to bool works
    // here instead of selecting this overload and then failing to compile.
    return static_cast<bool>(lhs == rhs);
}

// Fallback chosen when the overload above is not viable.
// Always throws std::runtime_error; it never returns a value.
template<class T>
bool equals(const T&, const T&, ...) {
    throw std::runtime_error("value does not support equality tests");
    return false; // never reached
}
112 
113 }
114 
115 template<typename T>
117 {
118  typedef typename detail::notification_base<T>::type type;
120 
121 private:
122  typedef detail::notification_base<T> base;
123 
124  struct on_next_notification : public base {
125  on_next_notification(T value) : value(std::move(value)) {
126  }
127  on_next_notification(const on_next_notification& o) : value(o.value) {}
128  on_next_notification(const on_next_notification&& o) : value(std::move(o.value)) {}
129  on_next_notification& operator=(on_next_notification o) { value = std::move(o.value); return *this; }
130  virtual void out(std::ostream& os) const {
131  os << "on_next( ";
132  detail::to_stream(os, value, 0, 0);
133  os << ")";
134  }
135  virtual bool equals(const typename base::type& other) const {
136  bool result = false;
137  other->accept(make_subscriber<T>(make_observer_dynamic<T>([this, &result](T v) {
138  result = detail::equals(this->value, v, 0);
139  })));
140  return result;
141  }
142  virtual void accept(const typename base::observer_type& o) const {
143  o.on_next(value);
144  }
145  const T value;
146  };
147 
148  struct on_error_notification : public base {
149  on_error_notification(std::exception_ptr ep) : ep(ep) {
150  }
151  on_error_notification(const on_error_notification& o) : ep(o.ep) {}
152  on_error_notification(const on_error_notification&& o) : ep(std::move(o.ep)) {}
153  on_error_notification& operator=(on_error_notification o) { ep = std::move(o.ep); return *this; }
154  virtual void out(std::ostream& os) const {
155  os << "on_error(";
156  try {
157  std::rethrow_exception(ep);
158  } catch (const std::exception& e) {
159  os << e.what();
160  } catch (...) {
161  os << "<not derived from std::exception>";
162  }
163  os << ")";
164  }
165  virtual bool equals(const typename base::type& other) const {
166  bool result = false;
167  // not trying to compare exceptions
168  other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](std::exception_ptr){
169  result = true;
170  })));
171  return result;
172  }
173  virtual void accept(const typename base::observer_type& o) const {
174  o.on_error(ep);
175  }
176  const std::exception_ptr ep;
177  };
178 
179  struct on_completed_notification : public base {
180  on_completed_notification() {
181  }
182  virtual void out(std::ostream& os) const {
183  os << "on_completed()";
184  }
185  virtual bool equals(const typename base::type& other) const {
186  bool result = false;
187  other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](){
188  result = true;
189  })));
190  return result;
191  }
192  virtual void accept(const typename base::observer_type& o) const {
193  o.on_completed();
194  }
195  };
196 
197  struct exception_tag {};
198 
199  template<typename Exception>
200  static
201  type make_on_error(exception_tag&&, Exception&& e) {
202  std::exception_ptr ep;
203  try {
204  throw std::forward<Exception>(e);
205  }
206  catch (...) {
207  ep = std::current_exception();
208  }
209  return std::make_shared<on_error_notification>(ep);
210  }
211 
212  struct exception_ptr_tag {};
213 
214  static
215  type make_on_error(exception_ptr_tag&&, std::exception_ptr ep) {
216  return std::make_shared<on_error_notification>(ep);
217  }
218 
219 public:
220  template<typename U>
221  static type on_next(U value) {
222  return std::make_shared<on_next_notification>(std::move(value));
223  }
224 
225  static type on_completed() {
226  return std::make_shared<on_completed_notification>();
227  }
228 
229  template<typename Exception>
230  static type on_error(Exception&& e) {
231  return make_on_error(typename std::conditional<
232  std::is_same<rxu::decay_t<Exception>, std::exception_ptr>::value,
233  exception_ptr_tag, exception_tag>::type(),
234  std::forward<Exception>(e));
235  }
236 };
237 
238 template<class T>
239 bool operator == (const std::shared_ptr<detail::notification_base<T>>& lhs, const std::shared_ptr<detail::notification_base<T>>& rhs) {
240  if (!lhs && !rhs) {return true;}
241  if (!lhs || !rhs) {return false;}
242  return lhs->equals(rhs);
243 }
244 
245 template<class T>
246 std::ostream& operator<< (std::ostream& os, const std::shared_ptr<detail::notification_base<T>>& n) {
247  n->out(os);
248  return os;
249 }
250 
251 
// Pairs a value of type T with the `long` reported by time().
template<class T>
class recorded
{
    long t;
    T v;
public:
    // Stores `t` and takes over `v`. The parameter is already a private
    // copy, so it is moved into the member rather than copied a second time.
    recorded(long t, T v)
        : t(t), v(std::move(v)) {
    }
    // Returns the stored time.
    long time() const {
        return t;
    }
    // Returns a reference to the stored value; valid while *this lives.
    const T& value() const {
        return v;
    }
};
268 
269 template<class T>
271  return lhs.time() == rhs.time() && lhs.value() == rhs.value();
272 }
273 
274 template<class T>
275 std::ostream& operator<< (std::ostream& out, const recorded<T>& r) {
276  out << "@" << r.time() << "-" << r.value();
277  return out;
278 }
279 
280 }
281 namespace rxn=notifications;
282 
283 }
284 
285 inline std::ostream& operator<< (std::ostream& out, const std::vector<rxcpp::notifications::subscription>& vs) {
286  return rxcpp::notifications::detail::ostreamvector(out, vs);
287 }
288 template<class T>
289 inline std::ostream& operator<< (std::ostream& out, const std::vector<rxcpp::notifications::recorded<T>>& vr) {
290  return rxcpp::notifications::detail::ostreamvector(out, vr);
291 }
292 
293 #endif
Definition: rx-notification.hpp:253
long time() const
Definition: rx-notification.hpp:261
Definition: rx-all.hpp:26
const T & value() const
Definition: rx-notification.hpp:264
bool operator==(subscription lhs, subscription rhs)
Definition: rx-notification.hpp:34
auto max() -> operator_factory< max_tag >
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-reduce.hpp:496
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
static type on_completed()
Definition: rx-notification.hpp:225
Definition: rx-notification.hpp:116
subscription(long s)
Definition: rx-notification.hpp:20
void on_error(std::exception_ptr e) const
Definition: rx-subscriber.hpp:183
static type on_error(Exception &&e)
Definition: rx-notification.hpp:230
subscription(long s, long u)
Definition: rx-notification.hpp:23
static type on_next(U value)
Definition: rx-notification.hpp:221
void on_completed() const
Definition: rx-subscriber.hpp:190
Definition: rx-notification.hpp:14
detail::notification_base< T >::type type
Definition: rx-notification.hpp:118
detail::notification_base< T >::observer_type observer_type
Definition: rx-notification.hpp:119
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
std::ostream & operator<<(std::ostream &out, const subscription &s)
Definition: rx-notification.hpp:38
recorded(long t, T v)
Definition: rx-notification.hpp:258
long subscribe() const
Definition: rx-notification.hpp:26
void on_next(V &&v) const
Definition: rx-subscriber.hpp:176
long unsubscribe() const
Definition: rx-notification.hpp:29