// Include guard and shared includes for this operator header, followed by the
// empty tag type with_latest_from_invalid_arguments. The tag is used further
// down as the value type of the observable named by with_latest_from_invalid.
// NOTE(review): the template header of the tag struct is not visible in this listing.
38 #if !defined(RXCPP_OPERATORS_RX_WITH_LATEST_FROM_HPP) 39 #define RXCPP_OPERATORS_RX_WITH_LATEST_FROM_HPP 41 #include "../rx-includes.hpp" 50 struct with_latest_from_invalid_arguments {};
// Placeholder operator for an argument pack AN... that matches no valid
// with_latest_from overload. It derives from
// rxo::operator_base<with_latest_from_invalid_arguments<AN...>> and exposes
// `type`: an observable of that tag type whose source operator is this struct.
// NOTE(review): the template header and the closing brace are not visible in this listing.
53 struct with_latest_from_invalid :
public rxo::operator_base<with_latest_from_invalid_arguments<AN...>> {
54 using type = observable<with_latest_from_invalid_arguments<
AN...>, with_latest_from_invalid<
AN...>>;
// Shorthand for with_latest_from_invalid<AN...>::type, i.e. the observable
// type declared inside with_latest_from_invalid.
57 using with_latest_from_invalid_t =
typename with_latest_from_invalid<
AN...>::type;
// Compile-time check: can the decayed Selector be called with one argument per
// ObservableN, each argument being that observable's value_type?
//   type  - the selector's result type when the call is well-formed,
//           otherwise tag_not_valid.
//   value - true when `type` is not tag_not_valid.
59 template<
class Selector,
class... ObservableN>
60 struct is_with_latest_from_selector_check {
61 typedef rxu::decay_t<Selector> selector_type;
// Preferred overload (an int argument is an exact match for 0). It only takes
// part in overload resolution when the call expression inside decltype is
// well-formed; null pointers are dereferenced purely in an unevaluated context
// to name a CS object and one CON::value_type object per source.
64 template<
class CS,
class... CON>
65 static auto check(
int) -> decltype((*(CS*)
nullptr)((*(
typename CON::value_type*)
nullptr)...));
// Fallback overload chosen when the call above is ill-formed.
66 template<
class CS,
class... CON>
67 static tag_not_valid check(...);
// Result of overload resolution with the decayed selector and decayed sources.
69 using type = decltype(check<selector_type, rxu::decay_t<ObservableN>...>(0));
71 static const bool value = !std::is_same<type, tag_not_valid>::value;
// Negative result used when Selector is not callable with the sources' value
// types: `value` is always false. No `type` member is declared in the lines
// visible here.
74 template<
class Selector,
class... ObservableN>
75 struct invalid_with_latest_from_selector {
76 static const bool value =
false;
// Public selector trait. Uses std::conditional to inherit from
// is_with_latest_from_selector_check<Selector, ObservableN...> when that check's
// `value` is true, and from invalid_with_latest_from_selector<...> otherwise,
// so the members of the chosen base become the members of this trait.
79 template<
class Selector,
class... ObservableN>
80 struct is_with_latest_from_selector :
public std::conditional<
81 is_with_latest_from_selector_check<Selector, ObservableN...>::value,
82 is_with_latest_from_selector_check<Selector, ObservableN...>,
83 invalid_with_latest_from_selector<Selector, ObservableN...>>::type {
// Shorthand for is_with_latest_from_selector<Selector, ON...>::type, which for a
// valid selector is the type returned by calling it with the sources' values.
// NOTE(review): for an invalid selector the chosen base shows no `type` member
// in this listing, so this alias presumably fails to substitute - confirm.
86 template<
class Selector,
class... ON>
87 using result_with_latest_from_selector_t =
typename is_with_latest_from_selector<Selector, ON...>::type;
// Type bundle shared by the with_latest_from operator.
89 template<
class Coordination,
class Selector,
class... ObservableN>
90 struct with_latest_from_traits {
// Tuple holding the source observables themselves.
92 typedef std::tuple<ObservableN...> tuple_source_type;
// Tuple with one rxu::detail::maybe<value_type> slot per source observable.
93 typedef std::tuple<rxu::detail::maybe<typename ObservableN::value_type>...> tuple_source_value_type;
// Selector and coordination with references/cv-qualifiers removed.
95 typedef rxu::decay_t<Selector> selector_type;
96 typedef rxu::decay_t<Coordination> coordination_type;
// Element type of the resulting observable: the `type` reported by the
// selector trait for this selector and these sources.
98 typedef typename is_with_latest_from_selector<selector_type, ObservableN...>::type value_type;
// The with_latest_from operator. Its operator_base is parameterized on the
// value_type published by with_latest_from_traits for the same arguments.
// NOTE(review): several lines of this struct (opening brace, the declaration of
// the nested type owning the `values` constructor, the `initial` member) are not
// visible in this listing.
101 template<
class Coordination,
class Selector,
class... ObservableN>
102 struct with_latest_from :
public operator_base<rxu::value_type_t<with_latest_from_traits<Coordination, Selector, ObservableN...>>>
// Local aliases for the types computed by the traits.
106 typedef with_latest_from_traits<Coordination, Selector, ObservableN...> traits;
108 typedef typename traits::tuple_source_type tuple_source_type;
109 typedef typename traits::tuple_source_value_type tuple_source_value_type;
111 typedef typename traits::selector_type selector_type;
113 typedef typename traits::coordination_type coordination_type;
114 typedef typename coordination_type::coordinator_type coordinator_type;
// Constructor of `values`: takes the sources tuple, selector and coordination
// by value and moves each into the member of the matching kind.
118 values(tuple_source_type o, selector_type s, coordination_type sf)
119 : source(std::move(o))
120 , selector(std::move(s))
121 , coordination(std::move(sf))
124 tuple_source_type source;
125 selector_type selector;
126 coordination_type coordination;
// Operator constructor. Parameters arrive as (coordination, selector, sources)
// and are forwarded to `initial` reordered as (sources, selector, coordination),
// which is the parameter order of the `values` constructor above.
130 with_latest_from(coordination_type sf, selector_type s, tuple_source_type ts)
131 : initial(std::move(ts), std::move(s), std::move(sf))
// Subscribes to the source at tuple position Index, using the shared `state`.
// NOTE(review): several statements of this function are missing from this
// listing (the calls wrapping the two [&] lambdas, the bodies of both empty()
// checks, part of the on_next lambda); comments cover only the visible lines.
135 template<
int Index,
class State>
136 void subscribe_one(std::shared_ptr<State> state)
const {
// Element type of the Index-th source observable.
138 typedef typename std::tuple_element<Index, tuple_source_type>::type::value_type source_value_type;
// Subscription for this one source; it is added to the output subscriber.
140 composite_subscription innercs;
144 state->out.add(innercs);
// Passes the Index-th source through the coordinator's in().
147 [&](){
return state->coordinator.in(std::get<Index>(state->source));},
149 if (source.empty()) {
// Subscriber that receives this source's items.
156 auto sink = make_subscriber<source_value_type>(
// Item handler: takes a reference to this source's slot in state->latest.
160 [state](source_value_type st) {
161 auto& value = std::get<Index>(state->latest);
// Emit only when valuesSet equals the number of sources AND this handler
// belongs to source 0; items from the other sources never emit by themselves.
// NOTE(review): valuesSet presumably counts sources that have produced at
// least one value; the increment is not visible here - confirm.
169 if (state->valuesSet ==
sizeof... (ObservableN) && Index == 0) {
// Apply the selector to `values` (its definition is not visible here) and
// push the result to the output subscriber.
171 auto selectedResult = rxu::apply(values, state->selector);
172 state->out.on_next(selectedResult);
// Error handler: forwards the error to the output subscriber unchanged.
176 [state](std::exception_ptr e) {
177 state->out.on_error(e);
// Decrement the count of pending completions; complete the output when it
// reaches zero.
181 if (--state->pendingCompletions == 0) {
182 state->out.on_completed();
// Passes the sink through the coordinator's out().
187 [&](){
return state->coordinator.out(sink);},
189 if (selectedSink.empty()) {
// Subscribe the coordinated sink to the coordinated source.
192 source->subscribe(std::move(selectedSink.get()));
// Calls subscribe_one once for every index in IndexN.
194 template<
class State,
int... IndexN>
195 void subscribe_all(std::shared_ptr<State> state, rxu::values<int, IndexN...>)
const {
// The pack expansion sits in a braced initializer list, so the calls run left
// to right. Each pack element IndexN is mapped to (count - 1 - IndexN), which
// reverses the order of the indices handed to subscribe_one.
// NOTE(review): if IndexN is 0..N-1 (presumably what rxu::values_from yields),
// source N-1 is subscribed first and source 0 last - confirm.
196 bool subscribed[] = {(subscribe_one<(
sizeof...(IndexN)) - 1 - IndexN>(state),
true)...};
// Self-assignment with no effect on the array contents; presumably present
// only to mark `subscribed` as used - confirm.
197 subscribed[0] = (*subscribed);
// Called with the downstream subscriber: builds the shared per-subscription
// state and subscribes to every source.
// NOTE(review): parts of the local state type (remaining base classes, the
// `out` member, one initializer, closing braces) are missing from this listing.
200 template<
class Subscriber>
201 void on_subscribe(Subscriber scbr)
const {
204 typedef Subscriber output_type;
// Per-subscription state, held through std::shared_ptr.
206 struct with_latest_from_state_type
207 :
public std::enable_shared_from_this<with_latest_from_state_type>
// Moves the operator's values, the coordinator and the output subscriber into
// the state; pendingCompletions starts at the number of source observables.
210 with_latest_from_state_type(values i, coordinator_type coor, output_type oarg)
211 : values(std::move(i))
212 , pendingCompletions(
sizeof... (ObservableN))
214 , coordinator(std::move(coor))
215 , out(std::move(oarg))
// Counters and the tuple of latest values are declared mutable.
221 mutable int pendingCompletions;
222 mutable int valuesSet;
223 mutable tuple_source_value_type latest;
224 coordinator_type coordinator;
// Create a coordinator from the stored coordination, passing it the
// subscriber's subscription.
228 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
// `initial` is passed as an lvalue (copied into the by-value constructor
// parameter); the coordinator and the subscriber are moved.
231 auto state = std::make_shared<with_latest_from_state_type>(initial, std::move(coordinator), std::move(scbr));
// Subscribe to all sources using an index list sized to the number of sources.
233 subscribe_all(state,
typename rxu::values_from<
int,
sizeof...(ObservableN)>::type());
// Factory for the operator: packs the forwarded arguments into a tuple and
// wraps them in an operator_factory tagged with with_latest_from_tag.
// NOTE(review): the function signature line is missing from this listing.
241 template<
class...
AN>
244 return operator_factory<with_latest_from_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
// member overload taking only observables: a first observable `o` followed by
// zero or more further observables `on`.
// NOTE(review): the remaining template parameters (constraints, Result) and the
// body are not visible in this listing.
252 template<
class Observable,
class... ObservableN,
258 static Result
member(Observable&& o, ObservableN&&... on)
// member overload taking a first observable, a selector, then further
// observables. Its visible constraint names is_with_latest_from_selector for
// Selector applied to Observable and ObservableN... .
// NOTE(review): other constraints, Result and the body are not visible here.
263 template<
class Observable,
class Selector,
class... ObservableN,
265 operators::detail::is_with_latest_from_selector<Selector, Observable, ObservableN...>,
271 static Result
member(Observable&& o, Selector&& s, ObservableN&&... on)
// member overload taking a coordination but no selector. The operator is
// instantiated with rxu::detail::pack as its selector type and constructed with
// rxu::pack() in the selector position.
// NOTE(review): the constraints and the definition of Result are not visible here.
276 template<
class Coordination,
class Observable,
class... ObservableN,
280 class with_latest_from = rxo::detail::with_latest_from<Coordination, rxu::detail::pack, rxu::decay_t<Observable>,
rxu::decay_t<ObservableN>...>,
283 static Result
member(Observable&& o, Coordination&& cn, ObservableN&&... on)
// Sources are gathered into one tuple, `o` first, then `on...` in order.
285 return Result(
with_latest_from(std::forward<Coordination>(cn),
rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
// member overload taking both a coordination and a selector. Visible
// constraints: Coordination must satisfy is_coordination, and Selector must
// satisfy is_with_latest_from_selector for Observable and ObservableN... .
// NOTE(review): the definitions of ResolvedSelector and Result are not visible here.
288 template<
class Coordination,
class Selector,
class Observable,
class... ObservableN,
290 is_coordination<Coordination>,
291 operators::detail::is_with_latest_from_selector<Selector, Observable, ObservableN...>,
294 class with_latest_from = rxo::detail::with_latest_from<Coordination, ResolvedSelector, rxu::decay_t<Observable>,
rxu::decay_t<ObservableN>...>,
297 static Result
member(Observable&& o, Coordination&& cn, Selector&& s, ObservableN&&... on)
// Forwards coordination and selector; sources are gathered into one tuple,
// `o` first, then `on...` in order.
299 return Result(
with_latest_from(std::forward<Coordination>(cn), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
// Catch-all member overload accepting any arguments by const reference and
// returning with_latest_from_invalid_t<AN...>.
302 template<
class...
AN>
303 static operators::detail::with_latest_from_invalid_t<
AN...>
member(
const AN&...) {
// The condition depends on AN, so it is only evaluated when this overload is
// instantiated; it holds only for a pack of exactly 10000 arguments, so any
// other instantiation reports the usage message below at compile time.
306 static_assert(
sizeof...(
AN) == 10000,
"with_latest_from takes (optional Coordination, optional Selector, required Observable, optional Observable...), Selector takes (Observable::value_type...)");
Definition: rx-util.hpp:100
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
auto with_latest_from(AN &&...an) -> operator_factory< with_latest_from_tag, AN... >
For each item from the first observable select the latest value from all the observables to emit from...
Definition: rx-with_latest_from.hpp:242
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
auto pack() -> detail::pack
Definition: rx-util.hpp:273
Definition: rx-operators.hpp:47
static const bool value
Definition: rx-predef.hpp:123
static Result member(Observable &&o, ObservableN &&...on)
Definition: rx-with_latest_from.hpp:258
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
auto surely(const std::tuple< T... > &tpl) -> decltype(apply(tpl, detail::surely()))
Definition: rx-util.hpp:678
static Result member(Observable &&o, Coordination &&cn, Selector &&s, ObservableN &&...on)
Definition: rx-with_latest_from.hpp:297
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
static operators::detail::with_latest_from_invalid_t< AN... > member(const AN &...)
Definition: rx-with_latest_from.hpp:303
static Result member(Observable &&o, Selector &&s, ObservableN &&...on)
Definition: rx-with_latest_from.hpp:271
static Result member(Observable &&o, Coordination &&cn, ObservableN &&...on)
Definition: rx-with_latest_from.hpp:283
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-coordination.hpp:37
Definition: rx-operators.hpp:501