RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-connectable_observable.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_CONNECTABLE_OBSERVABLE_HPP)
6 #define RXCPP_RX_CONNECTABLE_OBSERVABLE_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace detail {
13 
14 template<class T>
15 struct has_on_connect
16 {
17  struct not_void {};
18  template<class CT>
19  static auto check(int) -> decltype((*(CT*)nullptr).on_connect(composite_subscription()));
20  template<class CT>
21  static not_void check(...);
22 
23  typedef decltype(check<T>(0)) detail_result;
24  static const bool value = std::is_same<detail_result, void>::value;
25 };
26 
27 }
28 
29 template<class T>
31  : public dynamic_observable<T>
32 {
33  struct state_type
34  : public std::enable_shared_from_this<state_type>
35  {
36  typedef std::function<void(composite_subscription)> onconnect_type;
37 
38  onconnect_type on_connect;
39  };
40  std::shared_ptr<state_type> state;
41 
42  template<class U>
43  void construct(const dynamic_observable<U>& o, tag_dynamic_observable&&) {
44  state = o.state;
45  }
46 
47  template<class U>
48  void construct(dynamic_observable<U>&& o, tag_dynamic_observable&&) {
49  state = std::move(o.state);
50  }
51 
52  template<class SO>
53  void construct(SO&& source, rxs::tag_source&&) {
54  auto so = std::make_shared<rxu::decay_t<SO>>(std::forward<SO>(source));
55  state->on_connect = [so](composite_subscription cs) mutable {
56  so->on_connect(std::move(cs));
57  };
58  }
59 
60 public:
61 
63 
65  {
66  }
67 
68  template<class SOF>
70  : dynamic_observable<T>(sof)
71  , state(std::make_shared<state_type>())
72  {
73  construct(std::move(sof),
74  typename std::conditional<is_dynamic_observable<SOF>::value, tag_dynamic_observable, rxs::tag_source>::type());
75  }
76 
77  template<class SF, class CF>
79  : dynamic_observable<T>(std::forward<SF>(sf))
80  , state(std::make_shared<state_type>())
81  {
82  state->on_connect = std::forward<CF>(cf);
83  }
84 
86 
88  state->on_connect(std::move(cs));
89  }
90 };
91 
92 template<class T, class Source>
94  return connectable_observable<T>(dynamic_connectable_observable<T>(std::forward<Source>(s)));
95 }
96 
97 
104 template<class T, class SourceOperator>
106  : public observable<T, SourceOperator>
107 {
110  typedef rxu::decay_t<SourceOperator> source_operator_type;
111 
112  static_assert(detail::has_on_connect<source_operator_type>::value, "inner must have on_connect method void(composite_subscription)");
113 
114 public:
116 
118  {
119  }
120 
121  explicit connectable_observable(const SourceOperator& o)
122  : base_type(o)
123  {
124  }
125  explicit connectable_observable(SourceOperator&& o)
126  : base_type(std::move(o))
127  {
128  }
129 
130  // implicit conversion between observables of the same value_type
131  template<class SO>
133  : base_type(o)
134  {}
135  // implicit conversion between observables of the same value_type
136  template<class SO>
138  : base_type(std::move(o))
139  {}
140 
146  template<class OperatorFactory>
147  auto op(OperatorFactory&& of) const
148  -> decltype(of(*(const this_type*)nullptr)) {
149  return of(*this);
150  static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)");
151  }
152 
157  return *this;
158  }
159 
161  base_type::source_operator.on_connect(cs);
162  return cs;
163  }
164 
167  template<class... AN>
168  auto ref_count(AN... an) const
170  -> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
172  {
173  return observable_member(ref_count_tag{}, *this, std::forward<AN>(an)...);
174  }
175 
178  template<class... AN>
179  auto connect_forever(AN... an) const
181  -> decltype(observable_member(connect_forever_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
183  {
184  return observable_member(connect_forever_tag{}, *this, std::forward<AN>(an)...);
185  }
186 };
187 
188 
189 }
190 
191 //
192 // support range() >> filter() >> subscribe() syntax
193 // '>>' is spelled 'stream'
194 //
195 template<class T, class SourceOperator, class OperatorFactory>
196 auto operator >> (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of)
197  -> decltype(source.op(std::forward<OperatorFactory>(of))) {
198  return source.op(std::forward<OperatorFactory>(of));
199 }
200 
201 //
202 // support range() | filter() | subscribe() syntax
203 // '|' is spelled 'pipe'
204 //
205 template<class T, class SourceOperator, class OperatorFactory>
206 auto operator | (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of)
207  -> decltype(source.op(std::forward<OperatorFactory>(of))) {
208  return source.op(std::forward<OperatorFactory>(of));
209 }
210 
211 #endif
Definition: rx-predef.hpp:220
auto connect_forever(AN...an) const
takes a connectable_observable source and calls connect during the construction of the expression...
Definition: rx-connectable_observable.hpp:179
Definition: rx-operators.hpp:171
a source of values that is shared across all subscribers and does not start until connectable_observa...
Definition: rx-connectable_observable.hpp:105
Definition: rx-all.hpp:26
dynamic_connectable_observable()
Definition: rx-connectable_observable.hpp:64
connectable_observable< T > make_dynamic_connectable_observable(Source &&s)
Definition: rx-connectable_observable.hpp:93
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
tag_connectable_observable observable_tag
Definition: rx-connectable_observable.hpp:112
auto ref_count(AN...an) const
takes a connectable_observable source and uses a ref_count of the subscribers to control the connecti...
Definition: rx-connectable_observable.hpp:168
auto operator>>(const rxcpp::connectable_observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-connectable_observable.hpp:196
tag_dynamic_observable dynamic_observable_tag
Definition: rx-connectable_observable.hpp:62
Definition: rx-operators.hpp:303
connectable_observable(connectable_observable< T, SO > &&o)
Definition: rx-connectable_observable.hpp:137
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-sources.hpp:15
auto operator|(const rxcpp::connectable_observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-connectable_observable.hpp:206
Definition: rx-observable.hpp:36
connectable_observable< T > as_dynamic()
Definition: rx-connectable_observable.hpp:156
auto op(OperatorFactory &&of) const -> decltype(of(*(const this_type *) nullptr))
Definition: rx-connectable_observable.hpp:147
composite_subscription connect(composite_subscription cs=composite_subscription())
Definition: rx-connectable_observable.hpp:160
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
dynamic_connectable_observable(SOF sof)
Definition: rx-connectable_observable.hpp:69
connectable_observable()
Definition: rx-connectable_observable.hpp:117
Definition: rx-connectable_observable.hpp:30
Definition: rx-predef.hpp:270
dynamic_connectable_observable(SF &&sf, CF &&cf)
Definition: rx-connectable_observable.hpp:78
void on_connect(composite_subscription cs) const
Definition: rx-connectable_observable.hpp:87
connectable_observable(SourceOperator &&o)
Definition: rx-connectable_observable.hpp:125
connectable_observable(const SourceOperator &o)
Definition: rx-connectable_observable.hpp:121
auto observable_member(Tag, AN &&...an) -> decltype(Overload::member(std::forward< AN >(an)...))
Definition: rx-operators.hpp:63
Definition: rx-predef.hpp:128
connectable_observable(const connectable_observable< T, SO > &o)
Definition: rx-connectable_observable.hpp:132
Definition: rx-predef.hpp:126