31 #if !defined(RXCPP_OPERATORS_RX_FLATMAP_HPP) 32 #define RXCPP_OPERATORS_RX_FLATMAP_HPP 34 #include "../rx-includes.hpp" 43 struct flat_map_invalid_arguments {};
46 struct flat_map_invalid :
public rxo::operator_base<flat_map_invalid_arguments<AN...>> {
47 using type = observable<flat_map_invalid_arguments<
AN...>, flat_map_invalid<
AN...>>;
50 using flat_map_invalid_t =
typename flat_map_invalid<
AN...>::type;
52 template<
class Observable,
class CollectionSelector,
class ResultSelector,
class Coordination>
53 struct flat_map_traits {
54 typedef rxu::decay_t<Observable> source_type;
55 typedef rxu::decay_t<CollectionSelector> collection_selector_type;
56 typedef rxu::decay_t<ResultSelector> result_selector_type;
57 typedef rxu::decay_t<Coordination> coordination_type;
59 typedef typename source_type::value_type source_value_type;
61 struct tag_not_valid {};
62 template<
class CV,
class CCS>
63 static auto collection_check(
int) -> decltype((*(CCS*)
nullptr)(*(CV*)
nullptr));
64 template<
class CV,
class CCS>
65 static tag_not_valid collection_check(...);
67 static_assert(!std::is_same<decltype(collection_check<source_value_type, collection_selector_type>(0)), tag_not_valid>::value,
"flat_map CollectionSelector must be a function with the signature observable(flat_map::source_value_type)");
69 typedef rxu::decay_t<decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr)))> collection_type;
71 static_assert(is_observable<collection_type>::value,
"flat_map CollectionSelector must return an observable");
73 typedef typename collection_type::value_type collection_value_type;
75 template<
class CV,
class CCV,
class CRS>
76 static auto result_check(
int) -> decltype((*(CRS*)
nullptr)(*(CV*)
nullptr, *(CCV*)
nullptr));
77 template<
class CV,
class CCV,
class CRS>
78 static tag_not_valid result_check(...);
80 static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value,
"flat_map ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type)");
82 typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type;
85 template<
class Observable,
class CollectionSelector,
class ResultSelector,
class Coordination>
87 :
public operator_base<rxu::value_type_t<flat_map_traits<Observable, CollectionSelector, ResultSelector, Coordination>>>
89 typedef flat_map<Observable, CollectionSelector, ResultSelector, Coordination> this_type;
90 typedef flat_map_traits<Observable, CollectionSelector, ResultSelector, Coordination> traits;
92 typedef typename traits::source_type source_type;
93 typedef typename traits::collection_selector_type collection_selector_type;
94 typedef typename traits::result_selector_type result_selector_type;
96 typedef typename traits::source_value_type source_value_type;
97 typedef typename traits::collection_type collection_type;
98 typedef typename traits::collection_value_type collection_value_type;
100 typedef typename traits::coordination_type coordination_type;
101 typedef typename coordination_type::coordinator_type coordinator_type;
105 values(source_type o, collection_selector_type s, result_selector_type rs, coordination_type sf)
106 : source(std::move(o))
107 , selectCollection(std::move(s))
108 , selectResult(std::move(rs))
109 , coordination(std::move(sf))
113 collection_selector_type selectCollection;
114 result_selector_type selectResult;
115 coordination_type coordination;
119 flat_map(source_type o, collection_selector_type s, result_selector_type rs, coordination_type sf)
120 : initial(std::move(o), std::move(s), std::move(rs), std::move(sf))
124 template<
class Subscriber>
125 void on_subscribe(Subscriber scbr)
const {
128 typedef Subscriber output_type;
131 :
public std::enable_shared_from_this<state_type>
134 state_type(values i, coordinator_type coor, output_type oarg)
135 : values(std::move(i))
136 , pendingCompletions(0)
137 , coordinator(std::move(coor))
138 , out(std::move(oarg))
143 int pendingCompletions;
144 coordinator_type coordinator;
148 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
151 auto state = std::make_shared<state_type>(initial, std::move(coordinator), std::move(scbr));
153 composite_subscription outercs;
157 state->out.add(outercs);
160 [&](){
return state->coordinator.in(state->source);},
162 if (source.empty()) {
166 ++state->pendingCompletions;
170 auto sink = make_subscriber<source_value_type>(
174 [state](source_value_type st) {
176 composite_subscription innercs;
180 auto innercstoken = state->out.add(innercs);
183 state->out.remove(innercstoken);
186 auto selectedCollection = state->selectCollection(st);
187 auto selectedSource = state->coordinator.in(selectedCollection);
189 ++state->pendingCompletions;
192 auto sinkInner = make_subscriber<collection_value_type>(
196 [state, st](collection_value_type ct) {
197 auto selectedResult = state->selectResult(st, std::move(ct));
198 state->out.on_next(std::move(selectedResult));
201 [state](std::exception_ptr e) {
202 state->out.on_error(e);
206 if (--state->pendingCompletions == 0) {
207 state->out.on_completed();
212 auto selectedSinkInner = state->coordinator.out(sinkInner);
213 selectedSource.subscribe(std::move(selectedSinkInner));
216 [state](std::exception_ptr e) {
217 state->out.on_error(e);
221 if (--state->pendingCompletions == 0) {
222 state->out.on_completed();
228 [&](){
return state->coordinator.out(sink);},
230 if (selectedSink.empty()) {
234 source->subscribe(std::move(selectedSink.get()));
243 template<
class...
AN>
246 return operator_factory<flat_map_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
251 template<
class...
AN>
254 return operator_factory<flat_map_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
262 template<
class Observable,
class CollectionSelector,
266 class ResultSelectorType = rxu::detail::take_at<1>,
274 static Result
member(Observable&& o, CollectionSelector&& s) {
275 return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(),
identity_current_thread()));
278 template<
class Observable,
class CollectionSelector,
class Coordination,
279 class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
282 class ResultSelectorType = rxu::detail::take_at<1>,
284 all_observables<Observable, CollectionType>,
286 class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType,
rxu::decay_t<Coordination>>,
291 static Result
member(Observable&& o, CollectionSelector&& s, Coordination&& cn) {
292 return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), std::forward<Coordination>(cn)));
295 template<
class Observable,
class CollectionSelector,
class ResultSelector,
297 class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
301 all_observables<Observable, CollectionType>,
303 class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>,
rxu::decay_t<ResultSelector>, identity_one_worker>,
305 class ResultSelectorType = rxu::decay_t<ResultSelector>,
309 static Result
member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs) {
310 return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs),
identity_current_thread()));
313 template<
class Observable,
class CollectionSelector,
class ResultSelector,
class Coordination,
314 class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
318 all_observables<Observable, CollectionType>,
319 is_coordination<Coordination>>,
320 class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, rxu::decay_t<Coordination>>,
322 class ResultSelectorType = rxu::decay_t<ResultSelector>,
326 static Result
member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) {
327 return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn)));
330 template<
class...
AN>
331 static operators::detail::flat_map_invalid_t<
AN...>
member(
AN...) {
334 static_assert(
sizeof...(
AN) == 10000,
"flat_map takes (CollectionSelector, optional ResultSelector, optional Coordination)");
static Result member(Observable &&o, CollectionSelector &&s, ResultSelector &&rs)
Definition: rx-flat_map.hpp:309
Definition: rx-util.hpp:100
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:197
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
auto flat_map(AN &&...an) -> operator_factory< flat_map_tag, AN... >
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-flat_map.hpp:244
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
static Result member(Observable &&o, CollectionSelector &&s, ResultSelector &&rs, Coordination &&cn)
Definition: rx-flat_map.hpp:326
Definition: rx-operators.hpp:47
static const bool value
Definition: rx-predef.hpp:123
Definition: rx-operators.hpp:227
auto merge_transform(AN &&...an) -> operator_factory< flat_map_tag, AN... >
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-flat_map.hpp:252
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
static Result member(Observable &&o, CollectionSelector &&s, Coordination &&cn)
Definition: rx-flat_map.hpp:291
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
static operators::detail::flat_map_invalid_t< AN... > member(AN...)
Definition: rx-flat_map.hpp:331
Definition: rx-coordination.hpp:114
typename std::result_of< TN... >::type result_of_t
Definition: rx-util.hpp:37
Definition: rx-util.hpp:802
static Result member(Observable &&o, CollectionSelector &&s)
Definition: rx-flat_map.hpp:274
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-coordination.hpp:37