22 #if !defined(RXCPP_OPERATORS_RX_TIMEOUT_HPP) 23 #define RXCPP_OPERATORS_RX_TIMEOUT_HPP 25 #include "../rx-includes.hpp" 33 std::runtime_error(msg)
42 struct timeout_invalid_arguments {};
46 using type =
observable<timeout_invalid_arguments<
AN...>, timeout_invalid<
AN...>>;
49 using timeout_invalid_t =
typename timeout_invalid<
AN...>::type;
51 template<
class T,
class Duration,
class Coordination>
56 typedef typename coordination_type::coordinator_type coordinator_type;
61 timeout_values(duration_type p, coordination_type c)
68 coordination_type coordination;
70 timeout_values initial;
72 timeout(duration_type period, coordination_type coordination)
73 : initial(period, coordination)
77 template<
class Subscriber>
78 struct timeout_observer
80 typedef timeout_observer<Subscriber> this_type;
85 struct timeout_subscriber_values :
public timeout_values
101 mutable std::size_t index;
103 typedef std::shared_ptr<timeout_subscriber_values> state_type;
107 : state(std::make_shared<timeout_subscriber_values>(timeout_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
109 auto localState = state;
112 localState->cs.unsubscribe();
113 localState->dest.unsubscribe();
114 localState->worker.unsubscribe();
117 [&](){
return localState->coordinator.act(disposer); },
119 if (selectedDisposer.empty()) {
123 localState->dest.add([=](){
124 localState->worker.schedule(selectedDisposer.get());
126 localState->cs.add([=](){
127 localState->worker.schedule(selectedDisposer.get());
131 static std::function<void(const rxsc::schedulable&)> produce_timeout(std::size_t
id, state_type state) {
133 if(
id != state->index)
140 [&](){
return state->coordinator.act(produce); },
142 if (selectedProduce.empty()) {
143 return std::function<void(const rxsc::schedulable&)>();
146 return std::function<void(const rxsc::schedulable&)>(selectedProduce.get());
149 void on_next(T v)
const {
150 auto localState = state;
152 auto new_id = ++localState->index;
153 auto produce_time = localState->worker.now() + localState->period;
155 localState->dest.on_next(v);
156 localState->worker.schedule(produce_time, produce_timeout(new_id, localState));
159 [&](){
return localState->coordinator.act(work);},
161 if (selectedWork.empty()) {
164 localState->worker.schedule(selectedWork.get());
167 void on_error(std::exception_ptr e)
const {
168 auto localState = state;
170 localState->dest.on_error(e);
173 [&](){
return localState->coordinator.act(work); },
175 if (selectedWork.empty()) {
178 localState->worker.schedule(selectedWork.get());
181 void on_completed()
const {
182 auto localState = state;
184 localState->dest.on_completed();
187 [&](){
return localState->coordinator.act(work); },
189 if (selectedWork.empty()) {
192 localState->worker.schedule(selectedWork.get());
197 auto coordinator = v.coordination.create_coordinator();
199 return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(
coordinator))));
203 template<
class Subscriber>
204 auto operator()(Subscriber dest)
const 205 -> decltype(timeout_observer<Subscriber>::make(std::move(dest), initial)) {
206 return timeout_observer<Subscriber>::make(std::move(dest), initial);
214 template<
class...
AN>
217 return operator_factory<timeout_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
225 template<
class Observable,
class Duration,
230 class Timeout = rxo::detail::timeout<SourceValue, rxu::decay_t<Duration>,
identity_one_worker>>
231 static auto member(Observable&& o, Duration&& d)
236 template<
class Observable,
class Coordination,
class Duration,
238 is_observable<Observable>,
240 rxu::is_duration<Duration>>,
243 static auto member(Observable&& o, Coordination&& cn, Duration&& d)
244 -> decltype(o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
245 return o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)));
248 template<
class Observable,
class Coordination,
class Duration,
250 is_observable<Observable>,
251 is_coordination<Coordination>,
252 rxu::is_duration<Duration>>,
255 static auto member(Observable&& o, Duration&& d, Coordination&& cn)
256 -> decltype(o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
257 return o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)));
260 template<
class...
AN>
261 static operators::detail::timeout_invalid_t<
AN...>
member(
const AN&...) {
264 static_assert(
sizeof...(
AN) == 10000,
"timeout takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
Definition: rx-util.hpp:791
timeout_error(const std::string &msg)
Definition: rx-timeout.hpp:32
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static auto member(Observable &&o, Coordination &&cn, Duration &&d) -> decltype(o.template lift< SourceValue >(Timeout(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-timeout.hpp:243
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
Definition: rx-timeout.hpp:29
static auto member(Observable &&o, Duration &&d, Coordination &&cn) -> decltype(o.template lift< SourceValue >(Timeout(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-timeout.hpp:255
Definition: rx-operators.hpp:452
consumes values from an observable using State that may implement on_next, on_error and on_completed ...
Definition: rx-observer.hpp:179
Definition: rx-operators.hpp:16
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
auto timeout(AN &&...an) -> operator_factory< timeout_tag, AN... >
Return an observable that terminates with timeout_error if a particular timespan has passed without e...
Definition: rx-timeout.hpp:215
static operators::detail::timeout_invalid_t< AN... > member(const AN &...)
Definition: rx-timeout.hpp:261
Definition: rx-coordination.hpp:114
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
Definition: rx-scheduler.hpp:426
rxsc::worker get_worker() const
Definition: rx-coordination.hpp:85
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37
Definition: rx-scheduler.hpp:200
static auto member(Observable &&o, Duration &&d) -> decltype(o.template lift< SourceValue >(Timeout(std::forward< Duration >(d), identity_current_thread())))
Definition: rx-timeout.hpp:231
Definition: rx-coordination.hpp:45