31 #if !defined(RXCPP_OPERATORS_RX_CONCATMAP_HPP) 32 #define RXCPP_OPERATORS_RX_CONCATMAP_HPP 34 #include "../rx-includes.hpp" 43 struct concat_map_invalid_arguments {};
// Placeholder operator type parameterised on an argument pack AN... .
// Its nested `type` is what the catch-all `member(AN...)` overload names as
// its return type (through concat_map_invalid_t).
// NOTE(review): the `template<class... AN>` header and the closing brace of
// this struct are not visible in this listing.
46 struct concat_map_invalid :
public rxo::operator_base<concat_map_invalid_arguments<AN...>> {
// Observable type whose value type is the tag concat_map_invalid_arguments<AN...>
// and whose source operator is this placeholder.
47 using type = observable<concat_map_invalid_arguments<
AN...>, concat_map_invalid<
AN...>>;
// Shorthand alias for concat_map_invalid<AN...>::type.
// NOTE(review): the `template<class... AN>` header of this alias is not
// visible in this listing.
50 using concat_map_invalid_t =
typename concat_map_invalid<
AN...>::type;
// Compile-time traits for concat_map.
// Decays the four template arguments, derives the types that flow through the
// operator (source value, inner collection, inner value, result value) and
// static_asserts that both selectors are callable with the expected arguments.
// NOTE(review): the closing brace of this struct is not visible in this listing.
52 template<
class Observable,
class CollectionSelector,
class ResultSelector,
class Coordination>
53 struct concat_traits {
// Decayed (reference/cv-stripped) forms of the template arguments.
54 typedef rxu::decay_t<Observable> source_type;
55 typedef rxu::decay_t<CollectionSelector> collection_selector_type;
56 typedef rxu::decay_t<ResultSelector> result_selector_type;
57 typedef rxu::decay_t<Coordination> coordination_type;
// Element type declared by the source observable.
59 typedef typename source_type::value_type source_value_type;
// Sentinel returned by the fallback overloads of the probes below.
61 struct tag_not_valid {};
// Callability probe for the collection selector: this overload participates
// only when an object of type CCS can be called with an object of type CV.
// The null-pointer dereferences appear only inside decltype, so they are
// never evaluated.
62 template<
class CV,
class CCS>
63 static auto collection_check(
int) -> decltype((*(CCS*)
nullptr)(*(CV*)
nullptr));
// Fallback chosen when the overload above is not viable.
64 template<
class CV,
class CCS>
65 static tag_not_valid collection_check(...);
// Reject a CollectionSelector that cannot be called with a source value.
67 static_assert(!std::is_same<decltype(collection_check<source_value_type, collection_selector_type>(0)), tag_not_valid>::value,
"concat_map CollectionSelector must be a function with the signature observable(concat_map::source_value_type)");
// Type returned by the collection selector when called with a source value.
69 typedef decltype((*(collection_selector_type*)
nullptr)((*(source_value_type*)
nullptr))) collection_type;
// The selector's result has to satisfy is_observable.
72 static_assert(is_observable<collection_type>::value,
"concat_map CollectionSelector must return an observable");
// Element type declared by the inner (selected) observable.
75 typedef typename collection_type::value_type collection_value_type;
// Callability probe for the result selector: viable only when CRS can be
// called with (CV, CCV).
77 template<
class CV,
class CCV,
class CRS>
78 static auto result_check(
int) -> decltype((*(CRS*)
nullptr)(*(CV*)
nullptr, *(CCV*)
nullptr));
// Fallback chosen when the overload above is not viable.
79 template<
class CV,
class CCV,
class CRS>
80 static tag_not_valid result_check(...);
// Reject a ResultSelector that cannot be called with (source value, inner value).
82 static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value,
"concat_map ResultSelector must be a function with the signature concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type)");
// Decayed return type of the result selector; the concat_map operator uses
// this as the value type it passes to operator_base.
84 typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type;
// Head of the concat_map operator type and its type aliases.
// NOTE(review): the `struct concat_map` line (between the template header and
// the base-class list) is missing from this listing; the name is taken from
// the `this_type` typedef below.
87 template<
class Observable,
class CollectionSelector,
class ResultSelector,
class Coordination>
89 :
// Base class is operator_base instantiated with the value_type computed by
// concat_traits for the same four arguments.
public operator_base<rxu::value_type_t<concat_traits<Observable, CollectionSelector, ResultSelector, Coordination>>>
91 typedef concat_map<Observable, CollectionSelector, ResultSelector, Coordination> this_type;
92 typedef concat_traits<Observable, CollectionSelector, ResultSelector, Coordination> traits;
// Re-export the traits' types under the same names for use in this class.
94 typedef typename traits::source_type source_type;
95 typedef typename traits::collection_selector_type collection_selector_type;
96 typedef typename traits::result_selector_type result_selector_type;
98 typedef typename traits::source_value_type source_value_type;
99 typedef typename traits::collection_type collection_type;
100 typedef typename traits::collection_value_type collection_value_type;
102 typedef typename traits::coordination_type coordination_type;
// Coordinator type declared by the coordination; on_subscribe stores one per
// subscription.
103 typedef typename coordination_type::coordinator_type coordinator_type;
// Bundle of the operator's configuration: source observable, collection
// selector, result selector and coordination.
// NOTE(review): the `struct values` header, the declaration of the `source`
// member and the closing brace are not visible in this listing.
// Constructor: takes each piece by value and moves it into the matching member.
107 values(source_type o, collection_selector_type s, result_selector_type rs, coordination_type sf)
108 : source(std::move(o))
109 , selectCollection(std::move(s))
110 , selectResult(std::move(rs))
111 , coordination(std::move(sf))
115 collection_selector_type selectCollection;
116 result_selector_type selectResult;
117 coordination_type coordination;
// Copy assignment is declared with RXCPP_DELETE (presumably expands to
// `= delete` where supported - confirm against the macro's definition).
119 values& operator=(
const values&) RXCPP_DELETE;
// Constructs the operator by moving all four arguments into `initial`.
// NOTE(review): the declaration of `initial` is not visible here; on_subscribe
// passes it where a `values` is expected, so it is presumably a `values` member.
123 concat_map(source_type o, collection_selector_type s, result_selector_type rs, coordination_type sf)
124 : initial(std::move(o), std::move(s), std::move(rs), std::move(sf))
// Subscription entry point of the operator.
// Builds a shared per-subscription state (configuration copied from `initial`,
// a coordinator, the output subscriber), subscribes to the source, and runs
// inner collections one at a time: a source value that arrives while an inner
// collection is still subscribed is queued in `selectedCollections`, and the
// queue is drained front-first.
// NOTE(review): this listing has gaps wherever the embedded line numbers skip.
// The calls that wrap the `[&](){...}` lambdas, several lambda headers and
// most closing braces are not visible, so the notes below describe only the
// visible statements.
128 template<
class Subscriber>
129 void on_subscribe(Subscriber scbr)
const {
132 typedef Subscriber output_type;
// Per-subscription state. It is shared through shared_from_this() so the
// subscriber lambdas below can hold it by value.
134 struct concat_map_state_type
135 :
public std::enable_shared_from_this<concat_map_state_type>
// Initialises `values` from `i` (presumably a second base class whose line is
// missing here, since the code reads state->selectCollection and
// state->source directly), then stores the coordinator and output subscriber.
138 concat_map_state_type(values i, coordinator_type coor, output_type oarg)
139 : values(std::move(i))
142 , coordinator(std::move(coor))
143 , out(std::move(oarg))
// Starts the inner collection for a single source value `st`.
147 void subscribe_to(source_value_type st)
149 auto state = this->shared_from_this();
// Invoke the user's collection selector on `st`. The wrapping call is not
// visible; its result is tested with .empty() and read with .get() below, so
// it is presumably an optional-like holder that is empty on failure.
152 [&](){
return state->selectCollection(st);},
154 if (selectedCollection.empty()) {
// Fresh lifetime for this inner subscription.
158 collectionLifetime = composite_subscription();
// Register the inner lifetime with the output subscriber and keep the token.
162 auto innercstoken = state->out.add(collectionLifetime);
// Removes that token from the output again (the enclosing construct is not
// visible - presumably a callback run when the inner lifetime ends).
165 state->out.remove(innercstoken);
// Pass the selected collection through the coordinator's in().
169 [&](){
return state->coordinator.in(selectedCollection.get());},
171 if (selectedSource.empty()) {
// Subscriber for the inner collection (arguments on the skipped lines are not
// visible).
177 auto sinkInner = make_subscriber<collection_value_type>(
// Inner value handler: combine the captured source value with the inner
// value via the result selector and forward the result to the output.
181 [state, st](collection_value_type ct) {
182 auto selectedResult = state->selectResult(st, std::move(ct));
183 state->out.on_next(std::move(selectedResult));
// Inner error handler: forward the error to the output unchanged.
186 [state](std::exception_ptr e) {
187 state->out.on_error(e);
// (Presumably the inner completion handler - its lambda header is missing.)
// If source values are queued, take the oldest, end the current inner
// lifetime and start the next inner collection; otherwise complete the
// output once the source lifetime is no longer subscribed.
191 if (!state->selectedCollections.empty()) {
192 auto value = state->selectedCollections.front();
193 state->selectedCollections.pop_front();
194 state->collectionLifetime.unsubscribe();
195 state->subscribe_to(value);
196 }
else if (!state->sourceLifetime.is_subscribed()) {
197 state->out.on_completed();
// Pass the inner subscriber through the coordinator's out(), then subscribe
// it to the coordinated inner source.
202 [&](){
return state->coordinator.out(sinkInner);},
204 if (selectedSinkInner.empty()) {
207 selectedSource->subscribe(std::move(selectedSinkInner.get()));
// Lifetime of the subscription to the source observable.
209 composite_subscription sourceLifetime;
// Lifetime of the currently running inner collection.
210 composite_subscription collectionLifetime;
// Source values waiting for the current inner collection to finish (FIFO).
211 std::deque<source_value_type> selectedCollections;
212 coordinator_type coordinator;
// Create the coordinator from the stored coordination, tied to the
// subscriber's subscription.
216 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
// `initial` is passed as an lvalue, so the state receives its own copy.
219 auto state = std::make_shared<concat_map_state_type>(initial, std::move(coordinator), std::move(scbr));
// Fresh lifetime for the source subscription, registered with the output.
221 state->sourceLifetime = composite_subscription();
225 state->out.add(state->sourceLifetime);
// Pass the source observable through the coordinator's in().
228 [&](){
return state->coordinator.in(state->source);},
230 if (source.empty()) {
// Subscriber for the source observable, bound to sourceLifetime.
237 auto sink = make_subscriber<source_value_type>(
239 state->sourceLifetime,
// Source value handler: queue the value while an inner collection is
// subscribed; start it immediately when nothing is running and nothing is
// queued.
// NOTE(review): in the visible lines a value that arrives while no inner
// collection is subscribed but the queue is non-empty takes neither branch -
// confirm against the full source whether that state is reachable.
241 [state](source_value_type st) {
242 if (state->collectionLifetime.is_subscribed()) {
243 state->selectedCollections.push_back(st);
244 }
else if (state->selectedCollections.empty()) {
245 state->subscribe_to(st);
// Source error handler: forward the error to the output unchanged.
249 [state](std::exception_ptr e) {
250 state->out.on_error(e);
// (Presumably the source completion handler - its lambda header is missing.)
// Complete the output only when no inner collection is running and none is
// queued; otherwise the inner side completes it later.
254 if (!state->collectionLifetime.is_subscribed() && state->selectedCollections.empty()) {
255 state->out.on_completed();
// Pass the source subscriber through the coordinator's out(), then subscribe
// it to the coordinated source.
260 [&](){
return state->coordinator.out(sink);},
262 if (selectedSink.empty()) {
265 source->subscribe(std::move(selectedSink.get()));
// Factory for the concat_map operator: forwards every argument into a
// std::make_tuple and wraps the tuple in an operator_factory tagged with
// concat_map_tag.
// NOTE(review): the function's signature line is not visible in this listing.
276 template<
class...
AN>
279 return operator_factory<concat_map_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
// Second factory with the same visible body as the one above: it builds an
// operator_factory tagged with concat_map_tag from the forwarded arguments.
// NOTE(review): the signature line is not visible here; the cross-reference
// text at the end of this listing names the function at this position
// concat_transform.
284 template<
class...
AN>
287 return operator_factory<concat_map_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
// member(source, collection selector): overload used when neither a result
// selector nor a coordination is supplied.
// NOTE(review): several defaulted template parameters (including the ones that
// define ConcatMap and Result) sit on lines missing from this listing.
295 template<
class Observable,
class CollectionSelector,
// Default result selector type; presumably picks the argument at index 1,
// i.e. the inner value - confirm against rxu::detail::take_at.
299 class ResultSelectorType = rxu::detail::take_at<1>,
307 static Result
member(Observable&& o, CollectionSelector&& s) {
// Forward source and selector, default-construct the result selector and use
// identity_current_thread() as the coordination.
308 return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(),
identity_current_thread()));
// member(source, collection selector, coordination): overload used when a
// coordination is supplied but no result selector.
// NOTE(review): some defaulted template parameters (the enabling condition's
// opening, CollectionType, Result) sit on lines missing from this listing.
311 template<
class Observable,
class CollectionSelector,
class Coordination,
312 class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
// Default result selector type; presumably picks the argument at index 1,
// i.e. the inner value - confirm against rxu::detail::take_at.
315 class ResultSelectorType = rxu::detail::take_at<1>,
// Part of the enabling condition: the source and the selected collection
// must both be observables.
317 all_observables<Observable, CollectionType>,
// Concrete operator type built from the decayed argument types.
319 class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType,
rxu::decay_t<Coordination>>,
324 static Result
member(Observable&& o, CollectionSelector&& s, Coordination&& cn) {
// Forward all caller arguments; the result selector is default-constructed.
325 return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), std::forward<Coordination>(cn)));
// member(source, collection selector, result selector): overload used when a
// result selector is supplied but no coordination.
// NOTE(review): some defaulted template parameters (the enabling condition's
// opening, CollectionType, Result) sit on lines missing from this listing.
328 template<
class Observable,
class CollectionSelector,
class ResultSelector,
330 class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
// Part of the enabling condition: the source and the selected collection
// must both be observables.
334 all_observables<Observable, CollectionType>,
// The coordination type is fixed to identity_one_worker, matching the
// identity_current_thread() value passed in the body.
336 class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>,
rxu::decay_t<ResultSelector>, identity_one_worker>,
338 class ResultSelectorType = rxu::decay_t<ResultSelector>,
342 static Result
member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs) {
// Forward the three caller arguments and use identity_current_thread() as
// the coordination.
343 return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs),
identity_current_thread()));
// member(source, collection selector, result selector, coordination):
// overload used when every piece is supplied by the caller.
// NOTE(review): some defaulted template parameters (the enabling condition's
// opening, CollectionType, Result) sit on lines missing from this listing.
346 template<
class Observable,
class CollectionSelector,
class ResultSelector,
class Coordination,
347 class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
// Enabling condition (opening not visible): source and selected collection
// must be observables, and Coordination must satisfy is_coordination.
351 all_observables<Observable, CollectionType>,
352 is_coordination<Coordination>>,
// Concrete operator type built from the decayed argument types.
353 class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, rxu::decay_t<Coordination>>,
355 class ResultSelectorType = rxu::decay_t<ResultSelector>,
359 static Result
member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) {
// Forward all four arguments into the operator and wrap it in Result.
360 return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn)));
// Catch-all overload that accepts any argument list.
// The static_assert condition depends on the pack AN, so it is checked only
// when this overload is instantiated; the pack size is compared with 10000,
// which no realistic call has, so instantiating it reports the usage message.
// NOTE(review): the rest of the body and the closing brace are not visible in
// this listing.
363 template<
class...
AN>
364 static operators::detail::concat_map_invalid_t<
AN...>
member(
AN...) {
367 static_assert(
sizeof...(
AN) == 10000,
"concat_map takes (CollectionSelector, optional ResultSelector, optional Coordination)");
Definition: rx-util.hpp:100
Definition: rx-all.hpp:26
static Result member(Observable &&o, CollectionSelector &&s, Coordination &&cn)
Definition: rx-concat_map.hpp:324
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:197
Definition: rx-operators.hpp:69
static Result member(Observable &&o, CollectionSelector &&s)
Definition: rx-concat_map.hpp:307
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static const bool value
Definition: rx-predef.hpp:123
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
static composite_subscription empty()
Definition: rx-subscription.hpp:404
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
static Result member(Observable &&o, CollectionSelector &&s, ResultSelector &&rs)
Definition: rx-concat_map.hpp:342
auto concat_transform(AN &&...an) -> operator_factory< concat_map_tag, AN... >
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-concat_map.hpp:285
static Result member(Observable &&o, CollectionSelector &&s, ResultSelector &&rs, Coordination &&cn)
Definition: rx-concat_map.hpp:359
auto concat_map(AN &&...an) -> operator_factory< concat_map_tag, AN... >
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-concat_map.hpp:277
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-operators.hpp:164
Definition: rx-coordination.hpp:114
typename std::result_of< TN... >::type result_of_t
Definition: rx-util.hpp:37
Definition: rx-util.hpp:802
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-coordination.hpp:37
static operators::detail::concat_map_invalid_t< AN... > member(AN...)
Definition: rx-concat_map.hpp:364