5 #if !defined(RXCPP_SOURCES_RX_TIMER_HPP) 6 #define RXCPP_SOURCES_RX_TIMER_HPP 8 #include "../rx-includes.hpp" 44 template<
class Coordination>
45 struct timer :
public source_base<long>
47 typedef timer<Coordination> this_type;
49 typedef rxu::decay_t<Coordination> coordination_type;
50 typedef typename coordination_type::coordinator_type coordinator_type;
52 struct timer_initial_type
54 timer_initial_type(rxsc::scheduler::clock_type::time_point t, coordination_type cn)
56 , coordination(std::move(cn))
59 rxsc::scheduler::clock_type::time_point when;
60 coordination_type coordination;
62 timer_initial_type initial;
64 timer(rxsc::scheduler::clock_type::time_point t, coordination_type cn)
65 : initial(t, std::move(cn))
68 timer(rxsc::scheduler::clock_type::duration p, coordination_type cn)
69 : initial(rxsc::scheduler::clock_type::time_point(), std::move(cn))
71 initial.when = initial.coordination.now() + p;
73 template<
class Subscriber>
74 void on_subscribe(Subscriber o)
const {
78 auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
79 auto controller = coordinator.get_worker();
81 auto producer = [o](
const rxsc::schedulable&) {
88 [&](){
return coordinator.act(producer);},
90 if (selectedProducer.empty()) {
94 controller.schedule(initial.when, selectedProducer.get());
98 template<
class TimePo
intOrDuration,
class Coordination>
99 struct defer_timer :
public defer_observable<
101 std::is_convertible<TimePointOrDuration, rxsc::scheduler::clock_type::time_point>::value ||
102 std::is_convertible<TimePointOrDuration, rxsc::scheduler::clock_type::duration>::value,
103 is_coordination<Coordination>::value>,
113 template<
class TimePo
intOrDuration>
114 auto timer(TimePointOrDuration when)
115 ->
typename std::enable_if<
116 detail::defer_timer<TimePointOrDuration, identity_one_worker>::value,
117 typename detail::defer_timer<TimePointOrDuration, identity_one_worker>::observable_type>::type {
123 template<
class TimePo
intOrDuration,
class Coordination>
124 auto timer(TimePointOrDuration when, Coordination cn)
125 ->
typename std::enable_if<
126 detail::defer_timer<TimePointOrDuration, Coordination>::value,
127 typename detail::defer_timer<TimePointOrDuration, Coordination>::observable_type>::type {
128 return detail::defer_timer<TimePointOrDuration, Coordination>::make(when, std::move(cn));
Definition: rx-all.hpp:26
static const bool value
Definition: rx-predef.hpp:123
auto timer(TimePointOrDuration when) -> typename std::enable_if< detail::defer_timer< TimePointOrDuration, identity_one_worker >::value, typename detail::defer_timer< TimePointOrDuration, identity_one_worker >::observable_type >::type
Returns an observable that emits an integer at the specified time point.
Definition: rx-timer.hpp:114
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175