RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-zip.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_OPERATORS_RX_ZIP_HPP)
6 #define RXCPP_OPERATORS_RX_ZIP_HPP
7 
8 #include "../rx-includes.hpp"
9 
43 namespace rxcpp {
44 
45 namespace operators {
46 
47 namespace detail {
48 
49 template<class Observable>
50 struct zip_source_state
51 {
52  using value_type = rxu::value_type_t<Observable>;
53  zip_source_state()
54  : completed(false)
55  {
56  }
57  std::list<value_type> values;
58  bool completed;
59 };
60 
61 struct values_not_empty {
62  template<class Observable>
63  bool operator()(zip_source_state<Observable>& source) const {
64  return !source.values.empty();
65  }
66 };
67 
68 struct source_completed_values_empty {
69  template<class Observable>
70  bool operator()(zip_source_state<Observable>& source) const {
71  return source.completed && source.values.empty();
72  }
73 };
74 
75 struct extract_value_front {
76  template<class Observable, class Value = rxu::value_type_t<Observable>>
77  Value operator()(zip_source_state<Observable>& source) const {
78  auto val = std::move(source.values.front());
79  source.values.pop_front();
80  return val;
81  }
82 };
83 
84 template<class... AN>
85 struct zip_invalid_arguments {};
86 
87 template<class... AN>
88 struct zip_invalid : public rxo::operator_base<zip_invalid_arguments<AN...>> {
89  using type = observable<zip_invalid_arguments<AN...>, zip_invalid<AN...>>;
90 };
91 template<class... AN>
92 using zip_invalid_t = typename zip_invalid<AN...>::type;
93 
94 template<class Selector, class... ObservableN>
95 struct is_zip_selector_check {
96  typedef rxu::decay_t<Selector> selector_type;
97 
98  struct tag_not_valid;
99  template<class CS, class... CON>
100  static auto check(int) -> decltype((*(CS*)nullptr)((*(typename CON::value_type*)nullptr)...));
101  template<class CS, class... CON>
102  static tag_not_valid check(...);
103 
104  using type = decltype(check<selector_type, rxu::decay_t<ObservableN>...>(0));
105 
106  static const bool value = !std::is_same<type, tag_not_valid>::value;
107 };
108 
// Result used by is_zip_selector when the selector cannot be called with the
// observables' values. It exposes `value == false` and deliberately has no
// nested `type`, so any use of is_zip_selector<...>::type fails to
// substitute for an invalid selector.
template<class Selector, class... ObservableN>
struct invalid_zip_selector
{
    static const bool value = false;
};
113 
114 template<class Selector, class... ObservableN>
115 struct is_zip_selector : public std::conditional<
116  is_zip_selector_check<Selector, ObservableN...>::value,
117  is_zip_selector_check<Selector, ObservableN...>,
118  invalid_zip_selector<Selector, ObservableN...>>::type {
119 };
120 
121 template<class Selector, class... ON>
122 using result_zip_selector_t = typename is_zip_selector<Selector, ON...>::type;
123 
124 template<class Coordination, class Selector, class... ObservableN>
125 struct zip_traits {
126  typedef std::tuple<rxu::decay_t<ObservableN>...> tuple_source_type;
127  typedef std::tuple<zip_source_state<ObservableN>...> tuple_source_values_type;
128 
129  typedef rxu::decay_t<Selector> selector_type;
130  typedef rxu::decay_t<Coordination> coordination_type;
131 
132  typedef typename is_zip_selector<selector_type, ObservableN...>::type value_type;
133 };
134 
135 template<class Coordination, class Selector, class... ObservableN>
136 struct zip : public operator_base<rxu::value_type_t<zip_traits<Coordination, Selector, ObservableN...>>>
137 {
138  typedef zip<Coordination, Selector, ObservableN...> this_type;
139 
140  typedef zip_traits<Coordination, Selector, ObservableN...> traits;
141 
142  typedef typename traits::tuple_source_type tuple_source_type;
143  typedef typename traits::tuple_source_values_type tuple_source_values_type;
144 
145  typedef typename traits::selector_type selector_type;
146 
147  typedef typename traits::coordination_type coordination_type;
148  typedef typename coordination_type::coordinator_type coordinator_type;
149 
150  struct values
151  {
152  values(tuple_source_type o, selector_type s, coordination_type sf)
153  : source(std::move(o))
154  , selector(std::move(s))
155  , coordination(std::move(sf))
156  {
157  }
158  tuple_source_type source;
159  selector_type selector;
160  coordination_type coordination;
161  };
162  values initial;
163 
164  zip(coordination_type sf, selector_type s, tuple_source_type ts)
165  : initial(std::move(ts), std::move(s), std::move(sf))
166  {
167  }
168 
169  template<int Index, class State>
170  void subscribe_one(std::shared_ptr<State> state) const {
171 
172  typedef typename std::tuple_element<Index, tuple_source_type>::type::value_type source_value_type;
173 
174  composite_subscription innercs;
175 
176  // when the out observer is unsubscribed all the
177  // inner subscriptions are unsubscribed as well
178  state->out.add(innercs);
179 
180  auto source = on_exception(
181  [&](){return state->coordinator.in(std::get<Index>(state->source));},
182  state->out);
183  if (source.empty()) {
184  return;
185  }
186 
187  // this subscribe does not share the observer subscription
188  // so that when it is unsubscribed the observer can be called
189  // until the inner subscriptions have finished
190  auto sink = make_subscriber<source_value_type>(
191  state->out,
192  innercs,
193  // on_next
194  [state](source_value_type st) {
195  auto& values = std::get<Index>(state->pending).values;
196  values.push_back(st);
197  if (rxu::apply_to_each(state->pending, values_not_empty(), rxu::all_values_true())) {
198  auto selectedResult = rxu::apply_to_each(state->pending, extract_value_front(), state->selector);
199  state->out.on_next(selectedResult);
200  }
201  if (rxu::apply_to_each(state->pending, source_completed_values_empty(), rxu::any_value_true())) {
202  state->out.on_completed();
203  }
204  },
205  // on_error
206  [state](std::exception_ptr e) {
207  state->out.on_error(e);
208  },
209  // on_completed
210  [state]() {
211  auto& completed = std::get<Index>(state->pending).completed;
212  completed = true;
213  if (--state->pendingCompletions == 0) {
214  state->out.on_completed();
215  }
216  }
217  );
218  auto selectedSink = on_exception(
219  [&](){return state->coordinator.out(sink);},
220  state->out);
221  if (selectedSink.empty()) {
222  return;
223  }
224  source->subscribe(std::move(selectedSink.get()));
225  }
226  template<class State, int... IndexN>
227  void subscribe_all(std::shared_ptr<State> state, rxu::values<int, IndexN...>) const {
228  bool subscribed[] = {(subscribe_one<IndexN>(state), true)...};
229  subscribed[0] = (*subscribed); // silence warning
230  }
231 
232  template<class Subscriber>
233  void on_subscribe(Subscriber scbr) const {
234  static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
235 
236  typedef Subscriber output_type;
237 
238  struct zip_state_type
239  : public std::enable_shared_from_this<zip_state_type>
240  , public values
241  {
242  zip_state_type(values i, coordinator_type coor, output_type oarg)
243  : values(std::move(i))
244  , pendingCompletions(sizeof... (ObservableN))
245  , valuesSet(0)
246  , coordinator(std::move(coor))
247  , out(std::move(oarg))
248  {
249  }
250 
251  // on_completed on the output must wait until all the
252  // subscriptions have received on_completed
253  mutable int pendingCompletions;
254  mutable int valuesSet;
255  mutable tuple_source_values_type pending;
256  coordinator_type coordinator;
257  output_type out;
258  };
259 
260  auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
261 
262  // take a copy of the values for each subscription
263  auto state = std::make_shared<zip_state_type>(initial, std::move(coordinator), std::move(scbr));
264 
265  subscribe_all(state, typename rxu::values_from<int, sizeof...(ObservableN)>::type());
266  }
267 };
268 
269 }
270 
273 template<class... AN>
274 auto zip(AN&&... an)
275  -> operator_factory<zip_tag, AN...> {
276  return operator_factory<zip_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
277 }
278 
279 }
280 
281 template<>
283 {
284  template<class Observable, class... ObservableN,
285  class Enabled = rxu::enable_if_all_true_type_t<
286  all_observables<Observable, ObservableN...>>,
287  class Zip = rxo::detail::zip<identity_one_worker, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
288  class Value = rxu::value_type_t<Zip>,
289  class Result = observable<Value, Zip>>
290  static Result member(Observable&& o, ObservableN&&... on)
291  {
292  return Result(Zip(identity_current_thread(), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
293  }
294 
295  template<class Observable, class Selector, class... ObservableN,
296  class Enabled = rxu::enable_if_all_true_type_t<
297  operators::detail::is_zip_selector<Selector, Observable, ObservableN...>,
298  all_observables<Observable, ObservableN...>>,
299  class ResolvedSelector = rxu::decay_t<Selector>,
300  class Zip = rxo::detail::zip<identity_one_worker, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
301  class Value = rxu::value_type_t<Zip>,
302  class Result = observable<Value, Zip>>
303  static Result member(Observable&& o, Selector&& s, ObservableN&&... on)
304  {
305  return Result(Zip(identity_current_thread(), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
306  }
307 
308  template<class Coordination, class Observable, class... ObservableN,
309  class Enabled = rxu::enable_if_all_true_type_t<
311  all_observables<Observable, ObservableN...>>,
312  class Zip = rxo::detail::zip<Coordination, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
313  class Value = rxu::value_type_t<Zip>,
314  class Result = observable<Value, Zip>>
315  static Result member(Observable&& o, Coordination&& cn, ObservableN&&... on)
316  {
317  return Result(Zip(std::forward<Coordination>(cn), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
318  }
319 
320  template<class Coordination, class Selector, class Observable, class... ObservableN,
321  class Enabled = rxu::enable_if_all_true_type_t<
322  is_coordination<Coordination>,
323  operators::detail::is_zip_selector<Selector, Observable, ObservableN...>,
324  all_observables<Observable, ObservableN...>>,
325  class ResolvedSelector = rxu::decay_t<Selector>,
326  class Zip = rxo::detail::zip<Coordination, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
327  class Value = rxu::value_type_t<Zip>,
328  class Result = observable<Value, Zip>>
329  static Result member(Observable&& o, Coordination&& cn, Selector&& s, ObservableN&&... on)
330  {
331  return Result(Zip(std::forward<Coordination>(cn), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
332  }
333 
334  template<class... AN>
335  static operators::detail::zip_invalid_t<AN...> member(const AN&...) {
336  std::terminate();
337  return {};
338  static_assert(sizeof...(AN) == 10000, "zip takes (optional Coordination, optional Selector, required Observable, optional Observable...), Selector takes (Observable::value_type...)");
339  }
340 };
341 
342 }
343 
344 #endif
auto zip(AN &&...an) -> operator_factory< zip_tag, AN... >
Takes one item from each of the given observables and selects a value to emit from the new observable that ...
Definition: rx-zip.hpp:274
Definition: rx-util.hpp:100
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
Definition: rx-operators.hpp:508
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
auto pack() -> detail::pack
Definition: rx-util.hpp:273
Definition: rx-operators.hpp:47
static Result member(Observable &&o, Coordination &&cn, Selector &&s, ObservableN &&...on)
Definition: rx-zip.hpp:329
static const bool value
Definition: rx-predef.hpp:123
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
A source of values. Subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
static Result member(Observable &&o, Selector &&s, ObservableN &&...on)
Definition: rx-zip.hpp:303
static Result member(Observable &&o, Coordination &&cn, ObservableN &&...on)
Definition: rx-zip.hpp:315
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
static Result member(Observable &&o, ObservableN &&...on)
Definition: rx-zip.hpp:290
static operators::detail::zip_invalid_t< AN... > member(const AN &...)
Definition: rx-zip.hpp:335
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-coordination.hpp:37