RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-subscriber.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SUBSCRIBER_HPP)
6 #define RXCPP_RX_SUBSCRIBER_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 template<class T>
14 {
16 };
17 
24 template<class T, class Observer = observer<T>>
25 class subscriber : public subscriber_base<T>
26 {
27  static_assert(!is_subscriber<Observer>::value, "not allowed to nest subscribers");
28  static_assert(is_observer<Observer>::value, "subscriber must contain an observer<T, ...>");
30  typedef rxu::decay_t<Observer> observer_type;
31 
// Subscription handle; the is_subscribed/add/remove/clear/unsubscribe members of this class forward to it.
32  composite_subscription lifetime;
// Wrapped observer that receives on_next / on_error / on_completed from the detacher helpers.
33  observer_type destination;
// Identifier returned by get_id() and handed to trace_activity() hooks via *this.
34  trace_id id;
35 
36  struct nextdetacher
37  {
38  ~nextdetacher()
39  {
40  trace_activity().on_next_return(*that);
41  if (do_unsubscribe) {
42  that->unsubscribe();
43  }
44  }
45  nextdetacher(const this_type* that)
46  : that(that)
47  , do_unsubscribe(true)
48  {
49  }
50  template<class U>
51  void operator()(U u) {
52  trace_activity().on_next_enter(*that, u);
53  try {
54  that->destination.on_next(std::move(u));
55  do_unsubscribe = false;
56  } catch(...) {
57  auto ex = std::current_exception();
58  trace_activity().on_error_enter(*that, ex);
59  that->destination.on_error(std::move(ex));
60  trace_activity().on_error_return(*that);
61  }
62  }
63  const this_type* that;
64  volatile bool do_unsubscribe;
65  };
66 
67  struct errordetacher
68  {
69  ~errordetacher()
70  {
71  trace_activity().on_error_return(*that);
72  that->unsubscribe();
73  }
74  errordetacher(const this_type* that)
75  : that(that)
76  {
77  }
78  inline void operator()(std::exception_ptr ex) {
79  trace_activity().on_error_enter(*that, ex);
80  that->destination.on_error(std::move(ex));
81  }
82  const this_type* that;
83  };
84 
85  struct completeddetacher
86  {
87  ~completeddetacher()
88  {
89  trace_activity().on_completed_return(*that);
90  that->unsubscribe();
91  }
92  completeddetacher(const this_type* that)
93  : that(that)
94  {
95  }
96  inline void operator()() {
97  trace_activity().on_completed_enter(*that);
98  that->destination.on_completed();
99  }
100  const this_type* that;
101  };
102 
// Default constructor: declared before `public:`, so it is private; no definition is visible in this listing.
103  subscriber();
104 public:
106 
107  subscriber(const this_type& o)
108  : lifetime(o.lifetime)
109  , destination(o.destination)
110  , id(o.id)
111  {
112  }
113  subscriber(this_type&& o)
114  : lifetime(std::move(o.lifetime))
115  , destination(std::move(o.destination))
116  , id(std::move(o.id))
117  {
118  }
119 
// Befriend every subscriber<U, O> specialization: the converting constructor that follows
// reads the private lifetime, destination and id of a subscriber<T, O>.
120  template<class U, class O>
121  friend class subscriber;
122 
123  template<class O>
125  const subscriber<T, O>& o,
126  typename std::enable_if<
127  !std::is_same<O, observer<T>>::value &&
128  std::is_same<Observer, observer<T>>::value, void**>::type = nullptr)
129  : lifetime(o.lifetime)
130  , destination(o.destination.as_dynamic())
131  , id(o.id)
132  {
133  }
134 
135  template<class U>
137  : lifetime(std::move(cs))
138  , destination(std::forward<U>(o))
139  , id(std::move(id))
140  {
141  static_assert(!is_subscriber<U>::value, "cannot nest subscribers");
142  static_assert(is_observer<U>::value, "must pass observer to subscriber");
143  trace_activity().create_subscriber(*this);
144  }
145 
146  this_type& operator=(this_type o) {
147  lifetime = std::move(o.lifetime);
148  destination = std::move(o.destination);
149  id = std::move(o.id);
150  return *this;
151  }
152 
153  const observer_type& get_observer() const {
154  return destination;
155  }
156  observer_type& get_observer() {
157  return destination;
158  }
160  return lifetime;
161  }
163  return lifetime;
164  }
165  trace_id get_id() const {
166  return id;
167  }
168 
170  return subscriber<T>(id, lifetime, destination.as_dynamic());
171  }
172 
173  // observer
174  //
175  template<class V>
176  void on_next(V&& v) const {
177  if (!is_subscribed()) {
178  return;
179  }
180  nextdetacher protect(this);
181  protect(std::forward<V>(v));
182  }
183  void on_error(std::exception_ptr e) const {
184  if (!is_subscribed()) {
185  return;
186  }
187  errordetacher protect(this);
188  protect(std::move(e));
189  }
190  void on_completed() const {
191  if (!is_subscribed()) {
192  return;
193  }
194  completeddetacher protect(this);
195  protect();
196  }
197 
198  // composite_subscription
199  //
200  bool is_subscribed() const {
201  return lifetime.is_subscribed();
202  }
203  weak_subscription add(subscription s) const {
204  return lifetime.add(std::move(s));
205  }
206  template<class F>
207  auto add(F f) const
208  -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
209  return lifetime.add(make_subscription(std::move(f)));
210  }
211  void remove(weak_subscription w) const {
212  return lifetime.remove(std::move(w));
213  }
214  void clear() const {
215  return lifetime.clear();
216  }
217  void unsubscribe() const {
218  return lifetime.unsubscribe();
219  }
220 
221 };
222 
223 template<class T, class Observer>
227  return subscriber<T, Observer>(std::move(o));
228 }
229 
230 // observer
231 //
232 
233 template<class T>
235  -> typename std::enable_if<
236  detail::is_on_next_of<T, detail::OnNextEmpty<T>>::value,
240 }
241 
242 template<class T, class I>
244  const observer<T, I>& o)
247 }
248 template<class T, class Observer>
249 auto make_subscriber(const Observer& o)
250  -> typename std::enable_if<
253  subscriber<T, Observer>>::type {
255 }
256 template<class T, class Observer>
257 auto make_subscriber(const Observer& o)
258  -> typename std::enable_if<
259  !detail::is_on_next_of<T, Observer>::value &&
265 }
266 template<class T, class OnNext>
267 auto make_subscriber(const OnNext& on)
268  -> typename std::enable_if<
269  detail::is_on_next_of<T, OnNext>::value,
273 }
274 template<class T, class OnNext, class OnError>
275 auto make_subscriber(const OnNext& on, const OnError& oe)
276  -> typename std::enable_if<
277  detail::is_on_next_of<T, OnNext>::value &&
278  detail::is_on_error<OnError>::value,
282 }
283 template<class T, class OnNext, class OnCompleted>
284 auto make_subscriber(const OnNext& on, const OnCompleted& oc)
285  -> typename std::enable_if<
286  detail::is_on_next_of<T, OnNext>::value &&
287  detail::is_on_completed<OnCompleted>::value,
291 }
292 template<class T, class OnNext, class OnError, class OnCompleted>
293 auto make_subscriber(const OnNext& on, const OnError& oe, const OnCompleted& oc)
294  -> typename std::enable_if<
295  detail::is_on_next_of<T, OnNext>::value &&
296  detail::is_on_error<OnError>::value &&
297  detail::is_on_completed<OnCompleted>::value,
301 }
302 
303 // explicit lifetime
304 //
305 
306 template<class T>
311 }
312 
313 template<class T, class I>
315  const observer<T, I>& o)
318 }
319 template<class T, class I>
321  const subscriber<T, I>& s)
322  -> subscriber<T, I> {
323  return subscriber<T, I>(trace_id::make_next_id_subscriber(), cs, s.get_observer());
324 }
325 template<class T, class Observer>
326 auto make_subscriber(const composite_subscription& cs, const Observer& o)
327  -> typename std::enable_if<
330  subscriber<T, Observer>>::type {
332 }
333 template<class T, class Observer>
334 auto make_subscriber(const composite_subscription& cs, const Observer& o)
335  -> typename std::enable_if<
336  !detail::is_on_next_of<T, Observer>::value &&
339  !is_observer<Observer>::value,
342 }
343 template<class T, class OnNext>
344 auto make_subscriber(const composite_subscription& cs, const OnNext& on)
345  -> typename std::enable_if<
346  detail::is_on_next_of<T, OnNext>::value,
350 }
351 template<class T, class OnNext, class OnError>
352 auto make_subscriber(const composite_subscription& cs, const OnNext& on, const OnError& oe)
353  -> typename std::enable_if<
354  detail::is_on_next_of<T, OnNext>::value &&
355  detail::is_on_error<OnError>::value,
359 }
360 template<class T, class OnNext, class OnCompleted>
361 auto make_subscriber(const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
362  -> typename std::enable_if<
363  detail::is_on_next_of<T, OnNext>::value &&
364  detail::is_on_completed<OnCompleted>::value,
368 }
369 template<class T, class OnNext, class OnError, class OnCompleted>
370 auto make_subscriber(const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
371  -> typename std::enable_if<
372  detail::is_on_next_of<T, OnNext>::value &&
373  detail::is_on_error<OnError>::value &&
374  detail::is_on_completed<OnCompleted>::value,
378 }
379 
380 // explicit id
381 //
382 
383 template<class T>
388 }
389 
390 template<class T>
395 }
396 
397 template<class T, class I>
399  const observer<T, I>& o)
401  return subscriber<T, observer<T, I>>(std::move(id), composite_subscription(), o);
402 }
403 template<class T, class I>
405  const observer<T, I>& o)
407  return subscriber<T, observer<T, I>>(std::move(id), cs, o);
408 }
409 template<class T, class Observer>
410 auto make_subscriber(trace_id id, const Observer& o)
411  -> typename std::enable_if<
412  is_observer<Observer>::value,
413  subscriber<T, Observer>>::type {
414  return subscriber<T, Observer>(std::move(id), composite_subscription(), o);
415 }
416 template<class T, class Observer>
417 auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o)
418  -> typename std::enable_if<
419  is_observer<Observer>::value,
420  subscriber<T, Observer>>::type {
421  return subscriber<T, Observer>(std::move(id), cs, o);
422 }
423 template<class T, class Observer>
424 auto make_subscriber(trace_id id, const Observer& o)
425  -> typename std::enable_if<
426  !detail::is_on_next_of<T, Observer>::value &&
429  !is_observer<Observer>::value,
431  return subscriber<T, observer<T, Observer>>(std::move(id), composite_subscription(), o);
432 }
433 template<class T, class Observer>
434 auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o)
435  -> typename std::enable_if<
436  !detail::is_on_next_of<T, Observer>::value &&
439  !is_observer<Observer>::value,
441  return subscriber<T, observer<T, Observer>>(std::move(id), cs, o);
442 }
443 template<class T, class OnNext>
444 auto make_subscriber(trace_id id, const OnNext& on)
445  -> typename std::enable_if<
446  detail::is_on_next_of<T, OnNext>::value,
450 }
451 template<class T, class OnNext>
452 auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on)
453  -> typename std::enable_if<
454  detail::is_on_next_of<T, OnNext>::value,
458 }
459 template<class T, class OnNext, class OnError>
460 auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe)
461  -> typename std::enable_if<
462  detail::is_on_next_of<T, OnNext>::value &&
463  detail::is_on_error<OnError>::value,
467 }
468 template<class T, class OnNext, class OnError>
469 auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe)
470  -> typename std::enable_if<
471  detail::is_on_next_of<T, OnNext>::value &&
472  detail::is_on_error<OnError>::value,
476 }
477 template<class T, class OnNext, class OnCompleted>
478 auto make_subscriber(trace_id id, const OnNext& on, const OnCompleted& oc)
479  -> typename std::enable_if<
480  detail::is_on_next_of<T, OnNext>::value &&
481  detail::is_on_completed<OnCompleted>::value,
485 }
486 template<class T, class OnNext, class OnCompleted>
487 auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
488  -> typename std::enable_if<
489  detail::is_on_next_of<T, OnNext>::value &&
490  detail::is_on_completed<OnCompleted>::value,
494 }
495 template<class T, class OnNext, class OnError, class OnCompleted>
496 auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc)
497  -> typename std::enable_if<
498  detail::is_on_next_of<T, OnNext>::value &&
499  detail::is_on_error<OnError>::value &&
500  detail::is_on_completed<OnCompleted>::value,
504 }
505 template<class T, class OnNext, class OnError, class OnCompleted>
506 auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
507  -> typename std::enable_if<
508  detail::is_on_next_of<T, OnNext>::value &&
509  detail::is_on_error<OnError>::value &&
510  detail::is_on_completed<OnCompleted>::value,
514 }
515 
516 // chain defaults from subscriber
517 //
518 
519 template<class T, class OtherT, class OtherObserver, class I>
521  const observer<T, I>& o)
523  auto r = subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o);
524  trace_activity().connect(r, scbr);
525  return r;
526 }
527 template<class T, class OtherT, class OtherObserver, class I>
529  const observer<T, I>& o)
531  auto r = subscriber<T, observer<T, I>>(std::move(id), scbr.get_subscription(), o);
532  trace_activity().connect(r, scbr);
533  return r;
534 }
535 template<class T, class OtherT, class OtherObserver, class Observer>
536 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o)
537  -> typename std::enable_if<
538  is_observer<Observer>::value,
539  subscriber<T, Observer>>::type {
540  auto r = subscriber<T, Observer>(std::move(id), scbr.get_subscription(), o);
541  trace_activity().connect(r, scbr);
542  return r;
543 }
544 template<class T, class OtherT, class OtherObserver, class Observer>
545 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o)
546  -> typename std::enable_if<
548  is_observer<Observer>::value,
549  subscriber<T, Observer>>::type {
550  auto r = subscriber<T, Observer>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o);
551  trace_activity().connect(r, scbr);
552  return r;
553 }
554 template<class T, class OtherT, class OtherObserver, class Observer>
555 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o)
556  -> typename std::enable_if<
557  !detail::is_on_next_of<T, Observer>::value &&
560  !is_observer<Observer>::value,
563  trace_activity().connect(r, scbr);
564  return r;
565 }
566 template<class T, class OtherT, class OtherObserver, class Observer>
567 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o)
568  -> typename std::enable_if<
569  !detail::is_on_next_of<T, Observer>::value &&
572  !is_observer<Observer>::value,
574  auto r = subscriber<T, observer<T, Observer>>(std::move(id), scbr.get_subscription(), o);
575  trace_activity().connect(r, scbr);
576  return r;
577 }
578 template<class T, class OtherT, class OtherObserver, class OnNext>
579 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on)
580  -> typename std::enable_if<
581  detail::is_on_next_of<T, OnNext>::value,
585  trace_activity().connect(r, scbr);
586  return r;
587 }
588 template<class T, class OtherT, class OtherObserver, class OnNext>
589 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on)
590  -> typename std::enable_if<
591  detail::is_on_next_of<T, OnNext>::value,
595  trace_activity().connect(r, scbr);
596  return r;
597 }
598 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
599 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe)
600  -> typename std::enable_if<
601  detail::is_on_next_of<T, OnNext>::value &&
602  detail::is_on_error<OnError>::value,
606  trace_activity().connect(r, scbr);
607  return r;
608 }
609 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
610 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe)
611  -> typename std::enable_if<
612  detail::is_on_next_of<T, OnNext>::value &&
613  detail::is_on_error<OnError>::value,
617  trace_activity().connect(r, scbr);
618  return r;
619 }
620 template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
621 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnCompleted& oc)
622  -> typename std::enable_if<
623  detail::is_on_next_of<T, OnNext>::value &&
624  detail::is_on_completed<OnCompleted>::value,
628  trace_activity().connect(r, scbr);
629  return r;
630 }
631 template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
632 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnCompleted& oc)
633  -> typename std::enable_if<
634  detail::is_on_next_of<T, OnNext>::value &&
635  detail::is_on_completed<OnCompleted>::value,
639  trace_activity().connect(r, scbr);
640  return r;
641 }
642 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
643 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe, const OnCompleted& oc)
644  -> typename std::enable_if<
645  detail::is_on_next_of<T, OnNext>::value &&
646  detail::is_on_error<OnError>::value &&
647  detail::is_on_completed<OnCompleted>::value,
651  trace_activity().connect(r, scbr);
652  return r;
653 }
654 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
655 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc)
656  -> typename std::enable_if<
657  detail::is_on_next_of<T, OnNext>::value &&
658  detail::is_on_error<OnError>::value &&
659  detail::is_on_completed<OnCompleted>::value,
663  trace_activity().connect(r, scbr);
664  return r;
665 }
666 
667 template<class T, class OtherT, class OtherObserver, class I>
669  const observer<T, I>& o)
672 }
673 template<class T, class OtherT, class OtherObserver, class I>
675  const observer<T, I>& o)
677  return subscriber<T, observer<T, I>>(std::move(id), cs, o);
678 }
679 template<class T, class OtherT, class OtherObserver, class Observer>
680 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o)
681  -> typename std::enable_if<
682  is_observer<Observer>::value,
683  subscriber<T, Observer>>::type {
685  trace_activity().connect(r, scbr);
686  return r;
687 }
688 template<class T, class OtherT, class OtherObserver, class Observer>
689 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o)
690  -> typename std::enable_if<
691  is_observer<Observer>::value,
692  subscriber<T, Observer>>::type {
693  auto r = subscriber<T, Observer>(std::move(id), cs, o);
694  trace_activity().connect(r, scbr);
695  return r;
696 }
697 template<class T, class OtherT, class OtherObserver, class Observer>
698 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o)
699  -> typename std::enable_if<
700  !detail::is_on_next_of<T, Observer>::value &&
703  !is_observer<Observer>::value,
706  trace_activity().connect(r, scbr);
707  return r;
708 }
709 template<class T, class OtherT, class OtherObserver, class Observer>
710 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o)
711  -> typename std::enable_if<
712  !detail::is_on_next_of<T, Observer>::value &&
715  !is_observer<Observer>::value,
717  auto r = subscriber<T, observer<T, Observer>>(std::move(id), cs, o);
718  trace_activity().connect(r, scbr);
719  return r;
720 }
721 template<class T, class OtherT, class OtherObserver, class OnNext>
722 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on)
723  -> typename std::enable_if<
724  detail::is_on_next_of<T, OnNext>::value,
728  trace_activity().connect(r, scbr);
729  return r;
730 }
731 template<class T, class OtherT, class OtherObserver, class OnNext>
732 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on)
733  -> typename std::enable_if<
734  detail::is_on_next_of<T, OnNext>::value,
738  trace_activity().connect(r, scbr);
739  return r;
740 }
741 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
742 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe)
743  -> typename std::enable_if<
744  detail::is_on_next_of<T, OnNext>::value &&
745  detail::is_on_error<OnError>::value,
749  trace_activity().connect(r, scbr);
750  return r;
751 }
752 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
753 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe)
754  -> typename std::enable_if<
755  detail::is_on_next_of<T, OnNext>::value &&
756  detail::is_on_error<OnError>::value,
760  trace_activity().connect(r, scbr);
761  return r;
762 }
763 template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
764 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
765  -> typename std::enable_if<
766  detail::is_on_next_of<T, OnNext>::value &&
767  detail::is_on_completed<OnCompleted>::value,
771  trace_activity().connect(r, scbr);
772  return r;
773 }
774 template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
775 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
776  -> typename std::enable_if<
777  detail::is_on_next_of<T, OnNext>::value &&
778  detail::is_on_completed<OnCompleted>::value,
782  trace_activity().connect(r, scbr);
783  return r;
784 }
785 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
786 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
787  -> typename std::enable_if<
788  detail::is_on_next_of<T, OnNext>::value &&
789  detail::is_on_error<OnError>::value &&
790  detail::is_on_completed<OnCompleted>::value,
794  trace_activity().connect(r, scbr);
795  return r;
796 }
797 template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
798 auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
799  -> typename std::enable_if<
800  detail::is_on_next_of<T, OnNext>::value &&
801  detail::is_on_error<OnError>::value &&
802  detail::is_on_completed<OnCompleted>::value,
806  trace_activity().connect(r, scbr);
807  return r;
808 }
809 
810 template<class T, class Observer>
813  auto r = subscriber<T, Observer>(scbr.get_id(), cs, scbr.get_observer());
814  trace_activity().connect(r, scbr);
815  return r;
816 }
817 template<class T, class Observer>
820  auto r = subscriber<T, Observer>(std::move(id), cs, scbr.get_observer());
821  trace_activity().connect(r, scbr);
822  return r;
823 }
824 
825 template<class T, class Observer>
828  auto r = subscriber<T, Observer>(std::move(id), scbr.get_subscription(), scbr.get_observer());
829  trace_activity().connect(r, scbr);
830  return r;
831 }
832 
833 }
834 
835 #endif
subscriber(trace_id id, composite_subscription cs, U &&o)
Definition: rx-subscriber.hpp:136
trace_id get_id() const
Definition: rx-subscriber.hpp:165
Definition: rx-all.hpp:26
bool is_subscribed() const
Definition: rx-subscriber.hpp:200
subscriber< T > as_dynamic() const
Definition: rx-subscriber.hpp:169
weak_subscription add(subscription s) const
Definition: rx-subscriber.hpp:203
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
void remove(weak_subscription w) const
Definition: rx-subscription.hpp:432
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:197
subscription::weak_state_type weak_subscription
Definition: rx-subscription.hpp:370
const composite_subscription & get_subscription() const
Definition: rx-subscriber.hpp:159
Definition: rx-predef.hpp:113
auto make_subscriber(subscriber< T, Observer > o) -> subscriber< T, Observer >
Definition: rx-subscriber.hpp:224
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-subscription.hpp:31
Definition: rx-trace.hpp:14
subscriber(const subscriber< T, O > &o, typename std::enable_if< !std::is_same< O, observer< T >>::value &&std::is_same< Observer, observer< T >>::value, void ** >::type=nullptr)
Definition: rx-subscriber.hpp:124
void on_error(std::exception_ptr e) const
Definition: rx-subscriber.hpp:183
observer_type & get_observer()
Definition: rx-subscriber.hpp:156
Definition: rx-subscriber.hpp:13
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
tag_subscriber subscriber_tag
Definition: rx-subscriber.hpp:15
composite_subscription::weak_subscription weak_subscription
Definition: rx-subscriber.hpp:105
this_type & operator=(this_type o)
Definition: rx-subscriber.hpp:146
void on_completed() const
Definition: rx-subscriber.hpp:190
void clear() const
Definition: rx-subscriber.hpp:214
void unsubscribe() const
Definition: rx-subscription.hpp:170
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:413
void unsubscribe() const
Definition: rx-subscriber.hpp:217
Definition: rx-observer.hpp:14
const observer_type & get_observer() const
Definition: rx-subscriber.hpp:153
Definition: rx-subscription.hpp:29
subscriber(this_type &&o)
Definition: rx-subscriber.hpp:113
auto as_dynamic() -> detail::dynamic_factory
Definition: rx-subscribe.hpp:117
static trace_id make_next_id_subscriber()
Definition: rx-trace.hpp:16
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
Definition: rx-predef.hpp:90
Definition: rx-subscription.hpp:67
auto add(F f) const -> typename std::enable_if< detail::is_unsubscribe_function< F >::value, weak_subscription >::type
Definition: rx-subscriber.hpp:207
Definition: rx-predef.hpp:115
bool is_subscribed() const
Definition: rx-subscription.hpp:164
subscriber(const this_type &o)
Definition: rx-subscriber.hpp:107
void on_next(V &&v) const
Definition: rx-subscriber.hpp:176
composite_subscription & get_subscription()
Definition: rx-subscriber.hpp:162