RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-range.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_SOURCES_RX_RANGE_HPP)
6 #define RXCPP_SOURCES_RX_RANGE_HPP
7 
8 #include "../rx-includes.hpp"
9 
33 namespace rxcpp {
34 
35 namespace sources {
36 
37 namespace detail {
38 
39 template<class T, class Coordination>
40 struct range : public source_base<T>
41 {
42  typedef rxu::decay_t<Coordination> coordination_type;
43  typedef typename coordination_type::coordinator_type coordinator_type;
44 
45  struct range_state_type
46  {
47  range_state_type(T f, T l, std::ptrdiff_t s, coordination_type cn)
48  : next(f)
49  , last(l)
50  , step(s)
51  , coordination(std::move(cn))
52  {
53  }
54  mutable T next;
55  T last;
56  std::ptrdiff_t step;
57  coordination_type coordination;
58  };
59  range_state_type initial;
60  range(T f, T l, std::ptrdiff_t s, coordination_type cn)
61  : initial(f, l, s, std::move(cn))
62  {
63  }
64  template<class Subscriber>
65  void on_subscribe(Subscriber o) const {
66  static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
67 
68  // creates a worker whose lifetime is the same as this subscription
69  auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
70 
71  auto controller = coordinator.get_worker();
72 
73  auto state = initial;
74 
75  auto producer = [=](const rxsc::schedulable& self){
76  auto& dest = o;
77  if (!dest.is_subscribed()) {
78  // terminate loop
79  return;
80  }
81 
82  // send next value
83  dest.on_next(state.next);
84  if (!dest.is_subscribed()) {
85  // terminate loop
86  return;
87  }
88 
89  if (std::abs(state.last - state.next) < std::abs(state.step)) {
90  if (state.last != state.next) {
91  dest.on_next(state.last);
92  }
93  dest.on_completed();
94  // o is unsubscribed
95  return;
96  }
97  state.next = static_cast<T>(state.step + state.next);
98 
99  // tail recurse this same action to continue loop
100  self();
101  };
102 
103  auto selectedProducer = on_exception(
104  [&](){return coordinator.act(producer);},
105  o);
106  if (selectedProducer.empty()) {
107  return;
108  }
109 
110  controller.schedule(selectedProducer.get());
111  }
112 };
113 
114 }
115 
118 template<class T>
119 auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1)
122  detail::range<T, identity_one_worker>(first, last, step, identity_current_thread()));
123 }
126 template<class T, class Coordination>
127 auto range(T first, T last, std::ptrdiff_t step, Coordination cn)
130  detail::range<T, Coordination>(first, last, step, std::move(cn)));
131 }
134 template<class T, class Coordination>
135 auto range(T first, T last, Coordination cn)
136  -> typename std::enable_if<is_coordination<Coordination>::value,
139  detail::range<T, Coordination>(first, last, 1, std::move(cn)));
140 }
143 template<class T, class Coordination>
144 auto range(T first, Coordination cn)
145  -> typename std::enable_if<is_coordination<Coordination>::value,
148  detail::range<T, Coordination>(first, std::numeric_limits<T>::max(), 1, std::move(cn)));
149 }
150 
151 }
152 
153 }
154 
155 #endif
Definition: rx-all.hpp:26
auto max() -> operator_factory< max_tag >
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-reduce.hpp:496
static const bool value
Definition: rx-predef.hpp:123
auto last() -> operator_factory< last_tag >
For each item from this observable reduce it by sending only the last item.
Definition: rx-reduce.hpp:395
A source of values. Subscribe to it, or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
auto first() -> operator_factory< first_tag >
For each item from this observable reduce it by sending only the first item.
Definition: rx-reduce.hpp:378
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
auto range(T first=0, T last=std::numeric_limits< T >::max(), std::ptrdiff_t step=1) -> observable< T, detail::range< T, identity_one_worker >>
Returns an observable that emits the values from first to last, adding step to the previous value each time.
Definition: rx-range.hpp:119