// Include guard, the shared rx includes, and the empty tag type
// switch_on_next_invalid_arguments. The tag is used below as
// switch_on_next_invalid_arguments<AN...>, so it is a variadic template whose
// template header is not visible in this extract.
20 #if !defined(RXCPP_OPERATORS_RX_SWITCH_ON_NEXT_HPP) 21 #define RXCPP_OPERATORS_RX_SWITCH_ON_NEXT_HPP 23 #include "../rx-includes.hpp" 32 struct switch_on_next_invalid_arguments {};
// Placeholder operator type for an unsupported switch_on_next argument list.
// It derives from rxo::operator_base over the switch_on_next_invalid_arguments
// tag and exposes `type`: an observable whose value type is that tag and whose
// source operator is this struct.
// NOTE(review): the template header and closing brace are not visible in this
// extract.
35 struct switch_on_next_invalid :
public rxo::operator_base<switch_on_next_invalid_arguments<AN...>> {
36 using type = observable<switch_on_next_invalid_arguments<
AN...>, switch_on_next_invalid<
AN...>>;
// Shorthand alias for switch_on_next_invalid<AN...>::type.
// NOTE(review): the alias template header (the AN parameter pack) is not
// visible in this extract.
39 using switch_on_next_invalid_t =
typename switch_on_next_invalid<
AN...>::type;
// Implementation of the switch_on_next operator.
//   T            - value type of the source; after decay it is used as
//                  collection_type, and its nested value_type is what the
//                  operator emits (see the operator_base argument below).
//   Observable   - type of the source observable.
//   Coordination - provides coordinator_type and create_coordinator().
// NOTE(review): this is an incomplete extract. The struct name line, braces,
// several lambda headers and some call lines are missing, so the comments
// below describe only the statements that are visible.
41 template<
class T,
class Observable,
class Coordination>
43 :
public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
48 typedef switch_on_next<T, Observable, Coordination> this_type;
// Decayed aliases for the template arguments.
50 typedef rxu::decay_t<T> source_value_type;
51 typedef rxu::decay_t<Observable> source_type;
53 typedef typename source_type::source_operator_type source_operator_type;
// Each value from the source is itself a collection; collection_value_type is
// the type of the items inside it.
55 typedef source_value_type collection_type;
56 typedef typename collection_type::value_type collection_value_type;
58 typedef rxu::decay_t<Coordination> coordination_type;
59 typedef typename coordination_type::coordinator_type coordinator_type;
// `values` constructor and members: holds the source operator and the
// coordination; both constructor arguments are moved into place.
63 values(source_operator_type o, coordination_type sf)
64 : source_operator(std::move(o))
65 , coordination(std::move(sf))
68 source_operator_type source_operator;
69 coordination_type coordination;
// Operator constructor initializer (its signature line is not visible here):
// seeds `initial` from the source's operator and the moved coordination.
74 : initial(o.source_operator, std::move(sf))
// Builds the state for one subscription and connects the downstream
// subscriber `scbr` to the source.
78 template<
class Subscriber>
79 void on_subscribe(Subscriber scbr)
const {
82 typedef Subscriber output_type;
// State for one subscription. It is created with make_shared below and
// captured by value ([state]) in the subscriber lambdas.
84 struct switch_state_type
85 :
public std::enable_shared_from_this<switch_state_type>
88 switch_state_type(values i, coordinator_type coor, output_type oarg)
90 , source(i.source_operator)
91 , pendingCompletions(0)
92 , coordinator(std::move(coor))
93 , out(std::move(oarg))
96 observable<source_value_type, source_operator_type> source;
// Counter starting at 0: incremented once before the outer sink is built and
// once per inner subscription, decremented by the outer completion handler and
// by the inner teardown statements below.
99 int pendingCompletions;
100 coordinator_type coordinator;
// Subscription of the currently active inner observable; replaced each time
// the source emits.
101 composite_subscription inner_lifetime;
// The coordinator is created from the downstream subscriber's subscription.
105 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
108 auto state = std::make_shared<switch_state_type>(initial, std::move(coordinator), std::move(scbr));
// outercs is added to the output subscriber's subscription.
110 composite_subscription outercs;
114 state->out.add(outercs);
// Routes the source through the coordinator. The call that receives this
// lambda is not visible here (presumably on_exception - TODO confirm); its
// result is then tested with empty().
117 [&](){
return state->coordinator.in(state->source);},
119 if (source.empty()) {
123 ++state->pendingCompletions;
// Outer sink: receives each inner collection emitted by the source.
127 auto sink = make_subscriber<collection_type>(
// on_next: drop the previous inner subscription, install a fresh one, and
// subscribe to the newly emitted inner collection.
131 [state](collection_type st) {
133 state->inner_lifetime.unsubscribe();
135 state->inner_lifetime = composite_subscription();
// Register the new inner lifetime with the output subscriber's subscription.
139 auto innerlifetimetoken = state->out.add(state->inner_lifetime);
// NOTE(review): the next two statements look like the body of a teardown
// callback attached to inner_lifetime, but its header line is not visible -
// confirm.
142 state->out.remove(innerlifetimetoken);
143 --state->pendingCompletions;
146 auto selectedSource = state->coordinator.in(st);
// Inner sink, tied to inner_lifetime: forwards values and errors straight to
// the output subscriber.
150 auto sinkInner = make_subscriber<collection_value_type>(
152 state->inner_lifetime,
154 [state, st](collection_value_type ct) {
155 state->out.on_next(std::move(ct));
158 [state](std::exception_ptr e) {
159 state->out.on_error(e);
// Inner completion: the output completes only when pendingCompletions equals 1
// at this point.
163 if (state->pendingCompletions == 1) {
164 state->out.on_completed();
// Count the new inner subscription before subscribing to it.
169 auto selectedSinkInner = state->coordinator.out(sinkInner);
170 ++state->pendingCompletions;
171 selectedSource.subscribe(std::move(selectedSinkInner));
// Outer on_error: forwarded unchanged to the output subscriber.
174 [state](std::exception_ptr e) {
175 state->out.on_error(e);
// Outer completion: decrement the counter and complete the output only if it
// reaches 0.
179 if (--state->pendingCompletions == 0) {
180 state->out.on_completed();
// Routes the outer sink through the coordinator. As above, the receiving call
// is not visible; its result is tested with empty() and then subscribed to the
// source.
186 [&](){
return state->coordinator.out(sink);},
188 if (selectedSink.empty()) {
192 source->subscribe(std::move(selectedSink.get()));
// Free-function form of the operator (its signature line is not visible in
// this extract). It perfectly forwards the arguments into a std::tuple and
// wraps that in an operator_factory tagged with switch_on_next_tag.
201 template<
class...
AN>
204 return operator_factory<switch_on_next_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
// Static `member` overloads for the switch_on_next tag.
// NOTE(review): the enclosing struct header, the single-argument overload's
// body and several template parameter lines are not visible in this extract.
//
// Template header whose default SwitchOnNext uses identity_one_worker as the
// coordination type.
212 template<
class Observable,
216 class SwitchOnNext = rxo::detail::switch_on_next<SourceValue, rxu::decay_t<Observable>,
identity_one_worker>,
// Overload taking an explicit coordination. The visible constraint list
// includes is_observable<Observable>.
224 template<
class Observable,
class Coordination,
226 is_observable<Observable>,
233 static Result
member(Observable&& o, Coordination&& cn) {
// Builds the operator from the forwarded observable and coordination and wraps
// it in Result.
234 return Result(SwitchOnNext(std::forward<Observable>(o), std::forward<Coordination>(cn)));
// Catch-all overload for any other argument list; it returns the "invalid"
// observable type.
237 template<
class...
AN>
238 static operators::detail::switch_on_next_invalid_t<
AN...>
member(
AN...) {
// The condition depends on the AN pack; it is false for any pack that does not
// have exactly 10000 elements, in which case the message below is the
// diagnostic.
241 static_assert(
sizeof...(
AN) == 10000,
"switch_on_next takes (optional Coordination)");
static operators::detail::switch_on_next_invalid_t< AN... > member(AN...)
Definition: rx-switch_on_next.hpp:238
static Result member(Observable &&o, Coordination &&cn)
Definition: rx-switch_on_next.hpp:233
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:197
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static const bool value
Definition: rx-predef.hpp:123
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
A source of values. Subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
auto switch_on_next(AN &&...an) -> operator_factory< switch_on_next_tag, AN... >
Returns an observable that emits the items emitted by the observable most recently emitted by the source ...
Definition: rx-switch_on_next.hpp:202
static Result member(Observable &&o)
Definition: rx-switch_on_next.hpp:220
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
Definition: rx-operators.hpp:410
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37