RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-group_by.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
29 #if !defined(RXCPP_OPERATORS_RX_GROUP_BY_HPP)
30 #define RXCPP_OPERATORS_RX_GROUP_BY_HPP
31 
32 #include "../rx-includes.hpp"
33 
34 namespace rxcpp {
35 
36 namespace operators {
37 
38 namespace detail {
39 
40 template<class... AN>
41 struct group_by_invalid_arguments {};
42 
43 template<class... AN>
44 struct group_by_invalid : public rxo::operator_base<group_by_invalid_arguments<AN...>> {
45  using type = observable<group_by_invalid_arguments<AN...>, group_by_invalid<AN...>>;
46 };
47 template<class... AN>
48 using group_by_invalid_t = typename group_by_invalid<AN...>::type;
49 
50 template<class T, class Selector>
51 struct is_group_by_selector_for {
52 
53  typedef rxu::decay_t<Selector> selector_type;
54  typedef T source_value_type;
55 
56  struct tag_not_valid {};
57  template<class CV, class CS>
58  static auto check(int) -> decltype((*(CS*)nullptr)(*(CV*)nullptr));
59  template<class CV, class CS>
60  static tag_not_valid check(...);
61 
62  typedef decltype(check<source_value_type, selector_type>(0)) type;
63  static const bool value = !std::is_same<type, tag_not_valid>::value;
64 };
65 
66 template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate>
67 struct group_by_traits
68 {
69  typedef T source_value_type;
70  typedef rxu::decay_t<Observable> source_type;
71  typedef rxu::decay_t<KeySelector> key_selector_type;
72  typedef rxu::decay_t<MarbleSelector> marble_selector_type;
73  typedef rxu::decay_t<BinaryPredicate> predicate_type;
74 
75  static_assert(is_group_by_selector_for<source_value_type, key_selector_type>::value, "group_by KeySelector must be a function with the signature key_type(source_value_type)");
76 
77  typedef typename is_group_by_selector_for<source_value_type, key_selector_type>::type key_type;
78 
79  static_assert(is_group_by_selector_for<source_value_type, marble_selector_type>::value, "group_by MarbleSelector must be a function with the signature marble_type(source_value_type)");
80 
81  typedef typename is_group_by_selector_for<source_value_type, marble_selector_type>::type marble_type;
82 
83  typedef rxsub::subject<marble_type> subject_type;
84 
85  typedef std::map<key_type, typename subject_type::subscriber_type, predicate_type> key_subscriber_map_type;
86 
87  typedef grouped_observable<key_type, marble_type> grouped_observable_type;
88 };
89 
90 template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate>
91 struct group_by
92 {
93  typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type;
94  typedef typename traits_type::key_selector_type key_selector_type;
95  typedef typename traits_type::marble_selector_type marble_selector_type;
96  typedef typename traits_type::marble_type marble_type;
97  typedef typename traits_type::predicate_type predicate_type;
98  typedef typename traits_type::subject_type subject_type;
99  typedef typename traits_type::key_type key_type;
100 
101  typedef typename traits_type::key_subscriber_map_type group_map_type;
102  typedef std::vector<typename composite_subscription::weak_subscription> bindings_type;
103 
104  struct group_by_state_type
105  {
106  group_by_state_type(composite_subscription sl, predicate_type p)
107  : source_lifetime(sl)
108  , groups(p)
109  , observers(0)
110  {}
111  composite_subscription source_lifetime;
112  rxsc::worker worker;
113  group_map_type groups;
114  std::atomic<int> observers;
115  };
116 
117  template<class Subscriber>
118  static void stopsource(Subscriber&& dest, std::shared_ptr<group_by_state_type>& state) {
119  ++state->observers;
120  dest.add([state](){
121  if (!state->source_lifetime.is_subscribed()) {
122  return;
123  }
124  --state->observers;
125  if (state->observers == 0) {
126  state->source_lifetime.unsubscribe();
127  }
128  });
129  }
130 
131  struct group_by_values
132  {
133  group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p)
134  : keySelector(std::move(ks))
135  , marbleSelector(std::move(ms))
136  , predicate(std::move(p))
137  {
138  }
139  mutable key_selector_type keySelector;
140  mutable marble_selector_type marbleSelector;
141  mutable predicate_type predicate;
142  };
143 
144  group_by_values initial;
145 
146  group_by(key_selector_type ks, marble_selector_type ms, predicate_type p)
147  : initial(std::move(ks), std::move(ms), std::move(p))
148  {
149  }
150 
151  struct group_by_observable : public rxs::source_base<marble_type>
152  {
153  mutable std::shared_ptr<group_by_state_type> state;
154  subject_type subject;
155  key_type key;
156 
157  group_by_observable(std::shared_ptr<group_by_state_type> st, subject_type s, key_type k)
158  : state(std::move(st))
159  , subject(std::move(s))
160  , key(k)
161  {
162  }
163 
164  template<class Subscriber>
165  void on_subscribe(Subscriber&& o) const {
166  group_by::stopsource(o, state);
167  subject.get_observable().subscribe(std::forward<Subscriber>(o));
168  }
169 
170  key_type on_get_key() {
171  return key;
172  }
173  };
174 
175  template<class Subscriber>
176  struct group_by_observer : public group_by_values
177  {
178  typedef group_by_observer<Subscriber> this_type;
179  typedef typename traits_type::grouped_observable_type value_type;
180  typedef rxu::decay_t<Subscriber> dest_type;
181  typedef observer<T, this_type> observer_type;
182 
183  dest_type dest;
184 
185  mutable std::shared_ptr<group_by_state_type> state;
186 
187  group_by_observer(composite_subscription l, dest_type d, group_by_values v)
188  : group_by_values(v)
189  , dest(std::move(d))
190  , state(std::make_shared<group_by_state_type>(l, group_by_values::predicate))
191  {
192  group_by::stopsource(dest, state);
193  }
194  void on_next(T v) const {
195  auto selectedKey = on_exception(
196  [&](){
197  return this->keySelector(v);},
198  [this](std::exception_ptr e){on_error(e);});
199  if (selectedKey.empty()) {
200  return;
201  }
202  auto g = state->groups.find(selectedKey.get());
203  if (g == state->groups.end()) {
204  if (!dest.is_subscribed()) {
205  return;
206  }
207  auto sub = subject_type();
208  g = state->groups.insert(std::make_pair(selectedKey.get(), sub.get_subscriber())).first;
209  dest.on_next(make_dynamic_grouped_observable<key_type, marble_type>(group_by_observable(state, sub, selectedKey.get())));
210  }
211  auto selectedMarble = on_exception(
212  [&](){
213  return this->marbleSelector(v);},
214  [this](std::exception_ptr e){on_error(e);});
215  if (selectedMarble.empty()) {
216  return;
217  }
218  g->second.on_next(std::move(selectedMarble.get()));
219  }
220  void on_error(std::exception_ptr e) const {
221  for(auto& g : state->groups) {
222  g.second.on_error(e);
223  }
224  dest.on_error(e);
225  }
226  void on_completed() const {
227  for(auto& g : state->groups) {
228  g.second.on_completed();
229  }
230  dest.on_completed();
231  }
232 
233  static subscriber<T, observer_type> make(dest_type d, group_by_values v) {
234  auto cs = composite_subscription();
235  return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v))));
236  }
237  };
238 
239  template<class Subscriber>
240  auto operator()(Subscriber dest) const
241  -> decltype(group_by_observer<Subscriber>::make(std::move(dest), initial)) {
242  return group_by_observer<Subscriber>::make(std::move(dest), initial);
243  }
244 };
245 
246 template<class KeySelector, class MarbleSelector, class BinaryPredicate>
247 class group_by_factory
248 {
249  typedef rxu::decay_t<KeySelector> key_selector_type;
250  typedef rxu::decay_t<MarbleSelector> marble_selector_type;
251  typedef rxu::decay_t<BinaryPredicate> predicate_type;
252  key_selector_type keySelector;
253  marble_selector_type marbleSelector;
254  predicate_type predicate;
255 public:
256  group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p)
257  : keySelector(std::move(ks))
258  , marbleSelector(std::move(ms))
259  , predicate(std::move(p))
260  {
261  }
262  template<class Observable>
263  struct group_by_factory_traits
264  {
265  typedef rxu::value_type_t<rxu::decay_t<Observable>> value_type;
266  typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type;
267  typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> group_by_type;
268  };
269  template<class Observable>
270  auto operator()(Observable&& source)
271  -> decltype(source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate)))) {
272  return source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate)));
273  }
274 };
275 
276 }
277 
280 template<class... AN>
281 auto group_by(AN&&... an)
283  return operator_factory<group_by_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
284 }
285 
286 }
287 
288 template<>
290 {
291  template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate,
292  class SourceValue = rxu::value_type_t<Observable>,
293  class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
294  class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
295  class Value = typename Traits::grouped_observable_type>
296  static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p)
297  -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p)))) {
298  return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p)));
299  }
300 
301  template<class Observable, class KeySelector, class MarbleSelector,
302  class BinaryPredicate=rxu::less,
303  class SourceValue = rxu::value_type_t<Observable>,
304  class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
305  class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
306  class Value = typename Traits::grouped_observable_type>
307  static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms)
308  -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less()))) {
309  return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less()));
310  }
311 
312 
313  template<class Observable, class KeySelector,
314  class MarbleSelector=rxu::detail::take_at<0>,
315  class BinaryPredicate=rxu::less,
316  class SourceValue = rxu::value_type_t<Observable>,
317  class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
318  class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
319  class Value = typename Traits::grouped_observable_type>
320  static auto member(Observable&& o, KeySelector&& ks)
321  -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less()))) {
322  return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less()));
323  }
324 
325  template<class Observable,
326  class KeySelector=rxu::detail::take_at<0>,
327  class MarbleSelector=rxu::detail::take_at<0>,
328  class BinaryPredicate=rxu::less,
329  class Enabled = rxu::enable_if_all_true_type_t<
331  class SourceValue = rxu::value_type_t<Observable>,
332  class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
333  class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
334  class Value = typename Traits::grouped_observable_type>
335  static auto member(Observable&& o)
336  -> decltype(o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less()))) {
337  return o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less()));
338  }
339 
340  template<class... AN>
341  static operators::detail::group_by_invalid_t<AN...> member(const AN&...) {
342  std::terminate();
343  return {};
344  static_assert(sizeof...(AN) == 10000, "group_by takes (optional KeySelector, optional MarbleSelector, optional BinaryKeyPredicate), KeySelector takes (Observable::value_type) -> KeyValue, MarbleSelector takes (Observable::value_type) -> MarbleValue, BinaryKeyPredicate takes (KeyValue, KeyValue) -> bool");
345  }
346 
347 };
348 
349 }
350 
351 #endif
352 
Definition: rx-util.hpp:100
static auto member(Observable &&o) -> decltype(o.template lift< Value >(GroupBy(rxu::detail::take_at< 0 >(), rxu::detail::take_at< 0 >(), rxu::less())))
Definition: rx-group_by.hpp:335
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
static auto member(Observable &&o, KeySelector &&ks, MarbleSelector &&ms, BinaryPredicate &&p) -> decltype(o.template lift< Value >(GroupBy(std::forward< KeySelector >(ks), std::forward< MarbleSelector >(ms), std::forward< BinaryPredicate >(p))))
Definition: rx-group_by.hpp:296
Definition: rx-operators.hpp:234
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
auto group_by(AN &&...an) -> operator_factory< group_by_tag, AN... >
Return an observable that emits grouped_observables, each of which corresponds to a unique key value ...
Definition: rx-group_by.hpp:281
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
static auto member(Observable &&o, KeySelector &&ks, MarbleSelector &&ms) -> decltype(o.template lift< Value >(GroupBy(std::forward< KeySelector >(ks), std::forward< MarbleSelector >(ms), rxu::less())))
Definition: rx-group_by.hpp:307
auto lift(Operator &&op) -> detail::lift_factory< ResultType, Operator >
Definition: rx-lift.hpp:101
Definition: rx-util.hpp:411
static auto member(Observable &&o, KeySelector &&ks) -> decltype(o.template lift< Value >(GroupBy(std::forward< KeySelector >(ks), rxu::detail::take_at< 0 >(), rxu::less())))
Definition: rx-group_by.hpp:320
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
static operators::detail::group_by_invalid_t< AN... > member(const AN &...)
Definition: rx-group_by.hpp:341