RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-observer.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_OBSERVER_HPP)
6 #define RXCPP_RX_OBSERVER_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 
13 template<class T>
15 {
16  typedef T value_type;
18 };
19 
20 namespace detail {
21 template<class T>
22 struct OnNextEmpty
23 {
24  void operator()(const T&) const {}
25 };
26 struct OnErrorEmpty
27 {
28  void operator()(std::exception_ptr) const {
29  // error implicitly ignored, abort
30  std::terminate();
31  }
32 };
33 struct OnErrorIgnore
34 {
35  void operator()(std::exception_ptr) const {
36  }
37 };
38 struct OnCompletedEmpty
39 {
40  void operator()() const {}
41 };
42 
43 template<class T, class State, class OnNext>
44 struct OnNextForward
45 {
46  using state_t = rxu::decay_t<State>;
47  using onnext_t = rxu::decay_t<OnNext>;
48  OnNextForward() : onnext() {}
49  explicit OnNextForward(onnext_t on) : onnext(std::move(on)) {}
50  onnext_t onnext;
51  void operator()(state_t& s, T& t) const {
52  onnext(s, t);
53  }
54  void operator()(state_t& s, T&& t) const {
55  onnext(s, t);
56  }
57 };
58 template<class T, class State>
59 struct OnNextForward<T, State, void>
60 {
61  using state_t = rxu::decay_t<State>;
62  OnNextForward() {}
63  void operator()(state_t& s, T& t) const {
64  s.on_next(t);
65  }
66  void operator()(state_t& s, T&& t) const {
67  s.on_next(t);
68  }
69 };
70 
71 template<class State, class OnError>
72 struct OnErrorForward
73 {
74  using state_t = rxu::decay_t<State>;
75  using onerror_t = rxu::decay_t<OnError>;
76  OnErrorForward() : onerror() {}
77  explicit OnErrorForward(onerror_t oe) : onerror(std::move(oe)) {}
78  onerror_t onerror;
79  void operator()(state_t& s, std::exception_ptr ep) const {
80  onerror(s, ep);
81  }
82 };
83 template<class State>
84 struct OnErrorForward<State, void>
85 {
86  using state_t = rxu::decay_t<State>;
87  OnErrorForward() {}
88  void operator()(state_t& s, std::exception_ptr ep) const {
89  s.on_error(ep);
90  }
91 };
92 
93 template<class State, class OnCompleted>
94 struct OnCompletedForward
95 {
96  using state_t = rxu::decay_t<State>;
97  using oncompleted_t = rxu::decay_t<OnCompleted>;
98  OnCompletedForward() : oncompleted() {}
99  explicit OnCompletedForward(oncompleted_t oc) : oncompleted(std::move(oc)) {}
100  oncompleted_t oncompleted;
101  void operator()(state_t& s) const {
102  oncompleted(s);
103  }
104 };
105 template<class State>
106 struct OnCompletedForward<State, void>
107 {
108  OnCompletedForward() {}
109  void operator()(State& s) const {
110  s.on_completed();
111  }
112 };
113 
114 template<class T, class F>
115 struct is_on_next_of
116 {
117  struct not_void {};
118  template<class CT, class CF>
119  static auto check(int) -> decltype((*(CF*)nullptr)(*(CT*)nullptr));
120  template<class CT, class CF>
121  static not_void check(...);
122 
123  typedef decltype(check<T, rxu::decay_t<F>>(0)) detail_result;
124  static const bool value = std::is_same<detail_result, void>::value;
125 };
126 
127 template<class F>
128 struct is_on_error
129 {
130  struct not_void {};
131  template<class CF>
132  static auto check(int) -> decltype((*(CF*)nullptr)(*(std::exception_ptr*)nullptr));
133  template<class CF>
134  static not_void check(...);
135 
136  static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
137 };
138 
139 template<class State, class F>
140 struct is_on_error_for
141 {
142  struct not_void {};
143  template<class CF>
144  static auto check(int) -> decltype((*(CF*)nullptr)(*(State*)nullptr, *(std::exception_ptr*)nullptr));
145  template<class CF>
146  static not_void check(...);
147 
148  static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
149 };
150 
151 template<class F>
152 struct is_on_completed
153 {
154  struct not_void {};
155  template<class CF>
156  static auto check(int) -> decltype((*(CF*)nullptr)());
157  template<class CF>
158  static not_void check(...);
159 
160  static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
161 };
162 
163 }
164 
165 
178 template<class T, class State, class OnNext, class OnError, class OnCompleted>
179 class observer : public observer_base<T>
180 {
181 public:
184  using on_next_t = typename std::conditional<
185  !std::is_same<void, OnNext>::value,
187  detail::OnNextForward<T, State, OnNext>>::type;
188  using on_error_t = typename std::conditional<
189  !std::is_same<void, OnError>::value,
191  detail::OnErrorForward<State, OnError>>::type;
192  using on_completed_t = typename std::conditional<
193  !std::is_same<void, OnCompleted>::value,
195  detail::OnCompletedForward<State, OnCompleted>>::type;
196 
197 private:
198  mutable state_t state;
199  on_next_t onnext;
200  on_error_t onerror;
201  on_completed_t oncompleted;
202 
203 public:
204 
206  : state(std::move(s))
207  , onnext(std::move(n))
208  , onerror(std::move(e))
209  , oncompleted(std::move(c))
210  {
211  }
213  : state(std::move(s))
214  , onnext(std::move(n))
215  , onerror(on_error_t())
216  , oncompleted(std::move(c))
217  {
218  }
219  observer(const this_type& o)
220  : state(o.state)
221  , onnext(o.onnext)
222  , onerror(o.onerror)
223  , oncompleted(o.oncompleted)
224  {
225  }
227  : state(std::move(o.state))
228  , onnext(std::move(o.onnext))
229  , onerror(std::move(o.onerror))
230  , oncompleted(std::move(o.oncompleted))
231  {
232  }
234  state = std::move(o.state);
235  onnext = std::move(o.onnext);
236  onerror = std::move(o.onerror);
237  oncompleted = std::move(o.oncompleted);
238  return *this;
239  }
240 
241  void on_next(T& t) const {
242  onnext(state, t);
243  }
244  void on_next(T&& t) const {
245  onnext(state, std::move(t));
246  }
247  void on_error(std::exception_ptr e) const {
248  onerror(state, e);
249  }
250  void on_completed() const {
251  oncompleted(state);
252  }
254  return observer<T>(*this);
255  }
256 };
257 
269 template<class T, class OnNext, class OnError, class OnCompleted>
270 class observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted> : public observer_base<T>
271 {
272 public:
274  using on_next_t = typename std::conditional<
275  !std::is_same<void, OnNext>::value,
276  rxu::decay_t<OnNext>,
277  detail::OnNextEmpty<T>>::type;
278  using on_error_t = typename std::conditional<
279  !std::is_same<void, OnError>::value,
280  rxu::decay_t<OnError>,
281  detail::OnErrorEmpty>::type;
282  using on_completed_t = typename std::conditional<
283  !std::is_same<void, OnCompleted>::value,
284  rxu::decay_t<OnCompleted>,
285  detail::OnCompletedEmpty>::type;
286 
287 private:
288  on_next_t onnext;
289  on_error_t onerror;
290  on_completed_t oncompleted;
291 
292 public:
293  static_assert(detail::is_on_next_of<T, on_next_t>::value, "Function supplied for on_next must be a function with the signature void(T);");
294  static_assert(detail::is_on_error<on_error_t>::value, "Function supplied for on_error must be a function with the signature void(std::exception_ptr);");
295  static_assert(detail::is_on_completed<on_completed_t>::value, "Function supplied for on_completed must be a function with the signature void();");
296 
297  observer()
298  : onnext(on_next_t())
299  , onerror(on_error_t())
300  , oncompleted(on_completed_t())
301  {
302  }
303 
305  : onnext(std::move(n))
306  , onerror(std::move(e))
307  , oncompleted(std::move(c))
308  {
309  }
310  observer(const this_type& o)
311  : onnext(o.onnext)
312  , onerror(o.onerror)
313  , oncompleted(o.oncompleted)
314  {
315  }
316  observer(this_type&& o)
317  : onnext(std::move(o.onnext))
318  , onerror(std::move(o.onerror))
319  , oncompleted(std::move(o.oncompleted))
320  {
321  }
322  this_type& operator=(this_type o) {
323  onnext = std::move(o.onnext);
324  onerror = std::move(o.onerror);
325  oncompleted = std::move(o.oncompleted);
326  return *this;
327  }
328 
329  void on_next(T& t) const {
330  onnext(t);
331  }
332  void on_next(T&& t) const {
333  onnext(std::move(t));
334  }
335  void on_error(std::exception_ptr e) const {
336  onerror(e);
337  }
338  void on_completed() const {
339  oncompleted();
340  }
341  observer<T> as_dynamic() const {
342  return observer<T>(*this);
343  }
344 };
345 
346 namespace detail
347 {
348 
349 template<class T>
350 struct virtual_observer : public std::enable_shared_from_this<virtual_observer<T>>
351 {
352  virtual ~virtual_observer() {}
353  virtual void on_next(T&) const {};
354  virtual void on_next(T&&) const {};
355  virtual void on_error(std::exception_ptr) const {};
356  virtual void on_completed() const {};
357 };
358 
359 template<class T, class Observer>
360 struct specific_observer : public virtual_observer<T>
361 {
362  explicit specific_observer(Observer o)
363  : destination(std::move(o))
364  {
365  }
366 
367  Observer destination;
368  virtual void on_next(T& t) const {
369  destination.on_next(t);
370  }
371  virtual void on_next(T&& t) const {
372  destination.on_next(std::move(t));
373  }
374  virtual void on_error(std::exception_ptr e) const {
375  destination.on_error(e);
376  }
377  virtual void on_completed() const {
378  destination.on_completed();
379  }
380 };
381 
382 }
383 
392 template<class T>
393 class observer<T, void, void, void, void> : public observer_base<T>
394 {
395 public:
397 
398 private:
400  using base_type = observer_base<T>;
401  using virtual_observer = detail::virtual_observer<T>;
402 
403  std::shared_ptr<virtual_observer> destination;
404 
405  template<class Observer>
406  static auto make_destination(Observer o)
407  -> std::shared_ptr<virtual_observer> {
408  return std::make_shared<detail::specific_observer<T, Observer>>(std::move(o));
409  }
410 
411 public:
413  {
414  }
415  observer(const this_type& o)
416  : destination(o.destination)
417  {
418  }
420  : destination(std::move(o.destination))
421  {
422  }
423 
424  template<class Observer>
425  explicit observer(Observer o)
426  : destination(make_destination(std::move(o)))
427  {
428  }
429 
431  destination = std::move(o.destination);
432  return *this;
433  }
434 
435  // perfect forwarding delays the copy of the value.
436  template<class V>
437  void on_next(V&& v) const {
438  if (destination) {
439  destination->on_next(std::forward<V>(v));
440  }
441  }
442  void on_error(std::exception_ptr e) const {
443  if (destination) {
444  destination->on_error(e);
445  }
446  }
447  void on_completed() const {
448  if (destination) {
449  destination->on_completed();
450  }
451  }
452 
454  return *this;
455  }
456 };
457 
458 template<class T, class DefaultOnError = detail::OnErrorEmpty>
462 }
463 
464 template<class T, class DefaultOnError = detail::OnErrorEmpty, class U, class State, class OnNext, class OnError, class OnCompleted>
468 }
469 template<class T, class DefaultOnError = detail::OnErrorEmpty, class Observer>
470 auto make_observer(Observer ob)
471  -> typename std::enable_if<
472  !detail::is_on_next_of<T, Observer>::value &&
473  !detail::is_on_error<Observer>::value &&
475  Observer>::type {
476  return std::move(ob);
477 }
478 template<class T, class DefaultOnError = detail::OnErrorEmpty, class Observer>
479 auto make_observer(Observer ob)
480  -> typename std::enable_if<
481  !detail::is_on_next_of<T, Observer>::value &&
482  !detail::is_on_error<Observer>::value &&
483  !is_observer<Observer>::value,
484  observer<T, Observer>>::type {
485  return observer<T, Observer>(std::move(ob));
486 }
487 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext>
488 auto make_observer(OnNext on)
489  -> typename std::enable_if<
490  detail::is_on_next_of<T, OnNext>::value,
493  std::move(on));
494 }
495 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnError>
496 auto make_observer(OnError oe)
497  -> typename std::enable_if<
498  detail::is_on_error<OnError>::value,
500  return observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, OnError>(
501  detail::OnNextEmpty<T>(), std::move(oe));
502 }
503 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnError>
504 auto make_observer(OnNext on, OnError oe)
505  -> typename std::enable_if<
506  detail::is_on_next_of<T, OnNext>::value &&
507  detail::is_on_error<OnError>::value,
510  std::move(on), std::move(oe));
511 }
512 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnCompleted>
513 auto make_observer(OnNext on, OnCompleted oc)
514  -> typename std::enable_if<
515  detail::is_on_next_of<T, OnNext>::value &&
516  detail::is_on_completed<OnCompleted>::value,
519  std::move(on), DefaultOnError(), std::move(oc));
520 }
521 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnError, class OnCompleted>
522 auto make_observer(OnNext on, OnError oe, OnCompleted oc)
523  -> typename std::enable_if<
524  detail::is_on_next_of<T, OnNext>::value &&
525  detail::is_on_error<OnError>::value &&
526  detail::is_on_completed<OnCompleted>::value,
529  std::move(on), std::move(oe), std::move(oc));
530 }
531 
532 
533 template<class T, class State, class OnNext>
534 auto make_observer(State os, OnNext on)
535  -> typename std::enable_if<
536  !detail::is_on_next_of<T, State>::value &&
537  !detail::is_on_error<State>::value,
540  std::move(os), std::move(on));
541 }
542 template<class T, class State, class OnError>
543 auto make_observer(State os, OnError oe)
544  -> typename std::enable_if<
545  !detail::is_on_next_of<T, State>::value &&
546  !detail::is_on_error<State>::value &&
547  detail::is_on_error_for<State, OnError>::value,
549  return observer<T, State, detail::OnNextEmpty<T>, OnError>(
550  std::move(os), detail::OnNextEmpty<T>(), std::move(oe));
551 }
552 template<class T, class State, class OnNext, class OnError>
553 auto make_observer(State os, OnNext on, OnError oe)
554  -> typename std::enable_if<
555  !detail::is_on_next_of<T, State>::value &&
556  !detail::is_on_error<State>::value &&
557  detail::is_on_error_for<State, OnError>::value,
560  std::move(os), std::move(on), std::move(oe));
561 }
562 template<class T, class State, class OnNext, class OnCompleted>
563 auto make_observer(State os, OnNext on, OnCompleted oc)
564  -> typename std::enable_if<
565  !detail::is_on_next_of<T, State>::value &&
566  !detail::is_on_error<State>::value,
569  std::move(os), std::move(on), std::move(oc));
570 }
571 template<class T, class State, class OnNext, class OnError, class OnCompleted>
572 auto make_observer(State os, OnNext on, OnError oe, OnCompleted oc)
573  -> typename std::enable_if<
574  !detail::is_on_next_of<T, State>::value &&
575  !detail::is_on_error<State>::value &&
576  detail::is_on_error_for<State, OnError>::value,
579  std::move(os), std::move(on), std::move(oe), std::move(oc));
580 }
581 
582 template<class T, class Observer>
583 auto make_observer_dynamic(Observer o)
584  -> typename std::enable_if<
585  !detail::is_on_next_of<T, Observer>::value,
586  observer<T>>::type {
587  return observer<T>(std::move(o));
588 }
589 template<class T, class OnNext>
590 auto make_observer_dynamic(OnNext&& on)
591  -> typename std::enable_if<
592  detail::is_on_next_of<T, OnNext>::value,
593  observer<T>>::type {
594  return observer<T>(
595  make_observer<T>(std::forward<OnNext>(on)));
596 }
597 template<class T, class OnNext, class OnError>
598 auto make_observer_dynamic(OnNext&& on, OnError&& oe)
599  -> typename std::enable_if<
600  detail::is_on_next_of<T, OnNext>::value &&
601  detail::is_on_error<OnError>::value,
602  observer<T>>::type {
603  return observer<T>(
604  make_observer<T>(std::forward<OnNext>(on), std::forward<OnError>(oe)));
605 }
606 template<class T, class OnNext, class OnCompleted>
607 auto make_observer_dynamic(OnNext&& on, OnCompleted&& oc)
608  -> typename std::enable_if<
609  detail::is_on_next_of<T, OnNext>::value &&
610  detail::is_on_completed<OnCompleted>::value,
611  observer<T>>::type {
612  return observer<T>(
613  make_observer<T>(std::forward<OnNext>(on), std::forward<OnCompleted>(oc)));
614 }
615 template<class T, class OnNext, class OnError, class OnCompleted>
616 auto make_observer_dynamic(OnNext&& on, OnError&& oe, OnCompleted&& oc)
617  -> typename std::enable_if<
618  detail::is_on_next_of<T, OnNext>::value &&
619  detail::is_on_error<OnError>::value &&
620  detail::is_on_completed<OnCompleted>::value,
621  observer<T>>::type {
622  return observer<T>(
623  make_observer<T>(std::forward<OnNext>(on), std::forward<OnError>(oe), std::forward<OnCompleted>(oc)));
624 }
625 
626 namespace detail {
627 
628 template<class F>
629 struct maybe_from_result
630 {
631  typedef decltype((*(F*)nullptr)()) decl_result_type;
632  typedef rxu::decay_t<decl_result_type> result_type;
633  typedef rxu::maybe<result_type> type;
634 };
635 
636 }
637 
638 template<class F, class OnError>
639 auto on_exception(const F& f, const OnError& c)
640  -> typename std::enable_if<detail::is_on_error<OnError>::value, typename detail::maybe_from_result<F>::type>::type {
641  typename detail::maybe_from_result<F>::type r;
642  try {
643  r.reset(f());
644  } catch (...) {
645  c(std::current_exception());
646  }
647  return r;
648 }
649 
650 template<class F, class Subscriber>
651 auto on_exception(const F& f, const Subscriber& s)
652  -> typename std::enable_if<is_subscriber<Subscriber>::value, typename detail::maybe_from_result<F>::type>::type {
653  typename detail::maybe_from_result<F>::type r;
654  try {
655  r.reset(f());
656  } catch (...) {
657  s.on_error(std::current_exception());
658  }
659  return r;
660 }
661 
662 }
663 
664 #endif
observer()
Definition: rx-observer.hpp:412
Definition: rx-predef.hpp:88
Definition: rx-all.hpp:26
auto make_observer_dynamic(Observer o) -> typename std::enable_if< !detail::is_on_next_of< T, Observer >::value, observer< T >>::type
Definition: rx-observer.hpp:583
observer< T > as_dynamic() const
Definition: rx-observer.hpp:453
observer(state_t s, on_next_t n, on_completed_t c)
Definition: rx-observer.hpp:212
observer(Observer o)
Definition: rx-observer.hpp:425
void on_completed() const
Definition: rx-observer.hpp:447
observer(this_type &&o)
Definition: rx-observer.hpp:419
void on_completed() const
Definition: rx-observer.hpp:250
observer< T > as_dynamic() const
Definition: rx-observer.hpp:253
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-predef.hpp:100
this_type & operator=(this_type o)
Definition: rx-observer.hpp:430
void on_next(T &&t) const
Definition: rx-observer.hpp:244
observer(state_t s, on_next_t n=on_next_t(), on_error_t e=on_error_t(), on_completed_t c=on_completed_t())
Definition: rx-observer.hpp:205
this_type & operator=(this_type o)
Definition: rx-observer.hpp:233
observer(const this_type &o)
Definition: rx-observer.hpp:219
typename std::conditional< !std::is_same< void, OnCompleted >::value, rxu::decay_t< OnCompleted >, detail::OnCompletedForward< State, OnCompleted >>::type on_completed_t
Definition: rx-observer.hpp:195
tag_dynamic_observer dynamic_observer_tag
Definition: rx-observer.hpp:396
void on_next(T &t) const
Definition: rx-observer.hpp:241
tag_observer observer_tag
Definition: rx-observer.hpp:17
void on_error(std::exception_ptr e) const
Definition: rx-observer.hpp:442
auto make_observer() -> observer< T, detail::stateless_observer_tag, detail::OnNextEmpty< T >, DefaultOnError >
Definition: rx-observer.hpp:459
void on_next(V &&v) const
Definition: rx-observer.hpp:437
Definition: rx-observer.hpp:14
consumes values from an observable using State that may implement on_next, on_error and on_completed ...
Definition: rx-observer.hpp:179
void on_error(std::exception_ptr e) const
Definition: rx-observer.hpp:247
observer(this_type &&o)
Definition: rx-observer.hpp:226
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
T value_type
Definition: rx-observer.hpp:16
consumes values from an observable using type-forgetting (shared allocated state with virtual methods...
Definition: rx-observer.hpp:393
auto as_dynamic() -> detail::dynamic_factory
Definition: rx-subscribe.hpp:117
Definition: rx-predef.hpp:90
typename std::conditional< !std::is_same< void, OnNext >::value, rxu::decay_t< OnNext >, detail::OnNextForward< T, State, OnNext >>::type on_next_t
Definition: rx-observer.hpp:187
observer(const this_type &o)
Definition: rx-observer.hpp:415
rxu::decay_t< State > state_t
Definition: rx-observer.hpp:183
typename std::conditional< !std::is_same< void, OnError >::value, rxu::decay_t< OnError >, detail::OnErrorForward< State, OnError >>::type on_error_t
Definition: rx-observer.hpp:191