RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-concat_map.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
31 #if !defined(RXCPP_OPERATORS_RX_CONCATMAP_HPP)
32 #define RXCPP_OPERATORS_RX_CONCATMAP_HPP
33 
34 #include "../rx-includes.hpp"
35 
36 namespace rxcpp {
37 
38 namespace operators {
39 
40 namespace detail {
41 
42 template<class... AN>
43 struct concat_map_invalid_arguments {};
44 
45 template<class... AN>
46 struct concat_map_invalid : public rxo::operator_base<concat_map_invalid_arguments<AN...>> {
47  using type = observable<concat_map_invalid_arguments<AN...>, concat_map_invalid<AN...>>;
48 };
49 template<class... AN>
50 using concat_map_invalid_t = typename concat_map_invalid<AN...>::type;
51 
52 template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
53 struct concat_traits {
54  typedef rxu::decay_t<Observable> source_type;
55  typedef rxu::decay_t<CollectionSelector> collection_selector_type;
56  typedef rxu::decay_t<ResultSelector> result_selector_type;
57  typedef rxu::decay_t<Coordination> coordination_type;
58 
59  typedef typename source_type::value_type source_value_type;
60 
61  struct tag_not_valid {};
62  template<class CV, class CCS>
63  static auto collection_check(int) -> decltype((*(CCS*)nullptr)(*(CV*)nullptr));
64  template<class CV, class CCS>
65  static tag_not_valid collection_check(...);
66 
67  static_assert(!std::is_same<decltype(collection_check<source_value_type, collection_selector_type>(0)), tag_not_valid>::value, "concat_map CollectionSelector must be a function with the signature observable(concat_map::source_value_type)");
68 
69  typedef decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr))) collection_type;
70 
71 //#if _MSC_VER >= 1900
72  static_assert(is_observable<collection_type>::value, "concat_map CollectionSelector must return an observable");
73 //#endif
74 
75  typedef typename collection_type::value_type collection_value_type;
76 
77  template<class CV, class CCV, class CRS>
78  static auto result_check(int) -> decltype((*(CRS*)nullptr)(*(CV*)nullptr, *(CCV*)nullptr));
79  template<class CV, class CCV, class CRS>
80  static tag_not_valid result_check(...);
81 
82  static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "concat_map ResultSelector must be a function with the signature concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type)");
83 
84  typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type;
85 };
86 
87 template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
88 struct concat_map
89  : public operator_base<rxu::value_type_t<concat_traits<Observable, CollectionSelector, ResultSelector, Coordination>>>
90 {
91  typedef concat_map<Observable, CollectionSelector, ResultSelector, Coordination> this_type;
92  typedef concat_traits<Observable, CollectionSelector, ResultSelector, Coordination> traits;
93 
94  typedef typename traits::source_type source_type;
95  typedef typename traits::collection_selector_type collection_selector_type;
96  typedef typename traits::result_selector_type result_selector_type;
97 
98  typedef typename traits::source_value_type source_value_type;
99  typedef typename traits::collection_type collection_type;
100  typedef typename traits::collection_value_type collection_value_type;
101 
102  typedef typename traits::coordination_type coordination_type;
103  typedef typename coordination_type::coordinator_type coordinator_type;
104 
105  struct values
106  {
107  values(source_type o, collection_selector_type s, result_selector_type rs, coordination_type sf)
108  : source(std::move(o))
109  , selectCollection(std::move(s))
110  , selectResult(std::move(rs))
111  , coordination(std::move(sf))
112  {
113  }
114  source_type source;
115  collection_selector_type selectCollection;
116  result_selector_type selectResult;
117  coordination_type coordination;
118  private:
119  values& operator=(const values&) RXCPP_DELETE;
120  };
121  values initial;
122 
123  concat_map(source_type o, collection_selector_type s, result_selector_type rs, coordination_type sf)
124  : initial(std::move(o), std::move(s), std::move(rs), std::move(sf))
125  {
126  }
127 
128  template<class Subscriber>
129  void on_subscribe(Subscriber scbr) const {
130  static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
131 
132  typedef Subscriber output_type;
133 
134  struct concat_map_state_type
135  : public std::enable_shared_from_this<concat_map_state_type>
136  , public values
137  {
138  concat_map_state_type(values i, coordinator_type coor, output_type oarg)
139  : values(std::move(i))
140  , sourceLifetime(composite_subscription::empty())
141  , collectionLifetime(composite_subscription::empty())
142  , coordinator(std::move(coor))
143  , out(std::move(oarg))
144  {
145  }
146 
147  void subscribe_to(source_value_type st)
148  {
149  auto state = this->shared_from_this();
150 
151  auto selectedCollection = on_exception(
152  [&](){return state->selectCollection(st);},
153  state->out);
154  if (selectedCollection.empty()) {
155  return;
156  }
157 
158  collectionLifetime = composite_subscription();
159 
160  // when the out observer is unsubscribed all the
161  // inner subscriptions are unsubscribed as well
162  auto innercstoken = state->out.add(collectionLifetime);
163 
164  collectionLifetime.add(make_subscription([state, innercstoken](){
165  state->out.remove(innercstoken);
166  }));
167 
168  auto selectedSource = on_exception(
169  [&](){return state->coordinator.in(selectedCollection.get());},
170  state->out);
171  if (selectedSource.empty()) {
172  return;
173  }
174 
175  // this subscribe does not share the source subscription
176  // so that when it is unsubscribed the source will continue
177  auto sinkInner = make_subscriber<collection_value_type>(
178  state->out,
179  collectionLifetime,
180  // on_next
181  [state, st](collection_value_type ct) {
182  auto selectedResult = state->selectResult(st, std::move(ct));
183  state->out.on_next(std::move(selectedResult));
184  },
185  // on_error
186  [state](std::exception_ptr e) {
187  state->out.on_error(e);
188  },
189  //on_completed
190  [state](){
191  if (!state->selectedCollections.empty()) {
192  auto value = state->selectedCollections.front();
193  state->selectedCollections.pop_front();
194  state->collectionLifetime.unsubscribe();
195  state->subscribe_to(value);
196  } else if (!state->sourceLifetime.is_subscribed()) {
197  state->out.on_completed();
198  }
199  }
200  );
201  auto selectedSinkInner = on_exception(
202  [&](){return state->coordinator.out(sinkInner);},
203  state->out);
204  if (selectedSinkInner.empty()) {
205  return;
206  }
207  selectedSource->subscribe(std::move(selectedSinkInner.get()));
208  }
209  composite_subscription sourceLifetime;
210  composite_subscription collectionLifetime;
211  std::deque<source_value_type> selectedCollections;
212  coordinator_type coordinator;
213  output_type out;
214  };
215 
216  auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
217 
218  // take a copy of the values for each subscription
219  auto state = std::make_shared<concat_map_state_type>(initial, std::move(coordinator), std::move(scbr));
220 
221  state->sourceLifetime = composite_subscription();
222 
223  // when the out observer is unsubscribed all the
224  // inner subscriptions are unsubscribed as well
225  state->out.add(state->sourceLifetime);
226 
227  auto source = on_exception(
228  [&](){return state->coordinator.in(state->source);},
229  state->out);
230  if (source.empty()) {
231  return;
232  }
233 
234  // this subscribe does not share the observer subscription
235  // so that when it is unsubscribed the observer can be called
236  // until the inner subscriptions have finished
237  auto sink = make_subscriber<source_value_type>(
238  state->out,
239  state->sourceLifetime,
240  // on_next
241  [state](source_value_type st) {
242  if (state->collectionLifetime.is_subscribed()) {
243  state->selectedCollections.push_back(st);
244  } else if (state->selectedCollections.empty()) {
245  state->subscribe_to(st);
246  }
247  },
248  // on_error
249  [state](std::exception_ptr e) {
250  state->out.on_error(e);
251  },
252  // on_completed
253  [state]() {
254  if (!state->collectionLifetime.is_subscribed() && state->selectedCollections.empty()) {
255  state->out.on_completed();
256  }
257  }
258  );
259  auto selectedSink = on_exception(
260  [&](){return state->coordinator.out(sink);},
261  state->out);
262  if (selectedSink.empty()) {
263  return;
264  }
265  source->subscribe(std::move(selectedSink.get()));
266 
267  }
268 private:
269  concat_map& operator=(const concat_map&) RXCPP_DELETE;
270 };
271 
272 }
273 
276 template<class... AN>
277 auto concat_map(AN&&... an)
279  return operator_factory<concat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
280 }
281 
284 template<class... AN>
285 auto concat_transform(AN&&... an)
286 -> operator_factory<concat_map_tag, AN...> {
287  return operator_factory<concat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
288 }
289 
290 }
291 
292 template<>
294 {
295  template<class Observable, class CollectionSelector,
296  class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
297  class SourceValue = rxu::value_type_t<Observable>,
299  class ResultSelectorType = rxu::detail::take_at<1>,
300  class Enabled = rxu::enable_if_all_true_type_t<
302  class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, identity_one_worker>,
303  class CollectionValueType = rxu::value_type_t<CollectionType>,
305  class Result = observable<Value, ConcatMap>
306  >
307  static Result member(Observable&& o, CollectionSelector&& s) {
308  return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), identity_current_thread()));
309  }
310 
311  template<class Observable, class CollectionSelector, class Coordination,
312  class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
313  class SourceValue = rxu::value_type_t<Observable>,
315  class ResultSelectorType = rxu::detail::take_at<1>,
316  class Enabled = rxu::enable_if_all_true_type_t<
317  all_observables<Observable, CollectionType>,
319  class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, rxu::decay_t<Coordination>>,
320  class CollectionValueType = rxu::value_type_t<CollectionType>,
322  class Result = observable<Value, ConcatMap>
323  >
324  static Result member(Observable&& o, CollectionSelector&& s, Coordination&& cn) {
325  return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), std::forward<Coordination>(cn)));
326  }
327 
328  template<class Observable, class CollectionSelector, class ResultSelector,
329  class IsCoordination = is_coordination<ResultSelector>,
330  class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
331  class SourceValue = rxu::value_type_t<Observable>,
333  class Enabled = rxu::enable_if_all_true_type_t<
334  all_observables<Observable, CollectionType>,
336  class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, identity_one_worker>,
337  class CollectionValueType = rxu::value_type_t<CollectionType>,
338  class ResultSelectorType = rxu::decay_t<ResultSelector>,
340  class Result = observable<Value, ConcatMap>
341  >
342  static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs) {
343  return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread()));
344  }
345 
346  template<class Observable, class CollectionSelector, class ResultSelector, class Coordination,
347  class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
348  class SourceValue = rxu::value_type_t<Observable>,
350  class Enabled = rxu::enable_if_all_true_type_t<
351  all_observables<Observable, CollectionType>,
352  is_coordination<Coordination>>,
353  class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, rxu::decay_t<Coordination>>,
354  class CollectionValueType = rxu::value_type_t<CollectionType>,
355  class ResultSelectorType = rxu::decay_t<ResultSelector>,
357  class Result = observable<Value, ConcatMap>
358  >
359  static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) {
360  return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn)));
361  }
362 
363  template<class... AN>
364  static operators::detail::concat_map_invalid_t<AN...> member(AN...) {
365  std::terminate();
366  return {};
367  static_assert(sizeof...(AN) == 10000, "concat_map takes (CollectionSelector, optional ResultSelector, optional Coordination)");
368  }
369 };
370 
371 }
372 
373 #endif
Definition: rx-util.hpp:100
Definition: rx-all.hpp:26
static Result member(Observable &&o, CollectionSelector &&s, Coordination &&cn)
Definition: rx-concat_map.hpp:324
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:197
Definition: rx-operators.hpp:69
static Result member(Observable &&o, CollectionSelector &&s)
Definition: rx-concat_map.hpp:307
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static const bool value
Definition: rx-predef.hpp:123
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
static composite_subscription empty()
Definition: rx-subscription.hpp:404
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
static Result member(Observable &&o, CollectionSelector &&s, ResultSelector &&rs)
Definition: rx-concat_map.hpp:342
auto concat_transform(AN &&...an) -> operator_factory< concat_map_tag, AN... >
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-concat_map.hpp:285
static Result member(Observable &&o, CollectionSelector &&s, ResultSelector &&rs, Coordination &&cn)
Definition: rx-concat_map.hpp:359
auto concat_map(AN &&...an) -> operator_factory< concat_map_tag, AN... >
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-concat_map.hpp:277
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-operators.hpp:164
Definition: rx-coordination.hpp:114
typename std::result_of< TN... >::type result_of_t
Definition: rx-util.hpp:37
Definition: rx-util.hpp:802
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-coordination.hpp:37
static operators::detail::concat_map_invalid_t< AN... > member(AN...)
Definition: rx-concat_map.hpp:364