RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-combine_latest.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
38 #if !defined(RXCPP_OPERATORS_RX_COMBINE_LATEST_HPP)
39 #define RXCPP_OPERATORS_RX_COMBINE_LATEST_HPP
40 
41 #include "../rx-includes.hpp"
42 
43 namespace rxcpp {
44 
45 namespace operators {
46 
47 namespace detail {
48 
49 template<class... AN>
50 struct combine_latest_invalid_arguments {};
51 
52 template<class... AN>
53 struct combine_latest_invalid : public rxo::operator_base<combine_latest_invalid_arguments<AN...>> {
54  using type = observable<combine_latest_invalid_arguments<AN...>, combine_latest_invalid<AN...>>;
55 };
56 template<class... AN>
57 using combine_latest_invalid_t = typename combine_latest_invalid<AN...>::type;
58 
59 template<class Selector, class... ObservableN>
60 struct is_combine_latest_selector_check {
61  typedef rxu::decay_t<Selector> selector_type;
62 
63  struct tag_not_valid;
64  template<class CS, class... CON>
65  static auto check(int) -> decltype((*(CS*)nullptr)((*(typename CON::value_type*)nullptr)...));
66  template<class CS, class... CON>
67  static tag_not_valid check(...);
68 
69  using type = decltype(check<selector_type, rxu::decay_t<ObservableN>...>(0));
70 
71  static const bool value = !std::is_same<type, tag_not_valid>::value;
72 };
73 
74 template<class Selector, class... ObservableN>
75 struct invalid_combine_latest_selector {
76  static const bool value = false;
77 };
78 
79 template<class Selector, class... ObservableN>
80 struct is_combine_latest_selector : public std::conditional<
81  is_combine_latest_selector_check<Selector, ObservableN...>::value,
82  is_combine_latest_selector_check<Selector, ObservableN...>,
83  invalid_combine_latest_selector<Selector, ObservableN...>>::type {
84 };
85 
86 template<class Selector, class... ON>
87 using result_combine_latest_selector_t = typename is_combine_latest_selector<Selector, ON...>::type;
88 
89 template<class Coordination, class Selector, class... ObservableN>
90 struct combine_latest_traits {
91 
92  typedef std::tuple<ObservableN...> tuple_source_type;
93  typedef std::tuple<rxu::detail::maybe<typename ObservableN::value_type>...> tuple_source_value_type;
94 
95  typedef rxu::decay_t<Selector> selector_type;
96  typedef rxu::decay_t<Coordination> coordination_type;
97 
98  typedef typename is_combine_latest_selector<selector_type, ObservableN...>::type value_type;
99 };
100 
101 template<class Coordination, class Selector, class... ObservableN>
102 struct combine_latest : public operator_base<rxu::value_type_t<combine_latest_traits<Coordination, Selector, ObservableN...>>>
103 {
104  typedef combine_latest<Coordination, Selector, ObservableN...> this_type;
105 
106  typedef combine_latest_traits<Coordination, Selector, ObservableN...> traits;
107 
108  typedef typename traits::tuple_source_type tuple_source_type;
109  typedef typename traits::tuple_source_value_type tuple_source_value_type;
110 
111  typedef typename traits::selector_type selector_type;
112 
113  typedef typename traits::coordination_type coordination_type;
114  typedef typename coordination_type::coordinator_type coordinator_type;
115 
116  struct values
117  {
118  values(tuple_source_type o, selector_type s, coordination_type sf)
119  : source(std::move(o))
120  , selector(std::move(s))
121  , coordination(std::move(sf))
122  {
123  }
124  tuple_source_type source;
125  selector_type selector;
126  coordination_type coordination;
127  };
128  values initial;
129 
130  combine_latest(coordination_type sf, selector_type s, tuple_source_type ts)
131  : initial(std::move(ts), std::move(s), std::move(sf))
132  {
133  }
134 
135  template<int Index, class State>
136  void subscribe_one(std::shared_ptr<State> state) const {
137 
138  typedef typename std::tuple_element<Index, tuple_source_type>::type::value_type source_value_type;
139 
140  composite_subscription innercs;
141 
142  // when the out observer is unsubscribed all the
143  // inner subscriptions are unsubscribed as well
144  state->out.add(innercs);
145 
146  auto source = on_exception(
147  [&](){return state->coordinator.in(std::get<Index>(state->source));},
148  state->out);
149  if (source.empty()) {
150  return;
151  }
152 
153  // this subscribe does not share the observer subscription
154  // so that when it is unsubscribed the observer can be called
155  // until the inner subscriptions have finished
156  auto sink = make_subscriber<source_value_type>(
157  state->out,
158  innercs,
159  // on_next
160  [state](source_value_type st) {
161  auto& value = std::get<Index>(state->latest);
162 
163  if (value.empty()) {
164  ++state->valuesSet;
165  }
166 
167  value.reset(st);
168 
169  if (state->valuesSet == sizeof... (ObservableN)) {
170  auto values = rxu::surely(state->latest);
171  auto selectedResult = rxu::apply(values, state->selector);
172  state->out.on_next(selectedResult);
173  }
174  },
175  // on_error
176  [state](std::exception_ptr e) {
177  state->out.on_error(e);
178  },
179  // on_completed
180  [state]() {
181  if (--state->pendingCompletions == 0) {
182  state->out.on_completed();
183  }
184  }
185  );
186  auto selectedSink = on_exception(
187  [&](){return state->coordinator.out(sink);},
188  state->out);
189  if (selectedSink.empty()) {
190  return;
191  }
192  source->subscribe(std::move(selectedSink.get()));
193  }
194  template<class State, int... IndexN>
195  void subscribe_all(std::shared_ptr<State> state, rxu::values<int, IndexN...>) const {
196  bool subscribed[] = {(subscribe_one<IndexN>(state), true)...};
197  subscribed[0] = (*subscribed); // silence warning
198  }
199 
200  template<class Subscriber>
201  void on_subscribe(Subscriber scbr) const {
202  static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
203 
204  typedef Subscriber output_type;
205 
206  struct combine_latest_state_type
207  : public std::enable_shared_from_this<combine_latest_state_type>
208  , public values
209  {
210  combine_latest_state_type(values i, coordinator_type coor, output_type oarg)
211  : values(std::move(i))
212  , pendingCompletions(sizeof... (ObservableN))
213  , valuesSet(0)
214  , coordinator(std::move(coor))
215  , out(std::move(oarg))
216  {
217  }
218 
219  // on_completed on the output must wait until all the
220  // subscriptions have received on_completed
221  mutable int pendingCompletions;
222  mutable int valuesSet;
223  mutable tuple_source_value_type latest;
224  coordinator_type coordinator;
225  output_type out;
226  };
227 
228  auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
229 
230  // take a copy of the values for each subscription
231  auto state = std::make_shared<combine_latest_state_type>(initial, std::move(coordinator), std::move(scbr));
232 
233  subscribe_all(state, typename rxu::values_from<int, sizeof...(ObservableN)>::type());
234  }
235 };
236 
237 }
238 
241 template<class... AN>
242 auto combine_latest(AN&&... an)
244  return operator_factory<combine_latest_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
245 }
246 
247 }
248 
249 template<>
251 {
252  template<class Observable, class... ObservableN,
253  class Enabled = rxu::enable_if_all_true_type_t<
254  all_observables<Observable, ObservableN...>>,
255  class combine_latest = rxo::detail::combine_latest<identity_one_worker, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
256  class Value = rxu::value_type_t<combine_latest>,
257  class Result = observable<Value, combine_latest>>
258  static Result member(Observable&& o, ObservableN&&... on)
259  {
260  return Result(combine_latest(identity_current_thread(), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
261  }
262 
263  template<class Observable, class Selector, class... ObservableN,
264  class Enabled = rxu::enable_if_all_true_type_t<
265  operators::detail::is_combine_latest_selector<Selector, Observable, ObservableN...>,
266  all_observables<Observable, ObservableN...>>,
267  class ResolvedSelector = rxu::decay_t<Selector>,
268  class combine_latest = rxo::detail::combine_latest<identity_one_worker, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
269  class Value = rxu::value_type_t<combine_latest>,
270  class Result = observable<Value, combine_latest>>
271  static Result member(Observable&& o, Selector&& s, ObservableN&&... on)
272  {
273  return Result(combine_latest(identity_current_thread(), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
274  }
275 
276  template<class Coordination, class Observable, class... ObservableN,
277  class Enabled = rxu::enable_if_all_true_type_t<
279  all_observables<Observable, ObservableN...>>,
280  class combine_latest = rxo::detail::combine_latest<Coordination, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
281  class Value = rxu::value_type_t<combine_latest>,
282  class Result = observable<Value, combine_latest>>
283  static Result member(Observable&& o, Coordination&& cn, ObservableN&&... on)
284  {
285  return Result(combine_latest(std::forward<Coordination>(cn), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
286  }
287 
288  template<class Coordination, class Selector, class Observable, class... ObservableN,
289  class Enabled = rxu::enable_if_all_true_type_t<
290  is_coordination<Coordination>,
291  operators::detail::is_combine_latest_selector<Selector, Observable, ObservableN...>,
292  all_observables<Observable, ObservableN...>>,
293  class ResolvedSelector = rxu::decay_t<Selector>,
294  class combine_latest = rxo::detail::combine_latest<Coordination, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
295  class Value = rxu::value_type_t<combine_latest>,
296  class Result = observable<Value, combine_latest>>
297  static Result member(Observable&& o, Coordination&& cn, Selector&& s, ObservableN&&... on)
298  {
299  return Result(combine_latest(std::forward<Coordination>(cn), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
300  }
301 
302  template<class... AN>
303  static operators::detail::combine_latest_invalid_t<AN...> member(const AN&...) {
304  std::terminate();
305  return {};
306  static_assert(sizeof...(AN) == 10000, "combine_latest takes (optional Coordination, optional Selector, required Observable, optional Observable...), Selector takes (Observable::value_type...)");
307  }
308 };
309 
310 }
311 
312 #endif
Definition: rx-util.hpp:100
static Result member(Observable &&o, ObservableN &&...on)
Definition: rx-combine_latest.hpp:258
Definition: rx-all.hpp:26
static operators::detail::combine_latest_invalid_t< AN... > member(const AN &...)
Definition: rx-combine_latest.hpp:303
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
auto pack() -> detail::pack
Definition: rx-util.hpp:273
static Result member(Observable &&o, Selector &&s, ObservableN &&...on)
Definition: rx-combine_latest.hpp:271
Definition: rx-operators.hpp:47
static const bool value
Definition: rx-predef.hpp:123
auto combine_latest(AN &&...an) -> operator_factory< combine_latest_tag, AN... >
For each item from all of the observables select a value to emit from the new observable that is retu...
Definition: rx-combine_latest.hpp:242
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
auto surely(const std::tuple< T... > &tpl) -> decltype(apply(tpl, detail::surely()))
Definition: rx-util.hpp:678
Definition: rx-operators.hpp:150
static Result member(Observable &&o, Coordination &&cn, ObservableN &&...on)
Definition: rx-combine_latest.hpp:283
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
static Result member(Observable &&o, Coordination &&cn, Selector &&s, ObservableN &&...on)
Definition: rx-combine_latest.hpp:297
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-coordination.hpp:37