5 #if !defined(RXCPP_RX_SUBSCRIBER_HPP) 6 #define RXCPP_RX_SUBSCRIBER_HPP 24 template<
class T,
class Observer = observer<T>>
33 observer_type destination;
45 nextdetacher(
const this_type* that)
47 , do_unsubscribe(
true)
51 void operator()(U u) {
54 that->destination.on_next(std::move(u));
55 do_unsubscribe =
false;
57 auto ex = std::current_exception();
59 that->destination.on_error(std::move(ex));
63 const this_type* that;
64 volatile bool do_unsubscribe;
74 errordetacher(
const this_type* that)
78 inline void operator()(std::exception_ptr ex) {
80 that->destination.on_error(std::move(ex));
82 const this_type* that;
85 struct completeddetacher
92 completeddetacher(
const this_type* that)
96 inline void operator()() {
98 that->destination.on_completed();
100 const this_type* that;
108 : lifetime(o.lifetime)
109 , destination(o.destination)
114 : lifetime(std::move(o.lifetime))
115 , destination(std::move(o.destination))
116 , id(std::move(o.id))
120 template<
class U,
class O>
126 typename std::enable_if<
128 std::is_same<Observer,
observer<T>>::value,
void**>::type =
nullptr)
129 : lifetime(o.lifetime)
137 : lifetime(std::move(cs))
138 , destination(std::forward<U>(o))
147 lifetime = std::move(o.lifetime);
148 destination = std::move(o.destination);
149 id = std::move(o.id);
170 return subscriber<T>(id, lifetime, destination.as_dynamic());
177 if (!is_subscribed()) {
180 nextdetacher protect(
this);
181 protect(std::forward<V>(v));
184 if (!is_subscribed()) {
187 errordetacher protect(
this);
188 protect(std::move(e));
191 if (!is_subscribed()) {
194 completeddetacher protect(
this);
204 return lifetime.
add(std::move(s));
208 ->
typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
211 void remove(weak_subscription w)
const {
212 return lifetime.
remove(std::move(w));
215 return lifetime.clear();
223 template<
class T,
class Observer>
235 ->
typename std::enable_if<
236 detail::is_on_next_of<T, detail::OnNextEmpty<T>>::value,
242 template<
class T,
class I>
248 template<
class T,
class Observer>
250 ->
typename std::enable_if<
256 template<
class T,
class Observer>
258 ->
typename std::enable_if<
259 !detail::is_on_next_of<T, Observer>::value &&
266 template<
class T,
class OnNext>
268 ->
typename std::enable_if<
269 detail::is_on_next_of<T, OnNext>::value,
274 template<
class T,
class OnNext,
class OnError>
276 ->
typename std::enable_if<
277 detail::is_on_next_of<T, OnNext>::value &&
278 detail::is_on_error<OnError>::value,
283 template<
class T,
class OnNext,
class OnCompleted>
285 ->
typename std::enable_if<
286 detail::is_on_next_of<T, OnNext>::value &&
287 detail::is_on_completed<OnCompleted>::value,
292 template<
class T,
class OnNext,
class OnError,
class OnCompleted>
294 ->
typename std::enable_if<
295 detail::is_on_next_of<T, OnNext>::value &&
296 detail::is_on_error<OnError>::value &&
297 detail::is_on_completed<OnCompleted>::value,
313 template<
class T,
class I>
319 template<
class T,
class I>
325 template<
class T,
class Observer>
327 ->
typename std::enable_if<
333 template<
class T,
class Observer>
335 ->
typename std::enable_if<
336 !detail::is_on_next_of<T, Observer>::value &&
339 !is_observer<Observer>::value,
343 template<
class T,
class OnNext>
345 ->
typename std::enable_if<
346 detail::is_on_next_of<T, OnNext>::value,
351 template<
class T,
class OnNext,
class OnError>
353 ->
typename std::enable_if<
354 detail::is_on_next_of<T, OnNext>::value &&
355 detail::is_on_error<OnError>::value,
360 template<
class T,
class OnNext,
class OnCompleted>
362 ->
typename std::enable_if<
363 detail::is_on_next_of<T, OnNext>::value &&
364 detail::is_on_completed<OnCompleted>::value,
369 template<
class T,
class OnNext,
class OnError,
class OnCompleted>
371 ->
typename std::enable_if<
372 detail::is_on_next_of<T, OnNext>::value &&
373 detail::is_on_error<OnError>::value &&
374 detail::is_on_completed<OnCompleted>::value,
397 template<
class T,
class I>
403 template<
class T,
class I>
409 template<
class T,
class Observer>
411 ->
typename std::enable_if<
412 is_observer<Observer>::value,
416 template<
class T,
class Observer>
418 ->
typename std::enable_if<
419 is_observer<Observer>::value,
423 template<
class T,
class Observer>
425 ->
typename std::enable_if<
426 !detail::is_on_next_of<T, Observer>::value &&
429 !is_observer<Observer>::value,
433 template<
class T,
class Observer>
435 ->
typename std::enable_if<
436 !detail::is_on_next_of<T, Observer>::value &&
439 !is_observer<Observer>::value,
443 template<
class T,
class OnNext>
445 ->
typename std::enable_if<
446 detail::is_on_next_of<T, OnNext>::value,
451 template<
class T,
class OnNext>
453 ->
typename std::enable_if<
454 detail::is_on_next_of<T, OnNext>::value,
459 template<
class T,
class OnNext,
class OnError>
461 ->
typename std::enable_if<
462 detail::is_on_next_of<T, OnNext>::value &&
463 detail::is_on_error<OnError>::value,
468 template<
class T,
class OnNext,
class OnError>
470 ->
typename std::enable_if<
471 detail::is_on_next_of<T, OnNext>::value &&
472 detail::is_on_error<OnError>::value,
477 template<
class T,
class OnNext,
class OnCompleted>
479 ->
typename std::enable_if<
480 detail::is_on_next_of<T, OnNext>::value &&
481 detail::is_on_completed<OnCompleted>::value,
486 template<
class T,
class OnNext,
class OnCompleted>
488 ->
typename std::enable_if<
489 detail::is_on_next_of<T, OnNext>::value &&
490 detail::is_on_completed<OnCompleted>::value,
495 template<
class T,
class OnNext,
class OnError,
class OnCompleted>
497 ->
typename std::enable_if<
498 detail::is_on_next_of<T, OnNext>::value &&
499 detail::is_on_error<OnError>::value &&
500 detail::is_on_completed<OnCompleted>::value,
505 template<
class T,
class OnNext,
class OnError,
class OnCompleted>
507 ->
typename std::enable_if<
508 detail::is_on_next_of<T, OnNext>::value &&
509 detail::is_on_error<OnError>::value &&
510 detail::is_on_completed<OnCompleted>::value,
519 template<
class T,
class OtherT,
class OtherObserver,
class I>
527 template<
class T,
class OtherT,
class OtherObserver,
class I>
535 template<
class T,
class OtherT,
class OtherObserver,
class Observer>
537 ->
typename std::enable_if<
538 is_observer<Observer>::value,
544 template<
class T,
class OtherT,
class OtherObserver,
class Observer>
546 ->
typename std::enable_if<
548 is_observer<Observer>::value,
554 template<
class T,
class OtherT,
class OtherObserver,
class Observer>
556 ->
typename std::enable_if<
557 !detail::is_on_next_of<T, Observer>::value &&
560 !is_observer<Observer>::value,
566 template<
class T,
class OtherT,
class OtherObserver,
class Observer>
568 ->
typename std::enable_if<
569 !detail::is_on_next_of<T, Observer>::value &&
572 !is_observer<Observer>::value,
578 template<
class T,
class OtherT,
class OtherObserver,
class OnNext>
580 ->
typename std::enable_if<
581 detail::is_on_next_of<T, OnNext>::value,
588 template<
class T,
class OtherT,
class OtherObserver,
class OnNext>
590 ->
typename std::enable_if<
591 detail::is_on_next_of<T, OnNext>::value,
598 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnError>
600 ->
typename std::enable_if<
601 detail::is_on_next_of<T, OnNext>::value &&
602 detail::is_on_error<OnError>::value,
609 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnError>
611 ->
typename std::enable_if<
612 detail::is_on_next_of<T, OnNext>::value &&
613 detail::is_on_error<OnError>::value,
620 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnCompleted>
622 ->
typename std::enable_if<
623 detail::is_on_next_of<T, OnNext>::value &&
624 detail::is_on_completed<OnCompleted>::value,
631 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnCompleted>
633 ->
typename std::enable_if<
634 detail::is_on_next_of<T, OnNext>::value &&
635 detail::is_on_completed<OnCompleted>::value,
642 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnError,
class OnCompleted>
644 ->
typename std::enable_if<
645 detail::is_on_next_of<T, OnNext>::value &&
646 detail::is_on_error<OnError>::value &&
647 detail::is_on_completed<OnCompleted>::value,
654 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnError,
class OnCompleted>
656 ->
typename std::enable_if<
657 detail::is_on_next_of<T, OnNext>::value &&
658 detail::is_on_error<OnError>::value &&
659 detail::is_on_completed<OnCompleted>::value,
667 template<
class T,
class OtherT,
class OtherObserver,
class I>
673 template<
class T,
class OtherT,
class OtherObserver,
class I>
679 template<
class T,
class OtherT,
class OtherObserver,
class Observer>
681 ->
typename std::enable_if<
682 is_observer<Observer>::value,
688 template<
class T,
class OtherT,
class OtherObserver,
class Observer>
690 ->
typename std::enable_if<
691 is_observer<Observer>::value,
697 template<
class T,
class OtherT,
class OtherObserver,
class Observer>
699 ->
typename std::enable_if<
700 !detail::is_on_next_of<T, Observer>::value &&
703 !is_observer<Observer>::value,
709 template<
class T,
class OtherT,
class OtherObserver,
class Observer>
711 ->
typename std::enable_if<
712 !detail::is_on_next_of<T, Observer>::value &&
715 !is_observer<Observer>::value,
721 template<
class T,
class OtherT,
class OtherObserver,
class OnNext>
723 ->
typename std::enable_if<
724 detail::is_on_next_of<T, OnNext>::value,
731 template<
class T,
class OtherT,
class OtherObserver,
class OnNext>
733 ->
typename std::enable_if<
734 detail::is_on_next_of<T, OnNext>::value,
741 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnError>
743 ->
typename std::enable_if<
744 detail::is_on_next_of<T, OnNext>::value &&
745 detail::is_on_error<OnError>::value,
752 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnError>
754 ->
typename std::enable_if<
755 detail::is_on_next_of<T, OnNext>::value &&
756 detail::is_on_error<OnError>::value,
763 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnCompleted>
765 ->
typename std::enable_if<
766 detail::is_on_next_of<T, OnNext>::value &&
767 detail::is_on_completed<OnCompleted>::value,
774 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnCompleted>
776 ->
typename std::enable_if<
777 detail::is_on_next_of<T, OnNext>::value &&
778 detail::is_on_completed<OnCompleted>::value,
785 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnError,
class OnCompleted>
787 ->
typename std::enable_if<
788 detail::is_on_next_of<T, OnNext>::value &&
789 detail::is_on_error<OnError>::value &&
790 detail::is_on_completed<OnCompleted>::value,
797 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnError,
class OnCompleted>
799 ->
typename std::enable_if<
800 detail::is_on_next_of<T, OnNext>::value &&
801 detail::is_on_error<OnError>::value &&
802 detail::is_on_completed<OnCompleted>::value,
810 template<
class T,
class Observer>
817 template<
class T,
class Observer>
825 template<
class T,
class Observer>
subscriber(trace_id id, composite_subscription cs, U &&o)
Definition: rx-subscriber.hpp:136
trace_id get_id() const
Definition: rx-subscriber.hpp:165
Definition: rx-all.hpp:26
bool is_subscribed() const
Definition: rx-subscriber.hpp:200
subscriber< T > as_dynamic() const
Definition: rx-subscriber.hpp:169
weak_subscription add(subscription s) const
Definition: rx-subscriber.hpp:203
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
void remove(weak_subscription w) const
Definition: rx-subscription.hpp:432
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:197
subscription::weak_state_type weak_subscription
Definition: rx-subscription.hpp:370
const composite_subscription & get_subscription() const
Definition: rx-subscriber.hpp:159
Definition: rx-predef.hpp:113
auto make_subscriber(subscriber< T, Observer > o) -> subscriber< T, Observer >
Definition: rx-subscriber.hpp:224
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-subscription.hpp:31
Definition: rx-trace.hpp:14
subscriber(const subscriber< T, O > &o, typename std::enable_if< !std::is_same< O, observer< T >>::value &&std::is_same< Observer, observer< T >>::value, void ** >::type=nullptr)
Definition: rx-subscriber.hpp:124
void on_error(std::exception_ptr e) const
Definition: rx-subscriber.hpp:183
observer_type & get_observer()
Definition: rx-subscriber.hpp:156
Definition: rx-subscriber.hpp:13
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
tag_subscriber subscriber_tag
Definition: rx-subscriber.hpp:15
composite_subscription::weak_subscription weak_subscription
Definition: rx-subscriber.hpp:105
this_type & operator=(this_type o)
Definition: rx-subscriber.hpp:146
void on_completed() const
Definition: rx-subscriber.hpp:190
void clear() const
Definition: rx-subscriber.hpp:214
void unsubscribe() const
Definition: rx-subscription.hpp:170
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:413
void unsubscribe() const
Definition: rx-subscriber.hpp:217
Definition: rx-observer.hpp:14
const observer_type & get_observer() const
Definition: rx-subscriber.hpp:153
Definition: rx-subscription.hpp:29
subscriber(this_type &&o)
Definition: rx-subscriber.hpp:113
auto as_dynamic() -> detail::dynamic_factory
Definition: rx-subscribe.hpp:117
static trace_id make_next_id_subscriber()
Definition: rx-trace.hpp:16
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
Definition: rx-predef.hpp:90
Definition: rx-subscription.hpp:67
auto add(F f) const -> typename std::enable_if< detail::is_unsubscribe_function< F >::value, weak_subscription >::type
Definition: rx-subscriber.hpp:207
Definition: rx-predef.hpp:115
bool is_subscribed() const
Definition: rx-subscription.hpp:164
subscriber(const this_type &o)
Definition: rx-subscriber.hpp:107
void on_next(V &&v) const
Definition: rx-subscriber.hpp:176
composite_subscription & get_subscription()
Definition: rx-subscriber.hpp:162