RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-amb.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
39 #if !defined(RXCPP_OPERATORS_RX_AMB_HPP)
40 #define RXCPP_OPERATORS_RX_AMB_HPP
41 
42 #include "../rx-includes.hpp"
43 
44 namespace rxcpp {
45 
46 namespace operators {
47 
48 namespace detail {
49 
// Empty tag type parameterised on an argument pack. It has no members; it is
// only used below as the value type of amb_invalid.
50 template<class... AN>
51 struct amb_invalid_arguments {};
52 
// Placeholder operator for argument packs that amb cannot accept.
// It derives from operator_base over the amb_invalid_arguments tag and exposes
// `type`, an observable of that tag whose source operator is amb_invalid itself.
53 template<class... AN>
54 struct amb_invalid : public rxo::operator_base<amb_invalid_arguments<AN...>> {
55  using type = observable<amb_invalid_arguments<AN...>, amb_invalid<AN...>>;
56 };
// Shorthand for amb_invalid<AN...>::type; this is the return type of the
// catch-all `member(AN...)` overload further down in this file.
57 template<class... AN>
58 using amb_invalid_t = typename amb_invalid<AN...>::type;
59 
// Implementation of the amb operator.
//
// `Observable` is the source; each item it emits (type `T`, decayed to
// source_value_type) is itself subscribed to as an observable of value_type.
// Every inner observable is subscribed as it arrives. The first inner
// observable to deliver an on_next unsubscribes all the other inner
// subscriptions, and from then on newly arriving inner observables are ignored.
// An on_error or on_completed from any inner observable is forwarded to the
// output unconditionally.
60 template<class T, class Observable, class Coordination>
61 struct amb
62  : public operator_base<rxu::value_type_t<T>>
63 {
64  //static_assert(is_observable<Observable>::value, "amb requires an observable");
65  //static_assert(is_observable<T>::value, "amb requires an observable that contains observables");
66 
67  typedef amb<T, Observable, Coordination> this_type;
68 
69  typedef rxu::decay_t<T> source_value_type;
70  typedef rxu::decay_t<Observable> source_type;
71 
72  typedef typename source_type::source_operator_type source_operator_type;
73  typedef typename source_value_type::value_type value_type;
74 
75  typedef rxu::decay_t<Coordination> coordination_type;
76  typedef typename coordination_type::coordinator_type coordinator_type;
77 
 // Construction-time data: the source's operator and the coordination.
 // on_subscribe copies it into the per-subscription state.
78  struct values
79  {
80  values(source_operator_type o, coordination_type sf)
81  : source_operator(std::move(o))
82  , coordination(std::move(sf))
83  {
84  }
85  source_operator_type source_operator;
86  coordination_type coordination;
87  };
88  values initial;
89 
 // Keeps only the source's operator (o.source_operator) plus the coordination.
90  amb(const source_type& o, coordination_type sf)
91  : initial(o.source_operator, std::move(sf))
92  {
93  }
94 
 // Called once per subscriber: builds a fresh shared state, subscribes to the
 // source, and subscribes to each inner observable from the source's on_next.
95  template<class Subscriber>
96  void on_subscribe(Subscriber scbr) const {
97  static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
98 
99  typedef Subscriber output_type;
100 
 // Per-subscription state, shared (via shared_ptr) by all callbacks below.
 // NOTE(review): the fields are a plain int/bool/vector with no locking in
 // this block; presumably the coordinator serialises the callbacks - confirm.
101  struct amb_state_type
102  : public std::enable_shared_from_this<amb_state_type>
103  , public values
104  {
105  amb_state_type(values i, coordinator_type coor, output_type oarg)
106  : values(i)
107  , source(i.source_operator)
108  , coordinator(std::move(coor))
109  , out(std::move(oarg))
110  , pendingObservables(0)
111  , firstEmitted(false)
112  {
113  }
114  observable<source_value_type, source_operator_type> source;
115  coordinator_type coordinator;
116  output_type out;
 // Number of inner observables subscribed so far. It is only ever
 // incremented in this block and also supplies each inner's index.
117  int pendingObservables;
 // Set to true by the first inner on_next.
118  bool firstEmitted;
 // One entry per inner observable, in arrival order.
119  std::vector<composite_subscription> innerSubscriptions;
120  };
121 
122  auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
123 
124  // take a copy of the values for each subscription
125  auto state = std::make_shared<amb_state_type>(initial, std::move(coordinator), std::move(scbr));
126 
127  composite_subscription outercs;
128 
129  // when the out observer is unsubscribed all the
130  // inner subscriptions are unsubscribed as well
131  state->out.add(outercs);
132 
 // on_exception is handed state->out as its error target; an empty result
 // aborts the subscription here.
 // NOTE(review): presumably an empty result means the callable threw and the
 // error was already delivered to state->out - confirm in rx-observer.hpp.
133  auto source = on_exception(
134  [&](){return state->coordinator.in(state->source);},
135  state->out);
136  if (source.empty()) {
137  return;
138  }
139 
140  // this subscribe does not share the observer subscription
141  // so that when it is unsubscribed the observer can be called
142  // until the inner subscriptions have finished
143  auto sink = make_subscriber<source_value_type>(
144  state->out,
145  outercs,
146  // on_next
147  [state](source_value_type st) {
148 
 // A winner has already been chosen: ignore late inner observables.
149  if (state->firstEmitted)
150  return;
151 
152  composite_subscription innercs;
153 
154  state->innerSubscriptions.push_back(innercs);
155 
156  // when the out observer is unsubscribed all the
157  // inner subscriptions are unsubscribed as well
158  auto innercstoken = state->out.add(innercs);
159 
 // When this inner subscription ends, detach it from the output's
 // subscription again.
160  innercs.add(make_subscription([state, innercstoken](){
161  state->out.remove(innercstoken);
162  }));
163 
164  auto selectedSource = state->coordinator.in(st);
165 
 // innerSubscriptions receives exactly one push_back per increment of
 // pendingObservables, so current_id is the index of innercs in that vector.
166  auto current_id = state->pendingObservables++;
167 
168  // this subscribe does not share the source subscription
169  // so that when it is unsubscribed the source will continue
170  auto sinkInner = make_subscriber<value_type>(
171  state->out,
172  innercs,
173  // on_next
174  [state, st, current_id](value_type ct) {
175  state->out.on_next(std::move(ct));
 // First value from any inner observable: mark the winner and
 // unsubscribe every other inner subscription, i.e. the ranges
 // before and after index current_id.
176  if (!state->firstEmitted) {
177  state->firstEmitted = true;
178  auto do_unsubscribe = [](composite_subscription cs) {
179  cs.unsubscribe();
180  };
181  std::for_each(state->innerSubscriptions.begin(), state->innerSubscriptions.begin() + current_id, do_unsubscribe);
182  std::for_each(state->innerSubscriptions.begin() + current_id + 1, state->innerSubscriptions.end(), do_unsubscribe);
183  }
184  },
185  // on_error
186  [state](std::exception_ptr e) {
187  state->out.on_error(e);
188  },
189  //on_completed
190  [state](){
191  state->out.on_completed();
192  }
193  );
194 
195  auto selectedSinkInner = state->coordinator.out(sinkInner);
196  selectedSource.subscribe(std::move(selectedSinkInner));
197  },
198  // on_error
199  [state](std::exception_ptr e) {
200  state->out.on_error(e);
201  },
202  // on_completed
 // The source finishing completes the output only when no inner observable
 // was ever subscribed; otherwise completion comes from the inner handlers.
203  [state]() {
204  if (state->pendingObservables == 0) {
205  state->out.on_completed();
206  }
207  }
208  );
 // Same guarded pattern as for `source` above: bail out on an empty result.
209  auto selectedSink = on_exception(
210  [&](){return state->coordinator.out(sink);},
211  state->out);
212  if (selectedSink.empty()) {
213  return;
214  }
215  source->subscribe(std::move(selectedSink.get()));
216  }
217 };
218 
219 }
220 
// Factory for the amb operator: perfectly forwards the arguments into a tuple
// and wraps it in an operator_factory tagged with amb_tag. The argument types
// are not checked here.
223 template<class... AN>
224 auto amb(AN&&... an)
225  -> operator_factory<amb_tag, AN...> {
226  return operator_factory<amb_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
227 }
228 
229 }
230 
// Explicit specialization providing the `member` overloads for amb.
// NOTE(review): listing line 232 is absent from this extract, so the
// specialization's own `struct ...` header is not visible here. Listing lines
// 236, 249, 265 and 280 are absent as well; each gap is flagged below. Restore
// them from the original header before compiling.
231 template<>
233 {
 // amb() on a source that emits observables, using identity_current_thread().
 // NOTE(review): listing line 236 (the enable_if condition list and its
 // closing `>`) is absent after the next line.
234  template<class Observable,
235  class Enabled = rxu::enable_if_all_true_type_t<
237  class SourceValue = rxu::value_type_t<Observable>,
238  class Amb = rxo::detail::amb<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
239  class Value = rxu::value_type_t<SourceValue>,
240  class Result = observable<Value, Amb>
241  >
242  static Result member(Observable&& o) {
243  return Result(Amb(std::forward<Observable>(o), identity_current_thread()));
244  }
245 
 // amb(coordination) on a source that emits observables.
 // NOTE(review): listing line 249 (the rest of the enable_if condition list)
 // is absent after `is_observable<Observable>,`.
246  template<class Observable, class Coordination,
247  class Enabled = rxu::enable_if_all_true_type_t<
248  is_observable<Observable>,
250  class SourceValue = rxu::value_type_t<Observable>,
251  class Amb = rxo::detail::amb<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
252  class Value = rxu::value_type_t<SourceValue>,
253  class Result = observable<Value, Amb>
254  >
255  static Result member(Observable&& o, Coordination&& cn) {
256  return Result(Amb(std::forward<Observable>(o), std::forward<Coordination>(cn)));
257  }
258 
 // amb(v0, vn...): the source and the extra observables are each converted
 // with as_dynamic() and combined via rxs::from into one observable of
 // observables, which is then raced using identity_current_thread().
 // NOTE(review): listing line 265, which must declare `Amb`, is absent.
259  template<class Observable, class Value0, class... ValueN,
260  class Enabled = rxu::enable_if_all_true_type_t<
261  all_observables<Observable, Value0, ValueN...>>,
262  class EmittedValue = rxu::value_type_t<Observable>,
263  class SourceValue = observable<EmittedValue>,
264  class ObservableObservable = observable<SourceValue>,
266  class Value = rxu::value_type_t<Amb>,
267  class Result = observable<Value, Amb>
268  >
269  static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
270  return Result(Amb(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
271  }
272 
 // amb(coordination, v0, vn...): as above, but with the caller's coordination.
 // NOTE(review): listing line 280, which must declare `Amb`, is absent.
273  template<class Observable, class Coordination, class Value0, class... ValueN,
274  class Enabled = rxu::enable_if_all_true_type_t<
275  all_observables<Observable, Value0, ValueN...>,
276  is_coordination<Coordination>>,
277  class EmittedValue = rxu::value_type_t<Observable>,
278  class SourceValue = observable<EmittedValue>,
279  class ObservableObservable = observable<SourceValue>,
281  class Value = rxu::value_type_t<Amb>,
282  class Result = observable<Value, Amb>
283  >
284  static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
285  return Result(Amb(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
286  }
287 
 // Catch-all overload. Its static_assert compares the pack size with 10000,
 // so instantiating it with any realistic argument list fails to compile and
 // reports the accepted argument forms. The terminate/return statements ahead
 // of the assert only give the body a well-formed shape.
288  template<class... AN>
289  static operators::detail::amb_invalid_t<AN...> member(AN...) {
290  std::terminate();
291  return {};
292  static_assert(sizeof...(AN) == 10000, "amb takes (optional Coordination, optional Value0, optional ValueN...)");
293  }
294 };
295 
296 }
297 
298 #endif
auto amb(AN &&...an) -> operator_factory< amb_tag, AN... >
For each item from only the first of the given observables deliver from the new observable that is returned.
Definition: rx-amb.hpp:224
static Result member(Observable &&o, Coordination &&cn, Value0 &&v0, ValueN &&...vn)
Definition: rx-amb.hpp:284
Definition: rx-util.hpp:100
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
static Result member(Observable &&o, Value0 &&v0, ValueN &&...vn)
Definition: rx-amb.hpp:269
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:197
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
static operators::detail::amb_invalid_t< AN... > member(AN...)
Definition: rx-amb.hpp:289
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static const bool value
Definition: rx-predef.hpp:123
linq_driver< iter_cursor< typename util::container_traits< TContainer >::iterator > > from(TContainer &c)
Definition: linq.hpp:556
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
Definition: rx-util.hpp:325
Definition: rx-operators.hpp:103
static Result member(Observable &&o, Coordination &&cn)
Definition: rx-amb.hpp:255
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
static Result member(Observable &&o)
Definition: rx-amb.hpp:242
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37