RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-grouped_observable.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_GROUPED_OBSERVABLE_HPP)
6 #define RXCPP_RX_GROUPED_OBSERVABLE_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace detail {
13 
// Compile-time trait: `value` is true when calling on_get_key() on an lvalue
// of type Source is well-formed and its return type is exactly
// rxu::decay_t<K>. When the call is ill-formed the variadic fallback is
// selected, the detected type is the private marker `not_void`, and `value`
// is false.
14 template<class K, class Source>
15 struct has_on_get_key_for
16 {
17  struct not_void {};
    // Preferred overload (the argument 0 matches int better than ...); it is
    // removed by SFINAE when the on_get_key() call does not compile.
    // std::declval<CS&>() yields the same lvalue the former
    // `(*(CS*)nullptr)` expression produced, without a C-style cast or a
    // null dereference, and stays well-formed if CS is a reference type.
18  template<class CS>
19  static auto check(int) -> decltype(std::declval<CS&>().on_get_key());
    // Fallback chosen only when the overload above is not viable.
20  template<class CS>
21  static not_void check(...);
22 
23  typedef decltype(check<Source>(0)) detail_result;
24  static const bool value = std::is_same<detail_result, rxu::decay_t<K>>::value;
25 };
26 
27 }
28 
29 template<class K, class T>
31  : public dynamic_observable<T>
32 {
33 public:
36 
37 private:
// State held through a std::shared_ptr by the enclosing class. Its only
// member is the type-erased key getter.
38  struct state_type
39  : public std::enable_shared_from_this<state_type>
40  {
    // Signature of the stored key getter: no arguments, returns key_type.
41  typedef std::function<key_type()> ongetkey_type;
42 
    // Callable invoked to obtain the key; default-constructed (empty) until
    // one of the construct()/constructor paths assigns it.
43  ongetkey_type on_get_key;
44  };
45  std::shared_ptr<state_type> state;
46 
47  template<class U, class V>
49 
// Adopts the state pointer of another dynamic_grouped_observable, so that
// after the call this object and `o` refer to the same state object.
// The unnamed tag parameter is not read; it only selects this overload.
50  template<class U, class V>
51  void construct(const dynamic_grouped_observable<U, V>& o, const tag_dynamic_grouped_observable&) {
52  state = o.state;
53  }
54 
55  template<class U, class V>
57  state = std::move(o.state);
58  }
59 
// Wraps an arbitrary source object: a decayed copy (or move) of `source` is
// placed in a std::shared_ptr, and the key getter in `state` is set to a
// lambda that forwards to that copy's on_get_key().
// The unnamed rxs::tag_source parameter is not read; it only selects this
// overload.
60  template<class SO>
61  void construct(SO&& source, const rxs::tag_source&) {
62  auto so = std::make_shared<rxu::decay_t<SO>>(std::forward<SO>(source));
    // The lambda captures the shared_ptr by value, keeping the wrapped
    // source alive for as long as the stored callable exists.
63  state->on_get_key = [so]() mutable {
64  return so->on_get_key();
65  };
66  }
67 
68 public:
69 
71  {
72  }
73 
74  template<class SOF>
75  explicit dynamic_grouped_observable(SOF sof)
76  : dynamic_observable<T>(sof)
77  , state(std::make_shared<state_type>())
78  {
79  construct(std::move(sof),
81  }
82 
// Builds a grouped observable from two callables: `sf` is forwarded to the
// dynamic_observable<T> base, and `cf` becomes the key getter stored in the
// freshly allocated shared state (so it must be assignable to
// std::function<key_type()>).
83  template<class SF, class CF>
84  dynamic_grouped_observable(SF&& sf, CF&& cf)
85  : dynamic_observable<T>(std::forward<SF>(sf))
86  , state(std::make_shared<state_type>())
87  {
    // state_type declares only on_get_key; the earlier assignment to
    // state->on_connect named a member that does not exist, so this
    // constructor failed to compile as soon as it was instantiated.
88  state->on_get_key = std::forward<CF>(cf);
89  }
90 
92 
// Returns the key by invoking the key getter stored in the shared state.
// NOTE(review): `state` is dereferenced without a null check, and calling an
// empty std::function throws std::bad_function_call — confirm that every
// construction path populates both before this can be reached.
93  key_type on_get_key() const {
94  return state->on_get_key();
95  }
96 };
97 
98 template<class K, class T>
100  return lhs.state == rhs.state;
101 }
102 template<class K, class T>
104  return !(lhs == rhs);
105 }
106 
107 template<class K, class T, class Source>
109  return grouped_observable<K, T>(dynamic_grouped_observable<K, T>(std::forward<Source>(s)));
110 }
111 
112 
113 
120 template<class K, class T, class SourceOperator>
122  : public observable<T, SourceOperator>
123 {
126  typedef rxu::decay_t<SourceOperator> source_operator_type;
127 
128  static_assert(detail::has_on_get_key_for<K, source_operator_type>::value, "inner must have on_get_key method key_type()");
129 
130 public:
131  typedef rxu::decay_t<K> key_type;
133 
135  {
136  }
137 
// Explicit constructor that copies the source operator `o` into the base.
// NOTE(review): the base_type typedef is not visible in this listing —
// presumably observable<T, SourceOperator>, the declared base; confirm.
138  explicit grouped_observable(const SourceOperator& o)
139  : base_type(o)
140  {
141  }
// Explicit constructor that moves the source operator `o` into the base,
// for callers that hand over an rvalue.
// NOTE(review): the base_type typedef is not visible in this listing —
// presumably observable<T, SourceOperator>, the declared base; confirm.
142  explicit grouped_observable(SourceOperator&& o)
143  : base_type(std::move(o))
144  {
145  }
146 
147  // implicit conversion between observables of the same value_type
148  template<class SO>
150  : base_type(o)
151  {}
152  // implicit conversion between observables of the same value_type
153  template<class SO>
155  : base_type(std::move(o))
156  {}
157 
162  return *this;
163  }
164 
// Returns the key of this group by calling on_get_key() on the source
// operator held by the base class (base_type::source_operator). The
// static_assert in this class requires that call to return key_type.
165  key_type get_key() const {
166  return base_type::source_operator.on_get_key();
167  }
168 };
169 
170 
171 }
172 
173 //
174 // support range() >> filter() >> subscribe() syntax
175 // '>>' is spelled 'stream'
176 //
// Stream-style composition: `source >> of` returns
// source.op(std::forward<OperatorFactory>(of)).
// The trailing return type repeats the body's expression, so this overload
// is only viable when that call is well-formed for the given factory.
// NOTE(review): `op` is not defined in this file — presumably inherited from
// observable<T, SourceOperator>; confirm.
177 template<class K, class T, class SourceOperator, class OperatorFactory>
178 auto operator >> (const rxcpp::grouped_observable<K, T, SourceOperator>& source, OperatorFactory&& of)
179  -> decltype(source.op(std::forward<OperatorFactory>(of))) {
180  return source.op(std::forward<OperatorFactory>(of));
181 }
182 
183 //
184 // support range() | filter() | subscribe() syntax
185 // '|' is spelled 'pipe'
186 //
// Pipe-style composition: `source | of` returns
// source.op(std::forward<OperatorFactory>(of)), exactly as operator >> does.
// The trailing return type repeats the body's expression, so this overload
// is only viable when that call is well-formed for the given factory.
// NOTE(review): `op` is not defined in this file — presumably inherited from
// observable<T, SourceOperator>; confirm.
187 template<class K, class T, class SourceOperator, class OperatorFactory>
188 auto operator | (const rxcpp::grouped_observable<K, T, SourceOperator>& source, OperatorFactory&& of)
189  -> decltype(source.op(std::forward<OperatorFactory>(of))) {
190  return source.op(std::forward<OperatorFactory>(of));
191 }
192 
193 #endif
grouped_observable(SourceOperator &&o)
Definition: rx-grouped_observable.hpp:142
key_type get_key() const
Definition: rx-grouped_observable.hpp:165
Definition: rx-grouped_observable.hpp:30
a source of observables which each emit values from one category specified by the key selector...
Definition: rx-grouped_observable.hpp:121
Definition: rx-all.hpp:26
rxu::decay_t< K > key_type
Definition: rx-grouped_observable.hpp:34
tag_dynamic_grouped_observable dynamic_observable_tag
Definition: rx-grouped_observable.hpp:35
rxu::decay_t< K > key_type
Definition: rx-grouped_observable.hpp:128
auto operator|(const rxcpp::grouped_observable< K, T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-grouped_observable.hpp:188
grouped_observable(grouped_observable< K, T, SO > &&o)
Definition: rx-grouped_observable.hpp:154
dynamic_grouped_observable(SF &&sf, CF &&cf)
Definition: rx-grouped_observable.hpp:84
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-predef.hpp:232
Definition: rx-sources.hpp:15
Definition: rx-observable.hpp:36
Definition: rx-predef.hpp:235
grouped_observable< K, T > make_dynamic_grouped_observable(Source &&s)
Definition: rx-grouped_observable.hpp:108
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
grouped_observable< K, T > as_dynamic() const
Definition: rx-grouped_observable.hpp:161
grouped_observable(const grouped_observable< K, T, SO > &o)
Definition: rx-grouped_observable.hpp:149
bool operator!=(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:103
grouped_observable()
Definition: rx-grouped_observable.hpp:134
bool operator==(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:99
Definition: rx-predef.hpp:257
auto operator>>(const rxcpp::grouped_observable< K, T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-grouped_observable.hpp:178
tag_grouped_observable observable_tag
Definition: rx-grouped_observable.hpp:132
dynamic_grouped_observable(SOF sof)
Definition: rx-grouped_observable.hpp:75
dynamic_grouped_observable()
Definition: rx-grouped_observable.hpp:70
key_type on_get_key() const
Definition: rx-grouped_observable.hpp:93
grouped_observable(const SourceOperator &o)
Definition: rx-grouped_observable.hpp:138