// Listing fragment: the leading integers are line numbers from the rendered
// source listing, and some original lines are absent (for example the
// declaration that owns the members below and the definition of not_void).
// Include guard and shared include, followed by a compile-time check for
// whether std::begin can be applied to collection_type.
5 #if !defined(RXCPP_SOURCES_RX_ITERATE_HPP) 6 #define RXCPP_SOURCES_RX_ITERATE_HPP 8 #include "../rx-includes.hpp" 38 template<
class Collection>
41 typedef rxu::decay_t<Collection> collection_type;
// Overload taking int: its return type is only well-formed when std::begin
// accepts an lvalue of CC (the null-pointer dereference sits inside decltype
// and is never evaluated).
45 static auto check(
int) -> decltype(std::begin(*(CC*)
nullptr));
// Variadic fallback overload; its return type is not_void.
47 static not_void check(...);
// true when check<collection_type>(0) did not resolve to the not_void fallback.
49 static const bool value = !std::is_same<decltype(check<collection_type>(0)), not_void>::value;
// Type aliases derived from a collection type. The owning declaration is not
// part of this listing; presumably it is iterate_traits, since later code
// reads collection_type and iterator_type through iterate_traits<Collection>
// -- confirm against the full header.
52 template<
class Collection>
55 typedef rxu::decay_t<Collection> collection_type;
// Iterator type: what std::begin returns for an lvalue collection_type,
// passed through rxu::decay_t (unevaluated; nothing is dereferenced at run time).
56 typedef rxu::decay_t<decltype(std::begin(*(collection_type*)nullptr))> iterator_type;
// Element type, obtained from std::iterator_traits of that iterator.
57 typedef rxu::value_type_t<std::iterator_traits<iterator_type>> value_type;
// Source that delivers the elements of a stored collection to a subscriber,
// using a coordination object to obtain the worker it schedules on.
// Listing fragment: braces and several original statements are absent, so
// the comments below describe only what the visible lines show.
60 template<
class Collection,
class Coordination>
61 struct iterate :
public source_base<rxu::value_type_t<iterate_traits<Collection>>>
63 typedef iterate<Collection, Coordination> this_type;
64 typedef iterate_traits<Collection> traits;
66 typedef rxu::decay_t<Coordination> coordination_type;
67 typedef typename coordination_type::coordinator_type coordinator_type;
69 typedef typename traits::collection_type collection_type;
70 typedef typename traits::iterator_type iterator_type;
// Values captured at construction time: the collection and the coordination.
72 struct iterate_initial_type
// Both arguments are taken by value and moved into the members.
74 iterate_initial_type(collection_type c, coordination_type cn)
75 : collection(std::move(c))
76 , coordination(std::move(cn))
79 collection_type collection;
80 coordination_type coordination;
// Construction-time state; on_subscribe copies it into per-call state.
82 iterate_initial_type initial;
// Takes the collection and coordination by value and forwards them to initial.
84 iterate(collection_type c, coordination_type cn)
85 : initial(std::move(c), std::move(cn))
// Begins delivery to subscriber o. The member function is const: it reads
// initial and works on a local iterate_state_type built from it.
88 template<
class Subscriber>
89 void on_subscribe(Subscriber o)
const {
// Output type that the coordinator associates with Subscriber.
92 typedef typename coordinator_type::template get<Subscriber>::type output_type;
// Per-call state: a copy of the initial values plus iterators into that
// copy and the output.
94 struct iterate_state_type
95 :
public iterate_initial_type
// cursor/end are taken from this object's own copy of the collection.
// (The initializer for out is absent from this listing.)
97 iterate_state_type(
const iterate_initial_type& i, output_type o)
98 : iterate_initial_type(i)
99 , cursor(std::begin(iterate_initial_type::collection))
100 , end(std::end(iterate_initial_type::collection))
// Copy constructor: cursor and end are re-derived from the new object's own
// collection copy instead of being copied from o, so a copy starts at the
// beginning of its collection. o.out can be moved from although o is const
// because out is declared mutable.
104 iterate_state_type(
const iterate_state_type& o)
105 : iterate_initial_type(o)
106 , cursor(std::begin(iterate_initial_type::collection))
107 , end(std::end(iterate_initial_type::collection))
108 , out(std::move(o.out))
// mutable: the lambda below captures state by value without being declared
// mutable, so its copy is const inside the call operator. Presumably the
// cursor is advanced there -- the increment is not in this listing; confirm.
111 mutable iterator_type cursor;
113 mutable output_type out;
// Coordinator created from the subscriber's subscription.
117 auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
119 iterate_state_type state(initial, o);
121 auto controller = coordinator.get_worker();
// One scheduled step of the iteration; captures state by value.
123 auto producer = [state](
const rxsc::schedulable&
self){
// Output no longer subscribed (branch body absent from this listing).
124 if (!state.out.is_subscribed()) {
// Not at the end: deliver the element the cursor refers to.
129 if (state.cursor != state.end) {
131 state.out.on_next(*state.cursor);
// At the end: signal completion.
135 if (state.cursor == state.end) {
136 state.out.on_completed();
// Lambda wrapping coordinator.act(producer); the call that receives it and
// defines selectedProducer is absent from this listing.
145 [&](){
return coordinator.act(producer);},
// selectedProducer is empty (branch body absent from this listing).
147 if (selectedProducer.empty()) {
// Hand the selected producer to the worker.
150 controller.schedule(selectedProducer.get());
// Fragment of a function template over Collection; the signature line is
// absent from this listing. The visible expression constructs
// detail::iterate<Collection, identity_one_worker> from the moved collection c
// and the result of identity_immediate().
159 template<
class Collection>
163 detail::iterate<Collection, identity_one_worker>(std::move(c),
identity_immediate()));
// Fragment of a function template over Collection and Coordination; the
// signature line is absent from this listing. The visible expression
// constructs detail::iterate<Collection, Coordination> from the moved
// collection c and the moved coordination cn.
167 template<
class Collection,
class Coordination>
171 detail::iterate<Collection, Coordination>(std::move(c), std::move(cn)));
// Overload taking only a coordination cn (the line carrying the function name
// and parameter list is absent from this listing; presumably this is from --
// confirm). It participates in overload resolution only when
// is_coordination<Coordination>::value is true, and returns iterate applied
// to an empty std::array<T, 0> together with the moved coordination.
198 template<
class T,
class Coordination>
200 ->
typename std::enable_if<is_coordination<Coordination>::value,
201 decltype(
iterate(std::array<T, 0>(), std::move(cn)))>::type {
202 return iterate(std::array<T, 0>(), std::move(cn));
// from(v0, vn...): enabled only when Value0 is not a coordination
// (is_coordination<Value0>::value is false). All arguments are placed into a
// std::array<Value0, sizeof...(ValueN) + 1>, so every vn must be usable to
// initialize a Value0 element. The rest of the return type and the return
// statement are absent from this listing.
220 template<
class Value0,
class... ValueN>
221 auto from(Value0 v0, ValueN... vn)
222 ->
typename std::enable_if<!is_coordination<Value0>::value,
224 std::array<Value0,
sizeof...(ValueN) + 1> c{{v0, vn...}};
// from(cn, v0, vn...): enabled only when is_coordination<Coordination>::value
// is true. Packs v0 and vn... into a std::array<Value0, sizeof...(ValueN) + 1>
// and returns iterate over that array with the given coordination.
245 template<
class Coordination,
class Value0,
class... ValueN>
246 auto from(Coordination cn, Value0 v0, ValueN... vn)
247 ->
typename std::enable_if<is_coordination<Coordination>::value,
// The null-pointer dereference appears only inside decltype (an unevaluated
// operand) to name the return type; nothing is dereferenced at run time.
248 decltype(
iterate(*(std::array<Value0,
sizeof...(ValueN) + 1>*)
nullptr, std::move(cn)))>::type {
249 std::array<Value0,
sizeof...(ValueN) + 1> c{{v0, vn...}};
// The array and the coordination are both moved into iterate.
250 return iterate(std::move(c), std::move(cn));
// Single-value overload, enabled only when Value0 is not a coordination. The
// line carrying the function name and parameter list is absent from this
// listing (presumably just(Value0 v0) -- confirm), as are the rest of the
// return type and the return statement. The value is stored in a one-element
// std::array.
266 template<
class Value0>
268 ->
typename std::enable_if<!is_coordination<Value0>::value,
270 std::array<Value0, 1> c{{v0}};
// just(v0, cn): enabled only when is_coordination<Coordination>::value is
// true. Wraps v0 in a one-element std::array and returns iterate over it with
// the given coordination.
287 template<
class Value0,
class Coordination>
288 auto just(Value0 v0, Coordination cn)
289 ->
typename std::enable_if<is_coordination<Coordination>::value,
// The null-pointer dereference appears only inside decltype (an unevaluated
// operand) to name the return type.
290 decltype(
iterate(*(std::array<Value0, 1>*)
nullptr, std::move(cn)))>::type {
291 std::array<Value0, 1> c{{v0}};
// The array and the coordination are both moved into iterate.
292 return iterate(std::move(c), std::move(cn));
// Template parameter list only: the declaration these parameters belong to is
// absent from this listing (presumably start_with(Observable, Value0,
// ValueN...) -- confirm against the full header).
315 template<
class Observable,
class Value0,
class... ValueN>
Definition: rx-all.hpp:26
auto from() -> decltype(iterate(std::array< T, 0 >(), identity_immediate()))
Definition: rx-iterate.hpp:185
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
auto iterate(Collection c) -> observable< rxu::value_type_t< detail::iterate_traits< Collection >>, detail::iterate< Collection, identity_one_worker >>
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-iterate.hpp:160
static const bool value
Definition: rx-predef.hpp:123
A source of values. Subscribe to it, or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
identity_one_worker identity_immediate()
Definition: rx-coordination.hpp:170
auto concat(AN &&...an) -> operator_factory< concat_tag, AN... >
For each item from this observable, subscribe to one at a time, in the order received. For each item from all of the given observables, deliver from the new observable that is returned.
Definition: rx-concat.hpp:235
auto just(Value0 v0) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(iterate(*(std::array< Value0, 1 > *) nullptr, identity_immediate()))>::type
Definition: rx-iterate.hpp:267
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
auto start_with(Observable o, Value0 v0, ValueN...vn) -> decltype(from(rxu::value_type_t< Observable >(v0), rxu::value_type_t< Observable >(vn)...).concat(o))
Definition: rx-iterate.hpp:316