23 #if !defined(RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP) 24 #define RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP 26 #include "../rx-includes.hpp" 35 struct subscribe_on_invalid_arguments {};
38 struct subscribe_on_invalid :
public rxo::operator_base<subscribe_on_invalid_arguments<AN...>> {
39 using type = observable<subscribe_on_invalid_arguments<
AN...>, subscribe_on_invalid<
AN...>>;
42 using subscribe_on_invalid_t =
typename subscribe_on_invalid<
AN...>::type;
44 template<
class T,
class Observable,
class Coordination>
47 typedef rxu::decay_t<Observable> source_type;
48 typedef rxu::decay_t<Coordination> coordination_type;
49 typedef typename coordination_type::coordinator_type coordinator_type;
50 struct subscribe_on_values
52 ~subscribe_on_values()
55 subscribe_on_values(source_type s, coordination_type sf)
56 : source(std::move(s))
57 , coordination(std::move(sf))
61 coordination_type coordination;
63 subscribe_on_values& operator=(subscribe_on_values o) RXCPP_DELETE;
65 const subscribe_on_values initial;
71 : initial(std::move(s), std::move(sf))
75 template<
class Subscriber>
76 void on_subscribe(Subscriber s)
const {
78 typedef Subscriber output_type;
79 struct subscribe_on_state_type
80 :
public std::enable_shared_from_this<subscribe_on_state_type>
81 ,
public subscribe_on_values
83 subscribe_on_state_type(
const subscribe_on_values& i,
const output_type& oarg)
84 : subscribe_on_values(i)
88 composite_subscription source_lifetime;
91 subscribe_on_state_type& operator=(subscribe_on_state_type o) RXCPP_DELETE;
94 composite_subscription coordinator_lifetime;
96 auto coordinator = initial.coordination.create_coordinator(coordinator_lifetime);
98 auto controller = coordinator.get_worker();
101 auto state = std::make_shared<subscribe_on_state_type>(initial, std::move(s));
103 auto sl = state->source_lifetime;
104 auto ol = state->out.get_subscription();
106 auto disposer = [=](
const rxsc::schedulable&){
109 coordinator_lifetime.unsubscribe();
112 [&](){
return coordinator.act(disposer);},
114 if (selectedDisposer.empty()) {
118 state->source_lifetime.add([=](){
119 controller.schedule(selectedDisposer.get());
122 state->out.add([=](){
125 coordinator_lifetime.unsubscribe();
128 auto producer = [=](
const rxsc::schedulable&){
129 state->source.subscribe(state->source_lifetime, state->out);
133 [&](){
return coordinator.act(producer);},
135 if (selectedProducer.empty()) {
139 controller.schedule(selectedProducer.get());
149 template<
class...
AN>
152 return operator_factory<subscribe_on_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
160 template<
class Observable,
class Coordination,
168 static Result
member(Observable&& o, Coordination&& cn) {
169 return Result(SubscribeOn(std::forward<Observable>(o), std::forward<Coordination>(cn)));
172 template<
class...
AN>
173 static operators::detail::subscribe_on_invalid_t<
AN...>
member(
AN...) {
176 static_assert(
sizeof...(
AN) == 10000,
"subscribe_on takes (Coordination)");
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
auto subscribe_on(AN &&...an) -> operator_factory< subscribe_on_tag, AN... >
Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordination.
Definition: rx-subscribe_on.hpp:150
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
A source of values. Subscribe to it, or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
static Result member(Observable &&o, Coordination &&cn)
Definition: rx-subscribe_on.hpp:168
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
static operators::detail::subscribe_on_invalid_t< AN... > member(AN...)
Definition: rx-subscribe_on.hpp:173
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37
Definition: rx-operators.hpp:395