// Include guard for the ref_count operator header, the shared rx include, and
// the empty struct ref_count_invalid_arguments.
// NOTE(review): the numbers fused into the text (13, 14, 16, 25, ...) look like
// line numbers from a rendered source listing, and the gaps between them suggest
// that lines are missing from this copy - confirm against the real header.
13 #if !defined(RXCPP_OPERATORS_RX_REF_COUNT_HPP) 14 #define RXCPP_OPERATORS_RX_REF_COUNT_HPP 16 #include "../rx-includes.hpp" 25 struct ref_count_invalid_arguments {};
// ref_count_invalid<AN...>: derives from
// rxo::operator_base<ref_count_invalid_arguments<AN...>> and exposes `type`, an
// observable instantiated with ref_count_invalid_arguments<AN...> and
// ref_count_invalid<AN...> as its two template arguments.
// NOTE(review): the declaration of the pack AN and the struct's braces are not
// visible in this listing - presumably `template<class... AN>` precedes it.
28 struct ref_count_invalid :
public rxo::operator_base<ref_count_invalid_arguments<AN...>> {
29 using type = observable<ref_count_invalid_arguments<
AN...>, ref_count_invalid<
AN...>>;
// Shorthand alias for the nested `type` of ref_count_invalid<AN...>.
32 using ref_count_invalid_t =
typename ref_count_invalid<
AN...>::type;
// ref_count<T, ConnectableObservable>: operator that wraps a connectable source
// and keeps a subscriber count in shared state; the count decides when the
// connection stored in that state is torn down.
// NOTE(review): this listing omits lines (braces, some members, and statements
// between the numbered lines). The comments below describe only what is
// visible; anything relying on a missing line is marked as an assumption.
34 template<
class T,
class ConnectableObservable>
35 struct ref_count :
public operator_base<T>
// Decayed type of the wrapped connectable observable.
37 typedef rxu::decay_t<ConnectableObservable> source_type;
// State object held through the shared_ptr member `state` below.
39 struct ref_count_state :
public std::enable_shared_from_this<ref_count_state>
// Takes the source by value and moves it into the `source` member.
41 explicit ref_count_state(source_type o)
42 : source(std::move(o))
// Subscription passed to source.connect(); replaced with a fresh one when the
// subscriber count reaches zero (see on_subscribe).
// NOTE(review): the `source`, `lock` and `subscribers` members used below are
// declared on lines missing from this listing.
50 composite_subscription connection;
52 std::shared_ptr<ref_count_state> state;
// Constructor initializer: allocates the shared state around the moved-in source.
55 : state(std::make_shared<ref_count_state>(std::move(o)))
// Registers subscriber `o`: increments the subscriber count under the state's
// mutex, subscribes `o` to the source, and connects the source.
59 template<
class Subscriber>
60 void on_subscribe(Subscriber&& o)
const {
// Hold state->lock while the count is modified.
61 std::unique_lock<std::mutex> guard(state->lock);
// True when the incremented count equals 1.
62 auto needConnect = ++state->subscribers == 1;
// Additional shared_ptr reference to the state, used by the code below.
63 auto keepAlive = state;
// NOTE(review): listing lines 64-66 are missing; presumably the decrement below
// runs in a callback registered on the subscriber for unsubscription - confirm.
// Under the lock, decrement the count; when it reaches zero, unsubscribe the
// connection and replace it with a new composite_subscription.
67 std::unique_lock<std::mutex> guard_unsubscribe(keepAlive->lock);
68 if (--keepAlive->subscribers == 0) {
69 keepAlive->connection.unsubscribe();
70 keepAlive->connection = composite_subscription();
// Forward the subscriber on to the wrapped source.
73 keepAlive->source.subscribe(std::forward<Subscriber>(o));
// Connect the source using the connection stored in the shared state.
// NOTE(review): needConnect is not used in the visible lines; listing line 74
// is missing and presumably guards this call with `if (needConnect)` - confirm.
75 keepAlive->source.connect(keepAlive->connection);
// Packs the perfectly-forwarded arguments `an...` into a tuple and returns an
// operator_factory tagged with ref_count_tag.
// NOTE(review): the enclosing function header is not visible in this listing.
87 return operator_factory<ref_count_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
// member(o): builds a RefCount operator from the forwarded connectable
// observable and wraps it in Result.
// RefCount defaults to rxo::detail::ref_count over SourceValue and the decayed
// ConnectableObservable type.
// NOTE(review): the template parameters that declare SourceValue and Result
// (and any enabling constraint) are on lines missing from this listing.
95 template<
class ConnectableObservable,
99 class RefCount = rxo::detail::ref_count<SourceValue, rxu::decay_t<ConnectableObservable>>,
103 static Result
member(ConnectableObservable&& o) {
104 return Result(RefCount(std::forward<ConnectableObservable>(o)));
// Catch-all member(AN...) overload: accepts any argument list and returns
// ref_count_invalid_t<AN...>.
// The static_assert compares the pack size with 10000, so it fails for any
// realistic argument count and reports "ref_count takes no arguments". Because
// the condition depends on AN, it is only checked when this overload is
// instantiated.
107 template<
class...
AN>
108 static operators::detail::ref_count_invalid_t<
AN...>
member(
AN...) {
111 static_assert(
sizeof...(
AN) == 10000,
"ref_count takes no arguments");
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
Definition: rx-operators.hpp:303
static Result member(ConnectableObservable &&o)
Definition: rx-ref_count.hpp:103
auto AN
Definition: rx-finally.hpp:105
Definition: rx-predef.hpp:222
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
auto ref_count(AN &&...an) -> operator_factory< ref_count_tag, AN... >
takes a connectable_observable source and uses a ref_count of the subscribers to control the connecti...
Definition: rx-ref_count.hpp:85
static operators::detail::ref_count_invalid_t< AN... > member(AN...)
Definition: rx-ref_count.hpp:108