RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-concat.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
41 #if !defined(RXCPP_OPERATORS_RX_CONCAT_HPP)
42 #define RXCPP_OPERATORS_RX_CONCAT_HPP
43 
44 #include "../rx-includes.hpp"
45 
46 namespace rxcpp {
47 
48 namespace operators {
49 
50 namespace detail {
51 
52 template<class... AN>
53 struct concat_invalid_arguments {};
54 
55 template<class... AN>
56 struct concat_invalid : public rxo::operator_base<concat_invalid_arguments<AN...>> {
57  using type = observable<concat_invalid_arguments<AN...>, concat_invalid<AN...>>;
58 };
59 template<class... AN>
60 using concat_invalid_t = typename concat_invalid<AN...>::type;
61 
62 template<class T, class Observable, class Coordination>
63 struct concat
64  : public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
65 {
66  typedef concat<T, Observable, Coordination> this_type;
67 
68  typedef rxu::decay_t<T> source_value_type;
69  typedef rxu::decay_t<Observable> source_type;
70  typedef rxu::decay_t<Coordination> coordination_type;
71 
72  typedef typename coordination_type::coordinator_type coordinator_type;
73 
74  typedef typename source_type::source_operator_type source_operator_type;
75  typedef source_value_type collection_type;
76  typedef typename collection_type::value_type value_type;
77 
78  struct values
79  {
80  values(source_operator_type o, coordination_type sf)
81  : source_operator(std::move(o))
82  , coordination(std::move(sf))
83  {
84  }
85  source_operator_type source_operator;
86  coordination_type coordination;
87  };
88  values initial;
89 
90  concat(const source_type& o, coordination_type sf)
91  : initial(o.source_operator, std::move(sf))
92  {
93  }
94 
95  template<class Subscriber>
96  void on_subscribe(Subscriber scbr) const {
97  static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
98 
99  typedef Subscriber output_type;
100 
101  struct concat_state_type
102  : public std::enable_shared_from_this<concat_state_type>
103  , public values
104  {
105  concat_state_type(values i, coordinator_type coor, output_type oarg)
106  : values(i)
107  , source(i.source_operator)
108  , sourceLifetime(composite_subscription::empty())
109  , collectionLifetime(composite_subscription::empty())
110  , coordinator(std::move(coor))
111  , out(std::move(oarg))
112  {
113  }
114 
115  void subscribe_to(collection_type st)
116  {
117  auto state = this->shared_from_this();
118 
119  collectionLifetime = composite_subscription();
120 
121  // when the out observer is unsubscribed all the
122  // inner subscriptions are unsubscribed as well
123  auto innercstoken = state->out.add(collectionLifetime);
124 
125  collectionLifetime.add(make_subscription([state, innercstoken](){
126  state->out.remove(innercstoken);
127  }));
128 
129  auto selectedSource = on_exception(
130  [&](){return state->coordinator.in(std::move(st));},
131  state->out);
132  if (selectedSource.empty()) {
133  return;
134  }
135 
136  // this subscribe does not share the out subscription
137  // so that when it is unsubscribed the out will continue
138  auto sinkInner = make_subscriber<value_type>(
139  state->out,
140  collectionLifetime,
141  // on_next
142  [state, st](value_type ct) {
143  state->out.on_next(ct);
144  },
145  // on_error
146  [state](std::exception_ptr e) {
147  state->out.on_error(e);
148  },
149  //on_completed
150  [state](){
151  if (!state->selectedCollections.empty()) {
152  auto value = state->selectedCollections.front();
153  state->selectedCollections.pop_front();
154  state->collectionLifetime.unsubscribe();
155  state->subscribe_to(value);
156  } else if (!state->sourceLifetime.is_subscribed()) {
157  state->out.on_completed();
158  }
159  }
160  );
161  auto selectedSinkInner = on_exception(
162  [&](){return state->coordinator.out(sinkInner);},
163  state->out);
164  if (selectedSinkInner.empty()) {
165  return;
166  }
167  selectedSource->subscribe(std::move(selectedSinkInner.get()));
168  }
169  observable<source_value_type, source_operator_type> source;
170  composite_subscription sourceLifetime;
171  composite_subscription collectionLifetime;
172  std::deque<collection_type> selectedCollections;
173  coordinator_type coordinator;
174  output_type out;
175  };
176 
177  auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
178 
179  // take a copy of the values for each subscription
180  auto state = std::make_shared<concat_state_type>(initial, std::move(coordinator), std::move(scbr));
181 
182  state->sourceLifetime = composite_subscription();
183 
184  // when the out observer is unsubscribed all the
185  // inner subscriptions are unsubscribed as well
186  state->out.add(state->sourceLifetime);
187 
188  auto source = on_exception(
189  [&](){return state->coordinator.in(state->source);},
190  state->out);
191  if (source.empty()) {
192  return;
193  }
194 
195  // this subscribe does not share the observer subscription
196  // so that when it is unsubscribed the observer can be called
197  // until the inner subscriptions have finished
198  auto sink = make_subscriber<collection_type>(
199  state->out,
200  state->sourceLifetime,
201  // on_next
202  [state](collection_type st) {
203  if (state->collectionLifetime.is_subscribed()) {
204  state->selectedCollections.push_back(st);
205  } else if (state->selectedCollections.empty()) {
206  state->subscribe_to(st);
207  }
208  },
209  // on_error
210  [state](std::exception_ptr e) {
211  state->out.on_error(e);
212  },
213  // on_completed
214  [state]() {
215  if (!state->collectionLifetime.is_subscribed() && state->selectedCollections.empty()) {
216  state->out.on_completed();
217  }
218  }
219  );
220  auto selectedSink = on_exception(
221  [&](){return state->coordinator.out(sink);},
222  state->out);
223  if (selectedSink.empty()) {
224  return;
225  }
226  source->subscribe(std::move(selectedSink.get()));
227  }
228 };
229 
230 }
231 
234 template<class... AN>
235 auto concat(AN&&... an)
236  -> operator_factory<concat_tag, AN...> {
237  return operator_factory<concat_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
238 }
239 
240 }
241 
242 template<>
244 {
245  template<class Observable,
246  class Enabled = rxu::enable_if_all_true_type_t<
248  class SourceValue = rxu::value_type_t<Observable>,
249  class Concat = rxo::detail::concat<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
250  class Value = rxu::value_type_t<SourceValue>,
251  class Result = observable<Value, Concat>
252  >
253  static Result member(Observable&& o) {
254  return Result(Concat(std::forward<Observable>(o), identity_current_thread()));
255  }
256 
257  template<class Observable, class Coordination,
258  class Enabled = rxu::enable_if_all_true_type_t<
259  is_observable<Observable>,
261  class SourceValue = rxu::value_type_t<Observable>,
262  class Concat = rxo::detail::concat<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
263  class Value = rxu::value_type_t<SourceValue>,
264  class Result = observable<Value, Concat>
265  >
266  static Result member(Observable&& o, Coordination&& cn) {
267  return Result(Concat(std::forward<Observable>(o), std::forward<Coordination>(cn)));
268  }
269 
270  template<class Observable, class Value0, class... ValueN,
271  class Enabled = rxu::enable_if_all_true_type_t<
272  all_observables<Observable, Value0, ValueN...>>,
273  class EmittedValue = rxu::value_type_t<Observable>,
274  class SourceValue = observable<EmittedValue>,
275  class ObservableObservable = observable<SourceValue>,
277  class Value = rxu::value_type_t<Concat>,
278  class Result = observable<Value, Concat>
279  >
280  static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
281  return Result(Concat(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
282  }
283 
284  template<class Observable, class Coordination, class Value0, class... ValueN,
285  class Enabled = rxu::enable_if_all_true_type_t<
286  all_observables<Observable, Value0, ValueN...>,
287  is_coordination<Coordination>>,
288  class EmittedValue = rxu::value_type_t<Observable>,
289  class SourceValue = observable<EmittedValue>,
290  class ObservableObservable = observable<SourceValue>,
292  class Value = rxu::value_type_t<Concat>,
293  class Result = observable<Value, Concat>
294  >
295  static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
296  return Result(Concat(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
297  }
298 
299  template<class... AN>
300  static operators::detail::concat_invalid_t<AN...> member(AN...) {
301  std::terminate();
302  return {};
303  static_assert(sizeof...(AN) == 10000, "concat takes (optional Coordination, optional Value0, optional ValueN...)");
304  }
305 };
306 
307 }
308 
309 #endif
static operators::detail::concat_invalid_t< AN... > member(AN...)
Definition: rx-concat.hpp:300
Definition: rx-util.hpp:100
static Result member(Observable &&o, Coordination &&cn, Value0 &&v0, ValueN &&...vn)
Definition: rx-concat.hpp:295
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:157
static Result member(Observable &&o)
Definition: rx-concat.hpp:253
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:197
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static const bool value
Definition: rx-predef.hpp:123
linq_driver< iter_cursor< typename util::container_traits< TContainer >::iterator > > from(TContainer &c)
Definition: linq.hpp:556
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
static composite_subscription empty()
Definition: rx-subscription.hpp:404
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
Definition: rx-util.hpp:325
static Result member(Observable &&o, Value0 &&v0, ValueN &&...vn)
Definition: rx-concat.hpp:280
static Result member(Observable &&o, Coordination &&cn)
Definition: rx-concat.hpp:266
auto concat(AN &&...an) -> operator_factory< concat_tag, AN... >
For each item from this observable subscribe to one at a time, in the order received. For each item from all of the given observables deliver from the new observable that is returned.
Definition: rx-concat.hpp:235
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37