22 #if !defined(RXCPP_OPERATORS_RX_DELAY_HPP) 23 #define RXCPP_OPERATORS_RX_DELAY_HPP 25 #include "../rx-includes.hpp" 34 struct delay_invalid_arguments {};
37 struct delay_invalid :
public rxo::operator_base<delay_invalid_arguments<AN...>> {
38 using type = observable<delay_invalid_arguments<
AN...>, delay_invalid<
AN...>>;
41 using delay_invalid_t =
typename delay_invalid<
AN...>::type;
43 template<
class T,
class Duration,
class Coordination>
46 typedef rxu::decay_t<T> source_value_type;
47 typedef rxu::decay_t<Coordination> coordination_type;
48 typedef typename coordination_type::coordinator_type coordinator_type;
49 typedef rxu::decay_t<Duration> duration_type;
53 delay_values(duration_type p, coordination_type c)
59 coordination_type coordination;
63 delay(duration_type period, coordination_type coordination)
64 : initial(period, coordination)
68 template<
class Subscriber>
71 typedef delay_observer<Subscriber> this_type;
72 typedef rxu::decay_t<T> value_type;
73 typedef rxu::decay_t<Subscriber> dest_type;
74 typedef observer<T, this_type> observer_type;
76 struct delay_subscriber_values :
public delay_values
78 delay_subscriber_values(composite_subscription cs, dest_type d, delay_values v, coordinator_type c)
82 , coordinator(std::move(c))
83 , worker(coordinator.get_worker())
84 , expected(worker.now())
87 composite_subscription cs;
89 coordinator_type coordinator;
91 rxsc::scheduler::clock_type::time_point expected;
93 std::shared_ptr<delay_subscriber_values> state;
95 delay_observer(composite_subscription cs, dest_type d, delay_values v, coordinator_type c)
96 : state(std::make_shared<delay_subscriber_values>(delay_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
98 auto localState = state;
100 auto disposer = [=](
const rxsc::schedulable&){
101 localState->cs.unsubscribe();
102 localState->dest.unsubscribe();
103 localState->worker.unsubscribe();
106 [&](){
return localState->coordinator.act(disposer);},
108 if (selectedDisposer.empty()) {
112 localState->dest.add([=](){
113 localState->worker.schedule(selectedDisposer.get());
115 localState->cs.add([=](){
116 localState->worker.schedule(localState->worker.now() + localState->period, selectedDisposer.get());
120 void on_next(T v)
const {
121 auto localState = state;
122 auto work = [v, localState](
const rxsc::schedulable&){
123 localState->dest.on_next(v);
126 [&](){
return localState->coordinator.act(work);},
128 if (selectedWork.empty()) {
131 localState->worker.schedule(localState->worker.now() + localState->period, selectedWork.get());
134 void on_error(std::exception_ptr e)
const {
135 auto localState = state;
136 auto work = [e, localState](
const rxsc::schedulable&){
137 localState->dest.on_error(e);
140 [&](){
return localState->coordinator.act(work);},
142 if (selectedWork.empty()) {
145 localState->worker.schedule(selectedWork.get());
148 void on_completed()
const {
149 auto localState = state;
150 auto work = [localState](
const rxsc::schedulable&){
151 localState->dest.on_completed();
154 [&](){
return localState->coordinator.act(work);},
156 if (selectedWork.empty()) {
159 localState->worker.schedule(localState->worker.now() + localState->period, selectedWork.get());
162 static subscriber<T, observer_type> make(dest_type d, delay_values v) {
163 auto cs = composite_subscription();
164 auto coordinator = v.coordination.create_coordinator();
166 return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
170 template<
class Subscriber>
171 auto operator()(Subscriber dest)
const 172 -> decltype(delay_observer<Subscriber>::make(std::move(dest), initial)) {
173 return delay_observer<Subscriber>::make(std::move(dest), initial);
181 template<
class...
AN>
192 template<
class Observable,
class Duration,
198 static auto member(Observable&& o, Duration&& d)
203 template<
class Observable,
class Coordination,
class Duration,
205 is_observable<Observable>,
207 rxu::is_duration<Duration>>,
210 static auto member(Observable&& o, Coordination&& cn, Duration&& d)
211 -> decltype(o.template lift<SourceValue>(delay(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
212 return o.template lift<SourceValue>(
delay(std::forward<Duration>(d), std::forward<Coordination>(cn)));
215 template<
class Observable,
class Coordination,
class Duration,
217 is_observable<Observable>,
218 is_coordination<Coordination>,
219 rxu::is_duration<Duration>>,
222 static auto member(Observable&& o, Duration&& d, Coordination&& cn)
223 -> decltype(o.template lift<SourceValue>(delay(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
224 return o.template lift<SourceValue>(
delay(std::forward<Duration>(d), std::forward<Coordination>(cn)));
227 template<
class...
AN>
228 static operators::detail::delay_invalid_t<
AN...>
member(
const AN&...) {
231 static_assert(
sizeof...(
AN) == 10000,
"delay takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
Definition: rx-util.hpp:791
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
static auto member(Observable &&o, Coordination &&cn, Duration &&d) -> decltype(o.template lift< SourceValue >(delay(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-delay.hpp:210
Definition: rx-operators.hpp:69
static auto member(Observable &&o, Duration &&d) -> decltype(o.template lift< SourceValue >(delay(std::forward< Duration >(d), identity_current_thread())))
Definition: rx-delay.hpp:198
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static auto member(Observable &&o, Duration &&d, Coordination &&cn) -> decltype(o.template lift< SourceValue >(delay(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-delay.hpp:222
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
auto delay(AN &&...an) -> operator_factory< delay_tag, AN... >
Return an observable that emits each item emitted by the source observable after the specified delay...
Definition: rx-delay.hpp:182
Definition: rx-operators.hpp:185
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
static operators::detail::delay_invalid_t< AN... > member(const AN &...)
Definition: rx-delay.hpp:228
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37