5 #if !defined(RXCPP_RX_SUBSCRIPTION_HPP) 6 #define RXCPP_RX_SUBSCRIPTION_HPP 15 struct is_unsubscribe_function
19 static auto check(
int) -> decltype((*(CF*)
nullptr)());
21 static not_void check(...);
23 static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)),
void>::value;
34 static typename C::subscription_tag* check(
int);
36 static void check(...);
38 static const bool value = std::is_convertible<decltype(check<rxu::decay_t<T>>(0)),
tag_subscription*>::value;
41 template<
class Unsubscribe>
45 unsubscribe_call_type unsubscribe_call;
51 : unsubscribe_call(o.unsubscribe_call)
55 : unsubscribe_call(std::move(o.unsubscribe_call))
59 : unsubscribe_call(std::move(s))
69 class base_subscription_state :
public std::enable_shared_from_this<base_subscription_state>
71 base_subscription_state();
74 explicit base_subscription_state(
bool initial)
75 : issubscribed(initial)
78 virtual ~base_subscription_state() {}
79 virtual void unsubscribe() {
81 std::atomic<bool> issubscribed;
88 struct subscription_state :
public base_subscription_state
91 subscription_state(inner_t i)
92 : base_subscription_state(
true)
96 virtual void unsubscribe() {
97 if (issubscribed.exchange(
false)) {
107 std::shared_ptr<base_subscription_state>
state;
123 : state(std::make_shared<base_subscription_state>(false))
131 : state(std::make_shared<subscription_state<U>>(std::move(u)))
140 : state(std::move((*static_cast<
subscription*>(&u)).state))
154 : state(std::move(o.state))
161 state = std::move(o.
state);
168 return state->issubscribed;
174 auto keepAlive = state;
175 state->unsubscribe();
193 return !(lhs == rhs);
203 ->
typename std::enable_if<!is_subscription<I>::value && !detail::is_unsubscribe_function<I>::value,
207 template<
class Unsubscribe>
209 ->
typename std::enable_if<detail::is_unsubscribe_function<Unsubscribe>::value,
218 struct tag_composite_subscription_empty {};
220 class composite_subscription_inner
224 struct composite_subscription_state :
public std::enable_shared_from_this<composite_subscription_state>
226 std::set<subscription> subscriptions;
228 std::atomic<bool> issubscribed;
230 ~composite_subscription_state()
232 std::unique_lock<decltype(lock)> guard(lock);
233 subscriptions.clear();
236 composite_subscription_state()
240 composite_subscription_state(tag_composite_subscription_empty)
241 : issubscribed(
false)
249 std::unique_lock<decltype(lock)> guard(lock);
250 subscriptions.insert(s);
255 inline void remove(weak_subscription w) {
256 if (issubscribed && !w.expired()) {
258 std::unique_lock<decltype(lock)> guard(lock);
259 subscriptions.erase(std::move(s));
263 inline void clear() {
265 std::unique_lock<decltype(lock)> guard(lock);
267 std::set<subscription> v(std::move(subscriptions));
269 std::for_each(v.begin(), v.end(),
275 inline void unsubscribe() {
276 if (issubscribed.exchange(
false)) {
277 std::unique_lock<decltype(lock)> guard(lock);
279 std::set<subscription> v(std::move(subscriptions));
281 std::for_each(v.begin(), v.end(),
289 typedef std::shared_ptr<composite_subscription_state> shared_state_type;
292 mutable shared_state_type state;
295 composite_subscription_inner()
296 : state(std::make_shared<composite_subscription_state>())
299 composite_subscription_inner(tag_composite_subscription_empty et)
300 : state(std::make_shared<composite_subscription_state>(et))
304 composite_subscription_inner(
const composite_subscription_inner& o)
311 composite_subscription_inner(composite_subscription_inner&& o)
312 : state(std::move(o.state))
319 composite_subscription_inner& operator=(composite_subscription_inner o)
321 state = std::move(o.state);
332 return state->add(std::move(s));
334 inline void remove(weak_subscription w)
const {
338 state->remove(std::move(w));
340 inline void clear()
const {
346 inline void unsubscribe() {
350 state->unsubscribe();
365 :
protected detail::composite_subscription_inner
368 typedef detail::composite_subscription_inner inner_type;
382 ,
subscription(*static_cast<const inner_type* const>(this))
392 : inner_type(std::move(o))
399 inner_type::operator=(std::move(o));
405 return detail::shared_empty();
411 using inner_type::clear;
414 if (s == static_cast<const subscription&>(*
this)) {
421 auto w = inner_type::add(std::move(s));
428 ->
typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
432 inline void remove(weak_subscription w)
const {
435 inner_type::remove(w);
441 return static_cast<const subscription&
>(lhs) < static_cast<const subscription&>(rhs);
444 return static_cast<const subscription&
>(lhs) == static_cast<const subscription&>(rhs);
447 return !(lhs == rhs);
467 , value(std::make_shared<rxu::detail::maybe<T>>())
472 : lifetime(std::move(cs))
473 , value(std::make_shared<rxu::detail::maybe<T>>(rxu::detail::maybe<T>(std::move(t))))
475 auto localValue = value;
484 return value.get()->get();
491 return lifetime.is_subscribed();
494 return lifetime.add(std::move(s));
498 ->
typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
501 void remove(weak_subscription w)
const {
502 return lifetime.remove(std::move(w));
505 return lifetime.clear();
508 return lifetime.unsubscribe();
513 std::shared_ptr<rxu::detail::maybe<T>>
value;
composite_subscription lifetime
Definition: rx-subscription.hpp:512
static subscription lock(weak_state_type w)
Definition: rx-subscription.hpp:181
subscription(subscription &&o)
Definition: rx-subscription.hpp:153
Definition: rx-all.hpp:26
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:493
static_subscription(const static_subscription &o)
Definition: rx-subscription.hpp:50
Definition: rx-subscription.hpp:460
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
bool operator<(const subscription &lhs, const subscription &rhs)
Definition: rx-subscription.hpp:186
Definition: rx-subscription.hpp:28
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:197
subscription::weak_state_type weak_subscription
Definition: rx-subscription.hpp:370
tag_subscription subscription_tag
Definition: rx-subscription.hpp:29
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
subscription()
Definition: rx-subscription.hpp:122
subscription & operator=(subscription o)
Definition: rx-subscription.hpp:160
subscription(U u, typename std::enable_if<!std::is_same< subscription, U >::value &&is_subscription< U >::value, void ** >::type=nullptr)
Definition: rx-subscription.hpp:138
resource()
Definition: rx-subscription.hpp:465
std::shared_ptr< base_subscription_state > state
Definition: rx-subscription.hpp:107
Definition: rx-subscription.hpp:31
composite_subscription & get_subscription()
Definition: rx-subscription.hpp:486
composite_subscription(composite_subscription &&o)
Definition: rx-subscription.hpp:391
subscription(const subscription &o)
Definition: rx-subscription.hpp:146
static composite_subscription empty()
Definition: rx-subscription.hpp:404
void unsubscribe() const
Definition: rx-subscription.hpp:62
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
bool operator!=(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:103
Definition: rx-subscription.hpp:42
composite_subscription & operator=(composite_subscription o)
Definition: rx-subscription.hpp:397
auto add(F f) const -> typename std::enable_if< detail::is_unsubscribe_function< F >::value, weak_subscription >::type
Definition: rx-subscription.hpp:427
auto add(F f) const -> typename std::enable_if< detail::is_unsubscribe_function< F >::value, weak_subscription >::type
Definition: rx-subscription.hpp:497
void unsubscribe() const
Definition: rx-subscription.hpp:507
void clear() const
Definition: rx-subscription.hpp:504
void unsubscribe() const
Definition: rx-subscription.hpp:170
static_subscription(static_subscription &&o)
Definition: rx-subscription.hpp:54
bool operator==(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:99
static_subscription(unsubscribe_call_type s)
Definition: rx-subscription.hpp:58
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:413
subscription(U u, typename std::enable_if<!is_subscription< U >::value, void ** >::type=nullptr)
Definition: rx-subscription.hpp:130
std::weak_ptr< base_subscription_state > weak_state_type
Definition: rx-subscription.hpp:84
composite_subscription(const composite_subscription &o)
Definition: rx-subscription.hpp:386
composite_subscription::weak_subscription weak_subscription
Definition: rx-subscription.hpp:463
Definition: rx-subscription.hpp:29
weak_state_type get_weak()
Definition: rx-subscription.hpp:178
composite_subscription(detail::tag_composite_subscription_empty et)
Definition: rx-subscription.hpp:372
composite_subscription()
Definition: rx-subscription.hpp:380
Definition: rx-subscription.hpp:67
std::shared_ptr< rxu::detail::maybe< T > > value
Definition: rx-subscription.hpp:513
bool is_subscribed() const
Definition: rx-subscription.hpp:164
bool is_subscribed() const
Definition: rx-subscription.hpp:490
resource(T t, composite_subscription cs=composite_subscription())
Definition: rx-subscription.hpp:471