5 #if !defined(RXCPP_SOURCES_RX_RANGE_HPP) 6 #define RXCPP_SOURCES_RX_RANGE_HPP 8 #include "../rx-includes.hpp" 39 template<
class T,
class Coordination>
40 struct range :
public source_base<T>
42 typedef rxu::decay_t<Coordination> coordination_type;
43 typedef typename coordination_type::coordinator_type coordinator_type;
45 struct range_state_type
47 range_state_type(T f, T l, std::ptrdiff_t s, coordination_type cn)
51 , coordination(std::move(cn))
57 coordination_type coordination;
59 range_state_type initial;
60 range(T f, T l, std::ptrdiff_t s, coordination_type cn)
61 : initial(f, l, s, std::move(cn))
64 template<
class Subscriber>
65 void on_subscribe(Subscriber o)
const {
69 auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
71 auto controller = coordinator.get_worker();
75 auto producer = [=](
const rxsc::schedulable&
self){
77 if (!dest.is_subscribed()) {
83 dest.on_next(state.next);
84 if (!dest.is_subscribed()) {
89 if (std::abs(state.last - state.next) < std::abs(state.step)) {
90 if (state.last != state.next) {
91 dest.on_next(state.last);
97 state.next =
static_cast<T
>(state.step + state.next);
104 [&](){
return coordinator.act(producer);},
106 if (selectedProducer.empty()) {
110 controller.schedule(selectedProducer.get());
126 template<
class T,
class Coordination>
130 detail::range<T, Coordination>(
first,
last, step, std::move(cn)));
134 template<
class T,
class Coordination>
136 ->
typename std::enable_if<is_coordination<Coordination>::value,
139 detail::range<T, Coordination>(
first,
last, 1, std::move(cn)));
143 template<
class T,
class Coordination>
145 ->
typename std::enable_if<is_coordination<Coordination>::value,
Definition: rx-all.hpp:26
auto max() -> operator_factory< max_tag >
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-reduce.hpp:496
static const bool value
Definition: rx-predef.hpp:123
auto last() -> operator_factory< last_tag >
For each item from this observable reduce it by sending only the last item.
Definition: rx-reduce.hpp:395
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
auto first() -> operator_factory< first_tag >
For each item from this observable reduce it by sending only the first item.
Definition: rx-reduce.hpp:378
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
auto range(T first=0, T last=std::numeric_limits< T >::max(), std::ptrdiff_t step=1) -> observable< T, detail::range< T, identity_one_worker >>
Returns an observable that executes the specified function when a subscriber subscribes to it...
Definition: rx-range.hpp:119