RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-subscription.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SUBSCRIPTION_HPP)
6 #define RXCPP_RX_SUBSCRIPTION_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace detail {
13 
14 template<class F>
15 struct is_unsubscribe_function
16 {
17  struct not_void {};
18  template<class CF>
19  static auto check(int) -> decltype((*(CF*)nullptr)());
20  template<class CF>
21  static not_void check(...);
22 
23  static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
24 };
25 
26 }
27 
28 struct tag_subscription {};
30 template<class T>
32 {
33  template<class C>
34  static typename C::subscription_tag* check(int);
35  template<class C>
36  static void check(...);
37 public:
38  static const bool value = std::is_convertible<decltype(check<rxu::decay_t<T>>(0)), tag_subscription*>::value;
39 };
40 
41 template<class Unsubscribe>
43 {
44  typedef rxu::decay_t<Unsubscribe> unsubscribe_call_type;
45  unsubscribe_call_type unsubscribe_call;
47  {
48  }
49 public:
51  : unsubscribe_call(o.unsubscribe_call)
52  {
53  }
55  : unsubscribe_call(std::move(o.unsubscribe_call))
56  {
57  }
58  static_subscription(unsubscribe_call_type s)
59  : unsubscribe_call(std::move(s))
60  {
61  }
62  void unsubscribe() const {
63  unsubscribe_call();
64  }
65 };
66 
68 {
69  class base_subscription_state : public std::enable_shared_from_this<base_subscription_state>
70  {
71  base_subscription_state();
72  public:
73 
74  explicit base_subscription_state(bool initial)
75  : issubscribed(initial)
76  {
77  }
78  virtual ~base_subscription_state() {}
79  virtual void unsubscribe() {
80  }
81  std::atomic<bool> issubscribed;
82  };
83 public:
84  typedef std::weak_ptr<base_subscription_state> weak_state_type;
85 
86 private:
87  template<class I>
88  struct subscription_state : public base_subscription_state
89  {
90  typedef rxu::decay_t<I> inner_t;
91  subscription_state(inner_t i)
92  : base_subscription_state(true)
93  , inner(std::move(i))
94  {
95  }
96  virtual void unsubscribe() {
97  if (issubscribed.exchange(false)) {
98  trace_activity().unsubscribe_enter(*this);
99  inner.unsubscribe();
100  trace_activity().unsubscribe_return(*this);
101  }
102  }
103  inner_t inner;
104  };
105 
106 protected:
107  std::shared_ptr<base_subscription_state> state;
108 
109  friend bool operator<(const subscription&, const subscription&);
110  friend bool operator==(const subscription&, const subscription&);
111 
112 private:
113  subscription(weak_state_type w)
114  : state(w.lock())
115  {
116  if (!state) {
117  std::terminate();
118  }
119  }
120 public:
121 
123  : state(std::make_shared<base_subscription_state>(false))
124  {
125  if (!state) {
126  std::terminate();
127  }
128  }
129  template<class U>
130  explicit subscription(U u, typename std::enable_if<!is_subscription<U>::value, void**>::type = nullptr)
131  : state(std::make_shared<subscription_state<U>>(std::move(u)))
132  {
133  if (!state) {
134  std::terminate();
135  }
136  }
137  template<class U>
138  explicit subscription(U u, typename std::enable_if<!std::is_same<subscription, U>::value && is_subscription<U>::value, void**>::type = nullptr)
139  // intentionally slice
140  : state(std::move((*static_cast<subscription*>(&u)).state))
141  {
142  if (!state) {
143  std::terminate();
144  }
145  }
147  : state(o.state)
148  {
149  if (!state) {
150  std::terminate();
151  }
152  }
154  : state(std::move(o.state))
155  {
156  if (!state) {
157  std::terminate();
158  }
159  }
161  state = std::move(o.state);
162  return *this;
163  }
164  bool is_subscribed() const {
165  if (!state) {
166  std::terminate();
167  }
168  return state->issubscribed;
169  }
170  void unsubscribe() const {
171  if (!state) {
172  std::terminate();
173  }
174  auto keepAlive = state;
175  state->unsubscribe();
176  }
177 
178  weak_state_type get_weak() {
179  return state;
180  }
181  static subscription lock(weak_state_type w) {
182  return subscription(w);
183  }
184 };
185 
186 inline bool operator<(const subscription& lhs, const subscription& rhs) {
187  return lhs.state < rhs.state;
188 }
189 inline bool operator==(const subscription& lhs, const subscription& rhs) {
190  return lhs.state == rhs.state;
191 }
192 inline bool operator!=(const subscription& lhs, const subscription& rhs) {
193  return !(lhs == rhs);
194 }
195 
196 
197 inline auto make_subscription()
198  -> subscription {
199  return subscription();
200 }
201 template<class I>
202 auto make_subscription(I&& i)
203  -> typename std::enable_if<!is_subscription<I>::value && !detail::is_unsubscribe_function<I>::value,
204  subscription>::type {
205  return subscription(std::forward<I>(i));
206 }
207 template<class Unsubscribe>
208 auto make_subscription(Unsubscribe&& u)
209  -> typename std::enable_if<detail::is_unsubscribe_function<Unsubscribe>::value,
210  subscription>::type {
211  return subscription(static_subscription<Unsubscribe>(std::forward<Unsubscribe>(u)));
212 }
213 
215 
216 namespace detail {
217 
218 struct tag_composite_subscription_empty {};
219 
220 class composite_subscription_inner
221 {
222 private:
223  typedef subscription::weak_state_type weak_subscription;
224  struct composite_subscription_state : public std::enable_shared_from_this<composite_subscription_state>
225  {
226  std::set<subscription> subscriptions;
227  std::mutex lock;
228  std::atomic<bool> issubscribed;
229 
230  ~composite_subscription_state()
231  {
232  std::unique_lock<decltype(lock)> guard(lock);
233  subscriptions.clear();
234  }
235 
236  composite_subscription_state()
237  : issubscribed(true)
238  {
239  }
240  composite_subscription_state(tag_composite_subscription_empty)
241  : issubscribed(false)
242  {
243  }
244 
245  inline weak_subscription add(subscription s) {
246  if (!issubscribed) {
247  s.unsubscribe();
248  } else if (s.is_subscribed()) {
249  std::unique_lock<decltype(lock)> guard(lock);
250  subscriptions.insert(s);
251  }
252  return s.get_weak();
253  }
254 
255  inline void remove(weak_subscription w) {
256  if (issubscribed && !w.expired()) {
257  auto s = subscription::lock(w);
258  std::unique_lock<decltype(lock)> guard(lock);
259  subscriptions.erase(std::move(s));
260  }
261  }
262 
263  inline void clear() {
264  if (issubscribed) {
265  std::unique_lock<decltype(lock)> guard(lock);
266 
267  std::set<subscription> v(std::move(subscriptions));
268  guard.unlock();
269  std::for_each(v.begin(), v.end(),
270  [](const subscription& s) {
271  s.unsubscribe(); });
272  }
273  }
274 
275  inline void unsubscribe() {
276  if (issubscribed.exchange(false)) {
277  std::unique_lock<decltype(lock)> guard(lock);
278 
279  std::set<subscription> v(std::move(subscriptions));
280  guard.unlock();
281  std::for_each(v.begin(), v.end(),
282  [](const subscription& s) {
283  s.unsubscribe(); });
284  }
285  }
286  };
287 
288 public:
289  typedef std::shared_ptr<composite_subscription_state> shared_state_type;
290 
291 protected:
292  mutable shared_state_type state;
293 
294 public:
295  composite_subscription_inner()
296  : state(std::make_shared<composite_subscription_state>())
297  {
298  }
299  composite_subscription_inner(tag_composite_subscription_empty et)
300  : state(std::make_shared<composite_subscription_state>(et))
301  {
302  }
303 
304  composite_subscription_inner(const composite_subscription_inner& o)
305  : state(o.state)
306  {
307  if (!state) {
308  std::terminate();
309  }
310  }
311  composite_subscription_inner(composite_subscription_inner&& o)
312  : state(std::move(o.state))
313  {
314  if (!state) {
315  std::terminate();
316  }
317  }
318 
319  composite_subscription_inner& operator=(composite_subscription_inner o)
320  {
321  state = std::move(o.state);
322  if (!state) {
323  std::terminate();
324  }
325  return *this;
326  }
327 
328  inline weak_subscription add(subscription s) const {
329  if (!state) {
330  std::terminate();
331  }
332  return state->add(std::move(s));
333  }
334  inline void remove(weak_subscription w) const {
335  if (!state) {
336  std::terminate();
337  }
338  state->remove(std::move(w));
339  }
340  inline void clear() const {
341  if (!state) {
342  std::terminate();
343  }
344  state->clear();
345  }
346  inline void unsubscribe() {
347  if (!state) {
348  std::terminate();
349  }
350  state->unsubscribe();
351  }
352 };
353 
354 inline composite_subscription shared_empty();
355 
356 }
357 
365  : protected detail::composite_subscription_inner
366  , public subscription
367 {
368  typedef detail::composite_subscription_inner inner_type;
369 public:
371 
372  composite_subscription(detail::tag_composite_subscription_empty et)
373  : inner_type(et)
374  , subscription() // use empty base
375  {
376  }
377 
378 public:
379 
381  : inner_type()
382  , subscription(*static_cast<const inner_type* const>(this))
383  {
384  }
385 
387  : inner_type(o)
388  , subscription(static_cast<const subscription&>(o))
389  {
390  }
392  : inner_type(std::move(o))
393  , subscription(std::move(static_cast<subscription&>(o)))
394  {
395  }
396 
398  {
399  inner_type::operator=(std::move(o));
400  subscription::operator=(std::move(*static_cast<subscription*>(&o)));
401  return *this;
402  }
403 
404  static inline composite_subscription empty() {
405  return detail::shared_empty();
406  }
407 
410 
411  using inner_type::clear;
412 
413  inline weak_subscription add(subscription s) const {
414  if (s == static_cast<const subscription&>(*this)) {
415  // do not nest the same subscription
416  std::terminate();
417  //return s.get_weak();
418  }
419  auto that = this->subscription::state.get();
420  trace_activity().subscription_add_enter(*that, s);
421  auto w = inner_type::add(std::move(s));
422  trace_activity().subscription_add_return(*that);
423  return w;
424  }
425 
426  template<class F>
427  auto add(F f) const
428  -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
429  return add(make_subscription(std::move(f)));
430  }
431 
432  inline void remove(weak_subscription w) const {
433  auto that = this->subscription::state.get();
434  trace_activity().subscription_remove_enter(*that, w);
435  inner_type::remove(w);
436  trace_activity().subscription_remove_return(*that);
437  }
438 };
439 
440 inline bool operator<(const composite_subscription& lhs, const composite_subscription& rhs) {
441  return static_cast<const subscription&>(lhs) < static_cast<const subscription&>(rhs);
442 }
443 inline bool operator==(const composite_subscription& lhs, const composite_subscription& rhs) {
444  return static_cast<const subscription&>(lhs) == static_cast<const subscription&>(rhs);
445 }
446 inline bool operator!=(const composite_subscription& lhs, const composite_subscription& rhs) {
447  return !(lhs == rhs);
448 }
449 
450 namespace detail {
451 
452 inline composite_subscription shared_empty() {
453  static composite_subscription shared_empty = composite_subscription(tag_composite_subscription_empty());
454  return shared_empty;
455 }
456 
457 }
458 
459 template<class T>
461 {
462 public:
464 
466  : lifetime(composite_subscription())
467  , value(std::make_shared<rxu::detail::maybe<T>>())
468  {
469  }
470 
472  : lifetime(std::move(cs))
473  , value(std::make_shared<rxu::detail::maybe<T>>(rxu::detail::maybe<T>(std::move(t))))
474  {
475  auto localValue = value;
476  lifetime.add(
477  [localValue](){
478  localValue->reset();
479  }
480  );
481  }
482 
483  T& get() {
484  return value.get()->get();
485  }
487  return lifetime;
488  }
489 
490  bool is_subscribed() const {
491  return lifetime.is_subscribed();
492  }
493  weak_subscription add(subscription s) const {
494  return lifetime.add(std::move(s));
495  }
496  template<class F>
497  auto add(F f) const
498  -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
499  return lifetime.add(make_subscription(std::move(f)));
500  }
501  void remove(weak_subscription w) const {
502  return lifetime.remove(std::move(w));
503  }
504  void clear() const {
505  return lifetime.clear();
506  }
507  void unsubscribe() const {
508  return lifetime.unsubscribe();
509  }
510 
511 protected:
513  std::shared_ptr<rxu::detail::maybe<T>> value;
514 };
515 
516 }
517 
518 #endif
composite_subscription lifetime
Definition: rx-subscription.hpp:512
static subscription lock(weak_state_type w)
Definition: rx-subscription.hpp:181
subscription(subscription &&o)
Definition: rx-subscription.hpp:153
Definition: rx-all.hpp:26
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:493
static_subscription(const static_subscription &o)
Definition: rx-subscription.hpp:50
Definition: rx-subscription.hpp:460
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
bool operator<(const subscription &lhs, const subscription &rhs)
Definition: rx-subscription.hpp:186
Definition: rx-subscription.hpp:28
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:197
subscription::weak_state_type weak_subscription
Definition: rx-subscription.hpp:370
tag_subscription subscription_tag
Definition: rx-subscription.hpp:29
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
subscription()
Definition: rx-subscription.hpp:122
subscription & operator=(subscription o)
Definition: rx-subscription.hpp:160
subscription(U u, typename std::enable_if<!std::is_same< subscription, U >::value &&is_subscription< U >::value, void ** >::type=nullptr)
Definition: rx-subscription.hpp:138
resource()
Definition: rx-subscription.hpp:465
std::shared_ptr< base_subscription_state > state
Definition: rx-subscription.hpp:107
Definition: rx-subscription.hpp:31
composite_subscription & get_subscription()
Definition: rx-subscription.hpp:486
composite_subscription(composite_subscription &&o)
Definition: rx-subscription.hpp:391
subscription(const subscription &o)
Definition: rx-subscription.hpp:146
static composite_subscription empty()
Definition: rx-subscription.hpp:404
void unsubscribe() const
Definition: rx-subscription.hpp:62
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
bool operator!=(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:103
Definition: rx-subscription.hpp:42
composite_subscription & operator=(composite_subscription o)
Definition: rx-subscription.hpp:397
auto add(F f) const -> typename std::enable_if< detail::is_unsubscribe_function< F >::value, weak_subscription >::type
Definition: rx-subscription.hpp:427
auto add(F f) const -> typename std::enable_if< detail::is_unsubscribe_function< F >::value, weak_subscription >::type
Definition: rx-subscription.hpp:497
void unsubscribe() const
Definition: rx-subscription.hpp:507
void clear() const
Definition: rx-subscription.hpp:504
void unsubscribe() const
Definition: rx-subscription.hpp:170
static_subscription(static_subscription &&o)
Definition: rx-subscription.hpp:54
bool operator==(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:99
static_subscription(unsubscribe_call_type s)
Definition: rx-subscription.hpp:58
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:413
subscription(U u, typename std::enable_if<!is_subscription< U >::value, void ** >::type=nullptr)
Definition: rx-subscription.hpp:130
std::weak_ptr< base_subscription_state > weak_state_type
Definition: rx-subscription.hpp:84
composite_subscription(const composite_subscription &o)
Definition: rx-subscription.hpp:386
composite_subscription::weak_subscription weak_subscription
Definition: rx-subscription.hpp:463
Definition: rx-subscription.hpp:29
weak_state_type get_weak()
Definition: rx-subscription.hpp:178
composite_subscription(detail::tag_composite_subscription_empty et)
Definition: rx-subscription.hpp:372
composite_subscription()
Definition: rx-subscription.hpp:380
Definition: rx-subscription.hpp:67
std::shared_ptr< rxu::detail::maybe< T > > value
Definition: rx-subscription.hpp:513
bool is_subscribed() const
Definition: rx-subscription.hpp:164
bool is_subscribed() const
Definition: rx-subscription.hpp:490
resource(T t, composite_subscription cs=composite_subscription())
Definition: rx-subscription.hpp:471