RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-ref_count.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
13 #if !defined(RXCPP_OPERATORS_RX_REF_COUNT_HPP)
14 #define RXCPP_OPERATORS_RX_REF_COUNT_HPP
15 
16 #include "../rx-includes.hpp"
17 
18 namespace rxcpp {
19 
20 namespace operators {
21 
22 namespace detail {
23 
// Empty variadic tag type. Within this file it is used only as a template argument
// of ref_count_invalid: for its operator_base and for its observable `type`.
24 template<class... AN>
25 struct ref_count_invalid_arguments {};
26 
// Placeholder operator type parameterised on the argument pack AN.
// It derives from rxo::operator_base<ref_count_invalid_arguments<AN...>> and exposes,
// as `type`, the observable instantiated with that tag and this struct.
// The variadic catch-all `member` overload later in this file returns that `type`
// (through ref_count_invalid_t) and rejects the call with a static_assert.
27 template<class... AN>
28 struct ref_count_invalid : public rxo::operator_base<ref_count_invalid_arguments<AN...>> {
29  using type = observable<ref_count_invalid_arguments<AN...>, ref_count_invalid<AN...>>;
30 };
// Shorthand for the nested `type` of ref_count_invalid<AN...>.
31 template<class... AN>
32 using ref_count_invalid_t = typename ref_count_invalid<AN...>::type;
33 
// Operator that keeps a subscriber count for a connectable source.
// on_subscribe() calls source.connect() when the count goes from 0 to 1, and the
// teardown it registers on each subscriber unsubscribes the connection when the
// count returns to 0.
//   T                     - forwarded to operator_base<T>.
//   ConnectableObservable - type of the source; stored decayed as source_type.
34 template<class T, class ConnectableObservable>
35 struct ref_count : public operator_base<T>
36 {
37  typedef rxu::decay_t<ConnectableObservable> source_type;
38 
 // State shared between this operator (and its copies) and the teardown lambdas
 // registered in on_subscribe(); always held through a std::shared_ptr.
39  struct ref_count_state : public std::enable_shared_from_this<ref_count_state>
40  {
 // Takes ownership of the source and starts with zero subscribers.
41  explicit ref_count_state(source_type o)
42  : source(std::move(o))
43  , subscribers(0)
44  {
45  }
46 
 // The source that is subscribed to and connected.
47  source_type source;
 // Held whenever `subscribers` is modified (in on_subscribe and in the teardown lambda).
48  std::mutex lock;
 // Number of subscriptions that have not yet run their teardown lambda.
49  long subscribers;
 // Subscription handed to source.connect(); unsubscribed and replaced when the count reaches 0.
50  composite_subscription connection;
51  };
52  std::shared_ptr<ref_count_state> state;
53 
 // Wraps the source in a freshly allocated shared state object.
54  explicit ref_count(source_type o)
55  : state(std::make_shared<ref_count_state>(std::move(o)))
56  {
57  }
58 
 // Attaches subscriber `o` to the source and maintains the subscriber count.
 // The method is const; all mutation goes through the shared `state` object.
59  template<class Subscriber>
60  void on_subscribe(Subscriber&& o) const {
 // Increment under the lock; needConnect is true only for the 0 -> 1 transition.
61  std::unique_lock<std::mutex> guard(state->lock);
62  auto needConnect = ++state->subscribers == 1;
 // Extra shared_ptr copy: captured by the teardown lambda so the state outlives this operator.
63  auto keepAlive = state;
 // Release the lock before calling into the subscriber and the source.
64  guard.unlock();
 // Teardown attached to the subscriber: decrements under the lock and, when the
 // count reaches 0, unsubscribes the connection and installs a fresh
 // composite_subscription (presumably so a later subscriber can connect again).
65  o.add(
66  [keepAlive](){
67  std::unique_lock<std::mutex> guard_unsubscribe(keepAlive->lock);
68  if (--keepAlive->subscribers == 0) {
69  keepAlive->connection.unsubscribe();
70  keepAlive->connection = composite_subscription();
71  }
72  });
 // Subscribe first, then connect: the first subscriber is attached before
 // source.connect() is called. connect() runs without the lock held.
73  keepAlive->source.subscribe(std::forward<Subscriber>(o));
74  if (needConnect) {
75  keepAlive->source.connect(keepAlive->connection);
76  }
77  }
78 };
79 
80 }
81 
// Factory for the ref_count operator: forwards the arguments into a std::tuple and
// wraps that tuple in operator_factory<ref_count_tag, AN...>.
// Listing line 86 (trailing return type and opening brace) is restored here to match
// the signature this page records for rx-ref_count.hpp:85.
84 template<class... AN>
85 auto ref_count(AN&&... an)
86  -> operator_factory<ref_count_tag, AN...> {
87  return operator_factory<ref_count_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
88 }
89 
90 }
91 
// Explicit specialization providing the two static `member` overloads for ref_count.
// NOTE(review): the listing jumps from line 92 to 94, so the specialization head is
// missing here; presumably `struct member_overload<ref_count_tag>` - confirm against
// the upstream header.
92 template<>
94 {
 // Accepting overload. Builds the detail::ref_count operator for the (decayed) source
 // type and returns it wrapped in observable<Value, RefCount>.
95  template<class ConnectableObservable,
96  class Enabled = rxu::enable_if_all_true_type_t<
 // NOTE(review): listing line 97 is missing; the condition passed to
 // enable_if_all_true_type_t and its closing `>,` are not visible. Presumably a check
 // that the source is a connectable observable - confirm.
98  class SourceValue = rxu::value_type_t<ConnectableObservable>,
99  class RefCount = rxo::detail::ref_count<SourceValue, rxu::decay_t<ConnectableObservable>>,
100  class Value = rxu::value_type_t<RefCount>,
101  class Result = observable<Value, RefCount>
102  >
103  static Result member(ConnectableObservable&& o) {
104  return Result(RefCount(std::forward<ConnectableObservable>(o)));
105  }
106 
 // Variadic fallback returning ref_count_invalid_t<AN...>. The static_assert condition
 // (sizeof...(AN) == 10000) is false for any other pack size, so instantiating this
 // overload fails to compile with the message below. The terminate/return statements
 // precede it; presumably they only give the function a well-formed body.
107  template<class... AN>
108  static operators::detail::ref_count_invalid_t<AN...> member(AN...) {
109  std::terminate();
110  return {};
111  static_assert(sizeof...(AN) == 10000, "ref_count takes no arguments");
112  }
113 };
114 
115 }
116 
117 #endif
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
Definition: rx-operators.hpp:303
static Result member(ConnectableObservable &&o)
Definition: rx-ref_count.hpp:103
auto AN
Definition: rx-finally.hpp:105
Definition: rx-predef.hpp:222
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
A source of values. Subscribe, or use one of the operator methods that return a new observable which uses this observable as a source.
Definition: rx-observable.hpp:510
auto ref_count(AN &&...an) -> operator_factory< ref_count_tag, AN... >
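Takes a connectable_observable source and uses a ref_count of the subscribers to control the connection to the source: the first subscription calls connect() and the last unsubscribe releases the connection.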
Definition: rx-ref_count.hpp:85
static operators::detail::ref_count_invalid_t< AN... > member(AN...)
Definition: rx-ref_count.hpp:108