RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-merge.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
43 #if !defined(RXCPP_OPERATORS_RX_MERGE_HPP)
44 #define RXCPP_OPERATORS_RX_MERGE_HPP
45 
46 #include "../rx-includes.hpp"
47 
48 namespace rxcpp {
49 
50 namespace operators {
51 
52 namespace detail {
53 
// Empty placeholder type parameterized on an argument pack. It is used below as the
// value type of the observable named by merge_invalid<AN...>::type.
54 template<class... AN>
55 struct merge_invalid_arguments {};
56 
// Operator type for an argument pack AN that merge cannot accept. Its nested `type`
// is an observable whose value type is merge_invalid_arguments<AN...> and whose
// operator type is merge_invalid<AN...> itself.
57 template<class... AN>
58 struct merge_invalid : public rxo::operator_base<merge_invalid_arguments<AN...>> {
59  using type = observable<merge_invalid_arguments<AN...>, merge_invalid<AN...>>;
60 };
// Shorthand for merge_invalid<AN...>::type.
61 template<class... AN>
62 using merge_invalid_t = typename merge_invalid<AN...>::type;
63 
// Operator that flattens a source of observables into one output stream.
//   T            - the type emitted by the source (itself an observable type, per the
//                  commented-out static_asserts below); its value_type is what the
//                  subscriber ultimately receives.
//   Observable   - the source observable type; only its source_operator is stored.
//   Coordination - supplies the coordinator through which the source, every inner
//                  observable and every subscriber are routed.
64 template<class T, class Observable, class Coordination>
65 struct merge
66  : public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
67 {
68  //static_assert(is_observable<Observable>::value, "merge requires an observable");
69  //static_assert(is_observable<T>::value, "merge requires an observable that contains observables");
70 
71  typedef merge<T, Observable, Coordination> this_type;
72 
 // Decayed forms of the template arguments.
73  typedef rxu::decay_t<T> source_value_type;
74  typedef rxu::decay_t<Observable> source_type;
75 
 // source_operator_type: operator behind the source observable.
 // value_type: type emitted by each inner observable and forwarded to the output.
76  typedef typename source_type::source_operator_type source_operator_type;
77  typedef typename source_value_type::value_type value_type;
78 
79  typedef rxu::decay_t<Coordination> coordination_type;
80  typedef typename coordination_type::coordinator_type coordinator_type;
81 
 // Construction-time data: the source operator and the coordination.
 // A copy of this is placed in the per-subscription state inside on_subscribe.
82  struct values
83  {
84  values(source_operator_type o, coordination_type sf)
85  : source_operator(std::move(o))
86  , coordination(std::move(sf))
87  {
88  }
89  source_operator_type source_operator;
90  coordination_type coordination;
91  };
92  values initial;
93 
 // Copies o.source_operator and takes ownership of the coordination sf.
94  merge(const source_type& o, coordination_type sf)
95  : initial(o.source_operator, std::move(sf))
96  {
97  }
98 
 // Wires the subscriber scbr to the source and to every inner observable the source emits.
 // Returns without subscribing when either on_exception call below yields an empty result.
 // NOTE(review): on_exception is given state->out, so presumably the exception is
 // delivered to out.on_error in that case - confirm against rx-observer.hpp.
99  template<class Subscriber>
100  void on_subscribe(Subscriber scbr) const {
101  static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
102 
103  typedef Subscriber output_type;
104 
 // Per-subscription state, shared (via shared_ptr) by all the lambdas created below.
105  struct merge_state_type
106  : public std::enable_shared_from_this<merge_state_type>
107  , public values
108  {
109  merge_state_type(values i, coordinator_type coor, output_type oarg)
110  : values(i)
111  , source(i.source_operator)
112  , pendingCompletions(0)
113  , coordinator(std::move(coor))
114  , out(std::move(oarg))
115  {
116  }
117  observable<source_value_type, source_operator_type> source;
118  // on_completed on the output must wait until all the
119  // subscriptions have received on_completed
 // Plain int, not atomic.
 // NOTE(review): presumably safe because the coordinator serializes the callbacks - confirm.
120  int pendingCompletions;
121  coordinator_type coordinator;
122  output_type out;
123  };
124 
 // The coordinator is created against the subscriber's own subscription.
125  auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
126 
127  // take a copy of the values for each subscription
128  auto state = std::make_shared<merge_state_type>(initial, std::move(coordinator), std::move(scbr));
129 
 // Subscription used for the outer (source) subscriber created below.
130  composite_subscription outercs;
131 
132  // when the out observer is unsubscribed all the
133  // inner subscriptions are unsubscribed as well
134  state->out.add(outercs);
135 
 // Route the source through the coordinator.
136  auto source = on_exception(
137  [&](){return state->coordinator.in(state->source);},
138  state->out);
139  if (source.empty()) {
140  return;
141  }
142 
 // Count the outer source itself as one pending completion.
143  ++state->pendingCompletions;
144  // this subscribe does not share the observer subscription
145  // so that when it is unsubscribed the observer can be called
146  // until the inner subscriptions have finished
147  auto sink = make_subscriber<source_value_type>(
148  state->out,
149  outercs,
150  // on_next
 // Called for each inner observable st emitted by the source.
151  [state](source_value_type st) {
152 
153  composite_subscription innercs;
154 
155  // when the out observer is unsubscribed all the
156  // inner subscriptions are unsubscribed as well
157  auto innercstoken = state->out.add(innercs);
158 
 // Register a subscription on innercs whose action removes innercs' token from out.
 // NOTE(review): presumably this runs when innercs is unsubscribed - confirm.
159  innercs.add(make_subscription([state, innercstoken](){
160  state->out.remove(innercstoken);
161  }));
162 
 // Unlike the two coordinator calls outside this lambda, this one is not wrapped in on_exception.
163  auto selectedSource = state->coordinator.in(st);
164 
 // Each inner observable adds one pending completion.
165  ++state->pendingCompletions;
166  // this subscribe does not share the source subscription
167  // so that when it is unsubscribed the source will continue
168  auto sinkInner = make_subscriber<value_type>(
169  state->out,
170  innercs,
171  // on_next
 // Forwards each inner value to the output. st is captured but not referenced in the body.
 // NOTE(review): presumably captured to keep the inner observable alive - confirm.
172  [state, st](value_type ct) {
173  state->out.on_next(std::move(ct));
174  },
175  // on_error
 // An error from any inner observable is forwarded to the output as-is.
176  [state](std::exception_ptr e) {
177  state->out.on_error(e);
178  },
179  //on_completed
 // The output completes only when the counter reaches zero.
180  [state](){
181  if (--state->pendingCompletions == 0) {
182  state->out.on_completed();
183  }
184  }
185  );
186 
187  auto selectedSinkInner = state->coordinator.out(sinkInner);
188  selectedSource.subscribe(std::move(selectedSinkInner));
189  },
190  // on_error
 // An error from the outer source is forwarded to the output as-is.
191  [state](std::exception_ptr e) {
192  state->out.on_error(e);
193  },
194  // on_completed
 // Outer completion releases the count taken before `sink` was built; the output
 // completes here only if no inner completion is still pending.
195  [state]() {
196  if (--state->pendingCompletions == 0) {
197  state->out.on_completed();
198  }
199  }
200  );
 // Route the outer subscriber through the coordinator.
201  auto selectedSink = on_exception(
202  [&](){return state->coordinator.out(sink);},
203  state->out);
204  if (selectedSink.empty()) {
205  return;
206  }
207  source->subscribe(std::move(selectedSink.get()));
208  }
209 };
210 
211 }
212 
// Factory for the merge operator: forwards the arguments into std::make_tuple and
// wraps the resulting tuple in an operator_factory tagged with merge_tag.
215 template<class... AN>
216 auto merge(AN&&... an)
217  -> operator_factory<merge_tag, AN...> {
218  return operator_factory<merge_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
219 }
220 
221 }
222 
// Explicit specialization holding the static member() overloads that build a merge observable.
// NOTE(review): listing line 224, which should name the struct being specialized, is absent
// from this extract (numbering jumps 223 -> 225) - restore it from the upstream header.
223 template<>
225 {
 // Overload 1: a single source observable. Merge is instantiated with identity_one_worker
 // and constructed with identity_current_thread().
226  template<class Observable,
227  class Enabled = rxu::enable_if_all_true_type_t<
 // NOTE(review): listing line 228 is absent; the enable_if_all_true_type_t< opened above is never closed in this extract.
229  class SourceValue = rxu::value_type_t<Observable>,
230  class Merge = rxo::detail::merge<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
231  class Value = rxu::value_type_t<SourceValue>,
232  class Result = observable<Value, Merge>
233  >
234  static Result member(Observable&& o) {
235  return Result(Merge(std::forward<Observable>(o), identity_current_thread()));
236  }
237 
 // Overload 2: a single source observable plus a caller-supplied coordination cn.
238  template<class Observable, class Coordination,
239  class Enabled = rxu::enable_if_all_true_type_t<
240  is_observable<Observable>,
 // NOTE(review): listing line 241 is absent; the condition list opened above is never closed in this extract.
242  class SourceValue = rxu::value_type_t<Observable>,
243  class Merge = rxo::detail::merge<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
244  class Value = rxu::value_type_t<SourceValue>,
245  class Result = observable<Value, Merge>
246  >
247  static Result member(Observable&& o, Coordination&& cn) {
248  return Result(Merge(std::forward<Observable>(o), std::forward<Coordination>(cn)));
249  }
250 
 // Overload 3: two or more observables. Each is converted with as_dynamic(), the results are
 // packed into one source with rxs::from, and that source is merged with identity_current_thread().
251  template<class Observable, class Value0, class... ValueN,
252  class Enabled = rxu::enable_if_all_true_type_t<
253  all_observables<Observable, Value0, ValueN...>>,
254  class EmittedValue = rxu::value_type_t<Observable>,
255  class SourceValue = observable<EmittedValue>,
256  class ObservableObservable = observable<SourceValue>,
 // NOTE(review): listing line 257 is absent; Merge, used on the next line and in the body, is not declared in this extract.
258  class Value = rxu::value_type_t<Merge>,
259  class Result = observable<Value, Merge>
260  >
261  static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
262  return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
263  }
264 
 // Overload 4: same as overload 3, but merged on the caller-supplied coordination cn.
265  template<class Observable, class Coordination, class Value0, class... ValueN,
266  class Enabled = rxu::enable_if_all_true_type_t<
267  all_observables<Observable, Value0, ValueN...>,
268  is_coordination<Coordination>>,
269  class EmittedValue = rxu::value_type_t<Observable>,
270  class SourceValue = observable<EmittedValue>,
271  class ObservableObservable = observable<SourceValue>,
 // NOTE(review): listing line 272 is absent; Merge, used on the next line and in the body, is not declared in this extract.
273  class Value = rxu::value_type_t<Merge>,
274  class Result = observable<Value, Merge>
275  >
276  static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
277  return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
278  }
279 
 // Catch-all overload taking any argument pack by value. Its return type is merge_invalid_t<AN...>.
 // The static_assert condition depends on AN, so it is checked when this overload is
 // instantiated; the message lists the accepted argument forms.
280  template<class... AN>
281  static operators::detail::merge_invalid_t<AN...> member(AN...) {
282  std::terminate();
283  return {};
284  static_assert(sizeof...(AN) == 10000, "merge takes (optional Coordination, optional Value0, optional ValueN...)");
285  }
286 };
287 
288 }
289 
290 #endif
Definition: rx-util.hpp:100
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:197
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
Definition: rx-operators.hpp:255
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static const bool value
Definition: rx-predef.hpp:123
linq_driver< iter_cursor< typename util::container_traits< TContainer >::iterator > > from(TContainer &c)
Definition: linq.hpp:556
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
Definition: rx-util.hpp:325
static Result member(Observable &&o)
Definition: rx-merge.hpp:234
auto merge(AN &&...an) -> operator_factory< merge_tag, AN... >
For each given observable subscribe. For each item emitted from all of the given observables, deliver from the new observable that is returned.
Definition: rx-merge.hpp:216
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
static Result member(Observable &&o, Coordination &&cn, Value0 &&v0, ValueN &&...vn)
Definition: rx-merge.hpp:276
static operators::detail::merge_invalid_t< AN... > member(AN...)
Definition: rx-merge.hpp:281
static Result member(Observable &&o, Value0 &&v0, ValueN &&...vn)
Definition: rx-merge.hpp:261
static Result member(Observable &&o, Coordination &&cn)
Definition: rx-merge.hpp:247
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37