5 #if !defined(RXCPP_OPERATORS_RX_ZIP_HPP) 6 #define RXCPP_OPERATORS_RX_ZIP_HPP 8 #include "../rx-includes.hpp" 49 template<
class Observable>
50 struct zip_source_state
52 using value_type = rxu::value_type_t<Observable>;
57 std::list<value_type> values;
61 struct values_not_empty {
62 template<
class Observable>
63 bool operator()(zip_source_state<Observable>& source)
const {
64 return !source.values.empty();
68 struct source_completed_values_empty {
69 template<
class Observable>
70 bool operator()(zip_source_state<Observable>& source)
const {
71 return source.completed && source.values.empty();
75 struct extract_value_front {
76 template<
class Observable,
class Value = rxu::value_type_t<Observable>>
77 Value operator()(zip_source_state<Observable>& source)
const {
78 auto val = std::move(source.values.front());
79 source.values.pop_front();
85 struct zip_invalid_arguments {};
88 struct zip_invalid :
public rxo::operator_base<zip_invalid_arguments<AN...>> {
89 using type = observable<zip_invalid_arguments<
AN...>, zip_invalid<
AN...>>;
92 using zip_invalid_t =
typename zip_invalid<
AN...>::type;
94 template<
class Selector,
class... ObservableN>
95 struct is_zip_selector_check {
96 typedef rxu::decay_t<Selector> selector_type;
99 template<
class CS,
class... CON>
100 static auto check(
int) -> decltype((*(CS*)
nullptr)((*(
typename CON::value_type*)
nullptr)...));
101 template<
class CS,
class... CON>
102 static tag_not_valid check(...);
104 using type = decltype(check<selector_type, rxu::decay_t<ObservableN>...>(0));
106 static const bool value = !std::is_same<type, tag_not_valid>::value;
109 template<
class Selector,
class... ObservableN>
110 struct invalid_zip_selector {
111 static const bool value =
false;
114 template<
class Selector,
class... ObservableN>
115 struct is_zip_selector :
public std::conditional<
116 is_zip_selector_check<Selector, ObservableN...>::value,
117 is_zip_selector_check<Selector, ObservableN...>,
118 invalid_zip_selector<Selector, ObservableN...>>::type {
121 template<
class Selector,
class... ON>
122 using result_zip_selector_t =
typename is_zip_selector<Selector, ON...>::type;
124 template<
class Coordination,
class Selector,
class... ObservableN>
126 typedef std::tuple<rxu::decay_t<ObservableN>...> tuple_source_type;
127 typedef std::tuple<zip_source_state<ObservableN>...> tuple_source_values_type;
129 typedef rxu::decay_t<Selector> selector_type;
130 typedef rxu::decay_t<Coordination> coordination_type;
132 typedef typename is_zip_selector<selector_type, ObservableN...>::type value_type;
135 template<
class Coordination,
class Selector,
class... ObservableN>
136 struct zip :
public operator_base<rxu::value_type_t<zip_traits<Coordination, Selector, ObservableN...>>>
138 typedef zip<Coordination, Selector, ObservableN...> this_type;
140 typedef zip_traits<Coordination, Selector, ObservableN...> traits;
142 typedef typename traits::tuple_source_type tuple_source_type;
143 typedef typename traits::tuple_source_values_type tuple_source_values_type;
145 typedef typename traits::selector_type selector_type;
147 typedef typename traits::coordination_type coordination_type;
148 typedef typename coordination_type::coordinator_type coordinator_type;
152 values(tuple_source_type o, selector_type s, coordination_type sf)
153 : source(std::move(o))
154 , selector(std::move(s))
155 , coordination(std::move(sf))
158 tuple_source_type source;
159 selector_type selector;
160 coordination_type coordination;
164 zip(coordination_type sf, selector_type s, tuple_source_type ts)
165 : initial(std::move(ts), std::move(s), std::move(sf))
169 template<
int Index,
class State>
170 void subscribe_one(std::shared_ptr<State> state)
const {
172 typedef typename std::tuple_element<Index, tuple_source_type>::type::value_type source_value_type;
174 composite_subscription innercs;
178 state->out.add(innercs);
181 [&](){
return state->coordinator.in(std::get<Index>(state->source));},
183 if (source.empty()) {
190 auto sink = make_subscriber<source_value_type>(
194 [state](source_value_type st) {
195 auto& values = std::get<Index>(state->pending).values;
196 values.push_back(st);
197 if (rxu::apply_to_each(state->pending, values_not_empty(), rxu::all_values_true())) {
198 auto selectedResult = rxu::apply_to_each(state->pending, extract_value_front(), state->selector);
199 state->out.on_next(selectedResult);
201 if (rxu::apply_to_each(state->pending, source_completed_values_empty(), rxu::any_value_true())) {
202 state->out.on_completed();
206 [state](std::exception_ptr e) {
207 state->out.on_error(e);
211 auto& completed = std::get<Index>(state->pending).completed;
213 if (--state->pendingCompletions == 0) {
214 state->out.on_completed();
219 [&](){
return state->coordinator.out(sink);},
221 if (selectedSink.empty()) {
224 source->subscribe(std::move(selectedSink.get()));
226 template<
class State,
int... IndexN>
227 void subscribe_all(std::shared_ptr<State> state, rxu::values<int, IndexN...>)
const {
228 bool subscribed[] = {(subscribe_one<IndexN>(state),
true)...};
229 subscribed[0] = (*subscribed);
232 template<
class Subscriber>
233 void on_subscribe(Subscriber scbr)
const {
236 typedef Subscriber output_type;
238 struct zip_state_type
239 :
public std::enable_shared_from_this<zip_state_type>
242 zip_state_type(values i, coordinator_type coor, output_type oarg)
243 : values(std::move(i))
244 , pendingCompletions(
sizeof... (ObservableN))
246 , coordinator(std::move(coor))
247 , out(std::move(oarg))
253 mutable int pendingCompletions;
254 mutable int valuesSet;
255 mutable tuple_source_values_type pending;
256 coordinator_type coordinator;
260 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
263 auto state = std::make_shared<zip_state_type>(initial, std::move(coordinator), std::move(scbr));
265 subscribe_all(state,
typename rxu::values_from<
int,
sizeof...(ObservableN)>::type());
273 template<
class...
AN>
284 template<
class Observable,
class... ObservableN,
287 class Zip = rxo::detail::zip<identity_one_worker, rxu::detail::pack, rxu::decay_t<Observable>,
rxu::decay_t<ObservableN>...>,
290 static Result
member(Observable&& o, ObservableN&&... on)
295 template<
class Observable,
class Selector,
class... ObservableN,
297 operators::detail::is_zip_selector<Selector, Observable, ObservableN...>,
300 class Zip = rxo::detail::zip<identity_one_worker, ResolvedSelector, rxu::decay_t<Observable>,
rxu::decay_t<ObservableN>...>,
303 static Result
member(Observable&& o, Selector&& s, ObservableN&&... on)
305 return Result(Zip(
identity_current_thread(), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
308 template<
class Coordination,
class Observable,
class... ObservableN,
315 static Result
member(Observable&& o, Coordination&& cn, ObservableN&&... on)
317 return Result(Zip(std::forward<Coordination>(cn),
rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
320 template<
class Coordination,
class Selector,
class Observable,
class... ObservableN,
322 is_coordination<Coordination>,
323 operators::detail::is_zip_selector<Selector, Observable, ObservableN...>,
329 static Result
member(Observable&& o, Coordination&& cn, Selector&& s, ObservableN&&... on)
331 return Result(Zip(std::forward<Coordination>(cn), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
334 template<
class...
AN>
335 static operators::detail::zip_invalid_t<
AN...>
member(
const AN&...) {
338 static_assert(
sizeof...(
AN) == 10000,
"zip takes (optional Coordination, optional Selector, required Observable, optional Observable...), Selector takes (Observable::value_type...)");
auto zip(AN &&...an) -> operator_factory< zip_tag, AN... >
Bring by one item from all given observables and select a value to emit from the new observable that ...
Definition: rx-zip.hpp:274
Definition: rx-util.hpp:100
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
Definition: rx-operators.hpp:508
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
auto pack() -> detail::pack
Definition: rx-util.hpp:273
Definition: rx-operators.hpp:47
static Result member(Observable &&o, Coordination &&cn, Selector &&s, ObservableN &&...on)
Definition: rx-zip.hpp:329
static const bool value
Definition: rx-predef.hpp:123
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
static Result member(Observable &&o, Selector &&s, ObservableN &&...on)
Definition: rx-zip.hpp:303
static Result member(Observable &&o, Coordination &&cn, ObservableN &&...on)
Definition: rx-zip.hpp:315
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
static Result member(Observable &&o, ObservableN &&...on)
Definition: rx-zip.hpp:290
static operators::detail::zip_invalid_t< AN... > member(const AN &...)
Definition: rx-zip.hpp:335
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-coordination.hpp:37