// Include guard for this header, the shared rx-includes.hpp include, and the
// empty tag type concat_invalid_arguments. The tag is used further down as
// concat_invalid_arguments<AN...>; its template header is not visible in this listing.
41 #if !defined(RXCPP_OPERATORS_RX_CONCAT_HPP) 42 #define RXCPP_OPERATORS_RX_CONCAT_HPP 44 #include "../rx-includes.hpp" 53 struct concat_invalid_arguments {};
// Placeholder operator type for argument lists that concat does not accept.
// It derives from rxo::operator_base over the concat_invalid_arguments<AN...> tag
// and exposes the nested alias type: an observable of that tag whose source
// operator is concat_invalid<AN...> itself. The alias is the return type of the
// catch-all member(AN...) overload.
56 struct concat_invalid :
public rxo::operator_base<concat_invalid_arguments<AN...>> {
57 using type = observable<concat_invalid_arguments<
AN...>, concat_invalid<
AN...>>;
// Shorthand for the nested alias concat_invalid<AN...>::type.
60 using concat_invalid_t =
typename concat_invalid<
AN...>::type;
// Template header, base class and type aliases of the concat operator.
// The struct header line itself is not visible in this listing.
//   T            - value type of the source; after decay it is used as the collection type
//   Observable   - type of the source observable
//   Coordination - coordination type; it supplies coordinator_type
62 template<
class T,
class Observable,
class Coordination>
// Base class: operator_base over the nested value_type of the decayed T.
64 :
public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
66 typedef concat<T, Observable, Coordination> this_type;
// Decayed forms of the three template parameters.
68 typedef rxu::decay_t<T> source_value_type;
69 typedef rxu::decay_t<Observable> source_type;
70 typedef rxu::decay_t<Coordination> coordination_type;
// Type of the coordinator that on_subscribe obtains from the coordination.
72 typedef typename coordination_type::coordinator_type coordinator_type;
// Operator type stored inside the source observable.
74 typedef typename source_type::source_operator_type source_operator_type;
// Each value emitted by the source is treated as a collection that subscribe_to subscribes to.
75 typedef source_value_type collection_type;
// Element type of a collection; this is the type forwarded to the output subscriber.
76 typedef typename collection_type::value_type value_type;
// Constructor and members of the values holder (its struct header is not
// visible in this listing). Both by-value arguments are moved into the members.
//   o  - source operator to store
//   sf - coordination to store
80 values(source_operator_type o, coordination_type sf)
81 : source_operator(std::move(o))
82 , coordination(std::move(sf))
// Operator of the source observable; the concat constructor passes o.source_operator here.
85 source_operator_type source_operator;
// Coordination that on_subscribe uses to create the coordinator.
86 coordination_type coordination;
// Builds the operator from a source observable and a coordination.
// The source operator is copied out of o and sf is moved; both go into the
// member named initial.
90 concat(
const source_type& o, coordination_type sf)
91 : initial(o.source_operator, std::move(sf))
// Subscribes scbr to the concatenated output.
// Creates a coordinator and a shared state object, subscribes to the source of
// collections, and forwards the values of one collection at a time to the
// output subscriber. Several lines of the original (braces, lambda headers,
// some statements) are not visible in this listing.
//   scbr - the output subscriber; it is moved into the shared state
95 template<
class Subscriber>
96 void on_subscribe(Subscriber scbr)
const {
99 typedef Subscriber output_type;
// Per-subscription state. The lambdas below capture a shared_ptr to it by value.
101 struct concat_state_type
102 :
public std::enable_shared_from_this<concat_state_type>
// Takes a copy of the stored values, rebuilds the source observable from the
// stored operator, and moves the coordinator and the output subscriber in.
105 concat_state_type(values i, coordinator_type coor, output_type oarg)
107 , source(i.source_operator)
110 , coordinator(std::move(coor))
111 , out(std::move(oarg))
// Subscribes to a single collection st and forwards its values to out.
115 void subscribe_to(collection_type st)
117 auto state = this->shared_from_this();
// Start a fresh lifetime for this inner subscription.
119 collectionLifetime = composite_subscription();
// Register the inner lifetime with out and keep the returned token; the token
// is later handed to out.remove (the enclosing statement is not visible here).
123 auto innercstoken = state->out.add(collectionLifetime);
126 state->out.remove(innercstoken);
// Route the collection through the input side of the coordinator. The result
// is named selectedSource on a line that is not visible in this listing.
130 [&](){
return state->coordinator.in(std::move(st));},
132 if (selectedSource.empty()) {
// Inner subscriber: values and errors from the collection go straight to out.
138 auto sinkInner = make_subscriber<value_type>(
142 [state, st](value_type ct) {
143 state->out.on_next(ct);
146 [state](std::exception_ptr e) {
147 state->out.on_error(e);
// Presumably the completion handler of sinkInner (its lambda header is not
// visible): if another collection is queued, take it from the front, end the
// finished inner lifetime and subscribe to the queued one; otherwise complete
// out once the source lifetime is no longer subscribed.
151 if (!state->selectedCollections.empty()) {
152 auto value = state->selectedCollections.front();
153 state->selectedCollections.pop_front();
154 state->collectionLifetime.unsubscribe();
155 state->subscribe_to(value);
156 }
else if (!state->sourceLifetime.is_subscribed()) {
157 state->out.on_completed();
// Route the inner subscriber through the output side of the coordinator, then
// subscribe it to the selected collection.
162 [&](){
return state->coordinator.out(sinkInner);},
164 if (selectedSinkInner.empty()) {
167 selectedSource->subscribe(std::move(selectedSinkInner.get()));
// Source observable that emits the collections.
169 observable<source_value_type, source_operator_type> source;
// Lifetime of the subscription to the source.
170 composite_subscription sourceLifetime;
// Lifetime of the subscription to the collection currently being forwarded.
171 composite_subscription collectionLifetime;
// Collections that arrived while another collection was still subscribed.
172 std::deque<collection_type> selectedCollections;
173 coordinator_type coordinator;
// Create the coordinator from the subscription of the output subscriber.
177 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
// Shared state; the coordinator and the subscriber are moved into it.
180 auto state = std::make_shared<concat_state_type>(initial, std::move(coordinator), std::move(scbr));
// Fresh lifetime for the source subscription, registered with out.
182 state->sourceLifetime = composite_subscription();
186 state->out.add(state->sourceLifetime);
// Route the source through the input side of the coordinator. The result is
// named source on a line that is not visible in this listing.
189 [&](){
return state->coordinator.in(state->source);},
191 if (source.empty()) {
// Subscriber for the source of collections, bound to sourceLifetime.
198 auto sink = make_subscriber<collection_type>(
200 state->sourceLifetime,
// A new collection is queued while an inner subscription is active; otherwise
// it is subscribed immediately, but only when the queue is empty.
202 [state](collection_type st) {
203 if (state->collectionLifetime.is_subscribed()) {
204 state->selectedCollections.push_back(st);
205 }
else if (state->selectedCollections.empty()) {
206 state->subscribe_to(st);
// Errors from the source go straight to out.
210 [state](std::exception_ptr e) {
211 state->out.on_error(e);
// Presumably the completion handler of sink (its lambda header is not visible):
// complete out only when no inner subscription is active and nothing is queued.
215 if (!state->collectionLifetime.is_subscribed() && state->selectedCollections.empty()) {
216 state->out.on_completed();
// Route the sink through the output side of the coordinator, then subscribe it
// to the selected source.
221 [&](){
return state->coordinator.out(sink);},
223 if (selectedSink.empty()) {
226 source->subscribe(std::move(selectedSink.get()));
// Variadic template header at original line 234. The declaration it introduces
// is not visible in this listing; presumably it is the concat factory function
// that follows on the next original line - TODO confirm against the full header.
234 template<
class...
AN>
// Template header of a member overload whose signature line is not visible
// here. The default Concat type is rxo::detail::concat instantiated with
// SourceValue, the decayed Observable and identity_one_worker as the coordination.
245 template<
class Observable,
249 class Concat = rxo::detail::concat<SourceValue, rxu::decay_t<Observable>,
identity_one_worker>,
// Overload taking the source observable and an explicit coordination.
// Both arguments are perfectly forwarded into Concat, which is wrapped in Result.
// The visible fragment of the constraint list mentions is_observable<Observable>.
//   o  - source observable
//   cn - coordination
257 template<
class Observable,
class Coordination,
259 is_observable<Observable>,
266 static Result
member(Observable&& o, Coordination&& cn) {
267 return Result(Concat(std::forward<Observable>(o), std::forward<Coordination>(cn)));
// Overload taking the source observable plus one or more additional values
// (v0, vn...) and no coordination. Its body is not visible in this listing.
270 template<
class Observable,
class Value0,
class... ValueN,
280 static Result
member(Observable&& o, Value0&& v0, ValueN&&... vn) {
// Overload taking the source observable, an explicit coordination and one or
// more additional values. It calls as_dynamic on o, v0 and every vn, combines
// the results with rxs::from, and passes that together with the forwarded
// coordination to Concat, which is wrapped in Result.
// The visible fragment of the constraint list mentions is_coordination<Coordination>.
//   o      - first observable
//   cn     - coordination
//   v0, vn - further observables to append
284 template<
class Observable,
class Coordination,
class Value0,
class... ValueN,
287 is_coordination<Coordination>>,
295 static Result
member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
296 return Result(Concat(
rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
// Catch-all overload for argument lists that none of the other overloads accept.
// Its return type is concat_invalid_t<AN...>. The static_assert condition depends
// on the parameter pack and is false for any realistic argument count, so
// instantiating this overload reports the usage message at compile time.
299 template<
class...
AN>
300 static operators::detail::concat_invalid_t<
AN...>
member(
AN...) {
303 static_assert(
sizeof...(
AN) == 10000,
"concat takes (optional Coordination, optional Value0, optional ValueN...)");
static operators::detail::concat_invalid_t< AN... > member(AN...)
Definition: rx-concat.hpp:300
Definition: rx-util.hpp:100
static Result member(Observable &&o, Coordination &&cn, Value0 &&v0, ValueN &&...vn)
Definition: rx-concat.hpp:295
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:157
static Result member(Observable &&o)
Definition: rx-concat.hpp:253
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:197
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static const bool value
Definition: rx-predef.hpp:123
linq_driver< iter_cursor< typename util::container_traits< TContainer >::iterator > > from(TContainer &c)
Definition: linq.hpp:556
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
static composite_subscription empty()
Definition: rx-subscription.hpp:404
A source of values. Subscribe, or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
Definition: rx-util.hpp:325
static Result member(Observable &&o, Value0 &&v0, ValueN &&...vn)
Definition: rx-concat.hpp:280
static Result member(Observable &&o, Coordination &&cn)
Definition: rx-concat.hpp:266
auto concat(AN &&...an) -> operator_factory< concat_tag, AN... >
For each item from this observable, subscribe to one at a time, in the order received. For each item from all of the given observables, deliver from the new observable that is returned.
Definition: rx-concat.hpp:235
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37