RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-iterate.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_SOURCES_RX_ITERATE_HPP)
6 #define RXCPP_SOURCES_RX_ITERATE_HPP
7 
8 #include "../rx-includes.hpp"
9 
32 namespace rxcpp {
33 
34 namespace sources {
35 
36 namespace detail {
37 
38 template<class Collection>
39 struct is_iterable
40 {
41  typedef rxu::decay_t<Collection> collection_type;
42 
43  struct not_void {};
44  template<class CC>
45  static auto check(int) -> decltype(std::begin(*(CC*)nullptr));
46  template<class CC>
47  static not_void check(...);
48 
49  static const bool value = !std::is_same<decltype(check<collection_type>(0)), not_void>::value;
50 };
51 
52 template<class Collection>
53 struct iterate_traits
54 {
55  typedef rxu::decay_t<Collection> collection_type;
56  typedef rxu::decay_t<decltype(std::begin(*(collection_type*)nullptr))> iterator_type;
57  typedef rxu::value_type_t<std::iterator_traits<iterator_type>> value_type;
58 };
59 
60 template<class Collection, class Coordination>
61 struct iterate : public source_base<rxu::value_type_t<iterate_traits<Collection>>>
62 {
63  typedef iterate<Collection, Coordination> this_type;
64  typedef iterate_traits<Collection> traits;
65 
66  typedef rxu::decay_t<Coordination> coordination_type;
67  typedef typename coordination_type::coordinator_type coordinator_type;
68 
69  typedef typename traits::collection_type collection_type;
70  typedef typename traits::iterator_type iterator_type;
71 
72  struct iterate_initial_type
73  {
74  iterate_initial_type(collection_type c, coordination_type cn)
75  : collection(std::move(c))
76  , coordination(std::move(cn))
77  {
78  }
79  collection_type collection;
80  coordination_type coordination;
81  };
82  iterate_initial_type initial;
83 
84  iterate(collection_type c, coordination_type cn)
85  : initial(std::move(c), std::move(cn))
86  {
87  }
88  template<class Subscriber>
89  void on_subscribe(Subscriber o) const {
90  static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
91 
92  typedef typename coordinator_type::template get<Subscriber>::type output_type;
93 
94  struct iterate_state_type
95  : public iterate_initial_type
96  {
97  iterate_state_type(const iterate_initial_type& i, output_type o)
98  : iterate_initial_type(i)
99  , cursor(std::begin(iterate_initial_type::collection))
100  , end(std::end(iterate_initial_type::collection))
101  , out(std::move(o))
102  {
103  }
104  iterate_state_type(const iterate_state_type& o)
105  : iterate_initial_type(o)
106  , cursor(std::begin(iterate_initial_type::collection))
107  , end(std::end(iterate_initial_type::collection))
108  , out(std::move(o.out)) // since lambda capture does not yet support move
109  {
110  }
111  mutable iterator_type cursor;
112  iterator_type end;
113  mutable output_type out;
114  };
115 
116  // creates a worker whose lifetime is the same as this subscription
117  auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
118 
119  iterate_state_type state(initial, o);
120 
121  auto controller = coordinator.get_worker();
122 
123  auto producer = [state](const rxsc::schedulable& self){
124  if (!state.out.is_subscribed()) {
125  // terminate loop
126  return;
127  }
128 
129  if (state.cursor != state.end) {
130  // send next value
131  state.out.on_next(*state.cursor);
132  ++state.cursor;
133  }
134 
135  if (state.cursor == state.end) {
136  state.out.on_completed();
137  // o is unsubscribed
138  return;
139  }
140 
141  // tail recurse this same action to continue loop
142  self();
143  };
144  auto selectedProducer = on_exception(
145  [&](){return coordinator.act(producer);},
146  o);
147  if (selectedProducer.empty()) {
148  return;
149  }
150  controller.schedule(selectedProducer.get());
151 
152  }
153 };
154 
155 }
156 
159 template<class Collection>
160 auto iterate(Collection c)
161  -> observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, identity_one_worker>> {
162  return observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, identity_one_worker>>(
163  detail::iterate<Collection, identity_one_worker>(std::move(c), identity_immediate()));
164 }
167 template<class Collection, class Coordination>
168 auto iterate(Collection c, Coordination cn)
169  -> observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, Coordination>> {
170  return observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, Coordination>>(
171  detail::iterate<Collection, Coordination>(std::move(c), std::move(cn)));
172 }
173 
184 template<class T>
185 auto from()
186  -> decltype(iterate(std::array<T, 0>(), identity_immediate())) {
187  return iterate(std::array<T, 0>(), identity_immediate());
188 }
198 template<class T, class Coordination>
199 auto from(Coordination cn)
200  -> typename std::enable_if<is_coordination<Coordination>::value,
201  decltype( iterate(std::array<T, 0>(), std::move(cn)))>::type {
202  return iterate(std::array<T, 0>(), std::move(cn));
203 }
220 template<class Value0, class... ValueN>
221 auto from(Value0 v0, ValueN... vn)
222  -> typename std::enable_if<!is_coordination<Value0>::value,
223  decltype(iterate(*(std::array<Value0, sizeof...(ValueN) + 1>*)nullptr, identity_immediate()))>::type {
224  std::array<Value0, sizeof...(ValueN) + 1> c{{v0, vn...}};
225  return iterate(std::move(c), identity_immediate());
226 }
245 template<class Coordination, class Value0, class... ValueN>
246 auto from(Coordination cn, Value0 v0, ValueN... vn)
247  -> typename std::enable_if<is_coordination<Coordination>::value,
248  decltype(iterate(*(std::array<Value0, sizeof...(ValueN) + 1>*)nullptr, std::move(cn)))>::type {
249  std::array<Value0, sizeof...(ValueN) + 1> c{{v0, vn...}};
250  return iterate(std::move(c), std::move(cn));
251 }
252 
253 
266 template<class Value0>
267 auto just(Value0 v0)
268  -> typename std::enable_if<!is_coordination<Value0>::value,
269  decltype(iterate(*(std::array<Value0, 1>*)nullptr, identity_immediate()))>::type {
270  std::array<Value0, 1> c{{v0}};
271  return iterate(std::move(c), identity_immediate());
272 }
287 template<class Value0, class Coordination>
288 auto just(Value0 v0, Coordination cn)
289  -> typename std::enable_if<is_coordination<Coordination>::value,
290  decltype(iterate(*(std::array<Value0, 1>*)nullptr, std::move(cn)))>::type {
291  std::array<Value0, 1> c{{v0}};
292  return iterate(std::move(c), std::move(cn));
293 }
294 
315 template<class Observable, class Value0, class... ValueN>
316 auto start_with(Observable o, Value0 v0, ValueN... vn)
319 }
320 
321 }
322 
323 }
324 
325 #endif
Definition: rx-all.hpp:26
auto from() -> decltype(iterate(std::array< T, 0 >(), identity_immediate()))
Definition: rx-iterate.hpp:185
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
auto iterate(Collection c) -> observable< rxu::value_type_t< detail::iterate_traits< Collection >>, detail::iterate< Collection, identity_one_worker >>
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-iterate.hpp:160
static const bool value
Definition: rx-predef.hpp:123
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
identity_one_worker identity_immediate()
Definition: rx-coordination.hpp:170
auto concat(AN &&...an) -> operator_factory< concat_tag, AN... >
For each item from this observable subscribe to one at a time, in the order received. For each item from all of the given observables deliver from the new observable that is returned.
Definition: rx-concat.hpp:235
auto just(Value0 v0) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(iterate(*(std::array< Value0, 1 > *) nullptr, identity_immediate()))>::type
Definition: rx-iterate.hpp:267
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
auto start_with(Observable o, Value0 v0, ValueN...vn) -> decltype(from(rxu::value_type_t< Observable >(v0), rxu::value_type_t< Observable >(vn)...).concat(o))
Definition: rx-iterate.hpp:316