5 #if !defined(RXCPP_SOURCES_RX_INTERVAL_HPP) 6 #define RXCPP_SOURCES_RX_INTERVAL_HPP 8 #include "../rx-includes.hpp" 44 template<
class Coordination>
45 struct interval :
public source_base<long>
47 typedef interval<Coordination> this_type;
49 typedef rxu::decay_t<Coordination> coordination_type;
50 typedef typename coordination_type::coordinator_type coordinator_type;
52 struct interval_initial_type
54 interval_initial_type(rxsc::scheduler::clock_type::time_point i, rxsc::scheduler::clock_type::duration p, coordination_type cn)
57 , coordination(std::move(cn))
60 rxsc::scheduler::clock_type::time_point initial;
61 rxsc::scheduler::clock_type::duration period;
62 coordination_type coordination;
64 interval_initial_type initial;
66 interval(rxsc::scheduler::clock_type::time_point i, rxsc::scheduler::clock_type::duration p, coordination_type cn)
67 : initial(i, p, std::move(cn))
70 template<
class Subscriber>
71 void on_subscribe(Subscriber o)
const {
75 auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
77 auto controller = coordinator.get_worker();
79 auto counter = std::make_shared<long>(0);
81 auto producer = [o, counter](
const rxsc::schedulable&) {
83 o.on_next(++(*counter));
87 [&](){
return coordinator.act(producer);},
89 if (selectedProducer.empty()) {
93 controller.schedule_periodically(initial.initial, initial.period, selectedProducer.get());
97 template<
class Duration,
class Coordination>
98 struct defer_interval :
public defer_observable<
100 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>::value,
101 is_coordination<Coordination>::value>,
103 interval, Coordination>
112 template<
class Duration>
114 ->
typename std::enable_if<
115 detail::defer_interval<Duration, identity_one_worker>::value,
116 typename detail::defer_interval<Duration, identity_one_worker>::observable_type>::type {
122 template<
class Coordination>
123 auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
124 ->
typename std::enable_if<
125 detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::value,
126 typename detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::observable_type>::type {
127 return detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::make(cn.now(), period, std::move(cn));
132 template<
class Duration>
133 auto interval(rxsc::scheduler::clock_type::time_point when, Duration period)
134 ->
typename std::enable_if<
135 detail::defer_interval<Duration, identity_one_worker>::value,
136 typename detail::defer_interval<Duration, identity_one_worker>::observable_type>::type {
142 template<
class Coordination>
143 auto interval(rxsc::scheduler::clock_type::time_point when, rxsc::scheduler::clock_type::duration period, Coordination cn)
144 ->
typename std::enable_if<
145 detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::value,
146 typename detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::observable_type>::type {
147 return detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::make(when, period, std::move(cn));
Definition: rx-all.hpp:26
static const bool value
Definition: rx-predef.hpp:123
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
auto interval(Duration period) -> typename std::enable_if< detail::defer_interval< Duration, identity_one_worker >::value, typename detail::defer_interval< Duration, identity_one_worker >::observable_type >::type
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-interval.hpp:113
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175