27 #if !defined(RXCPP_OPERATORS_RX_SKIP_UNTIL_HPP) 28 #define RXCPP_OPERATORS_RX_SKIP_UNTIL_HPP 30 #include "../rx-includes.hpp" 39 struct skip_until_invalid_arguments {};
42 struct skip_until_invalid :
public rxo::operator_base<skip_until_invalid_arguments<AN...>> {
43 using type = observable<skip_until_invalid_arguments<
AN...>, skip_until_invalid<
AN...>>;
46 using skip_until_invalid_t =
typename skip_until_invalid<
AN...>::type;
48 template<
class T,
class Observable,
class TriggerObservable,
class Coordination>
51 typedef rxu::decay_t<Observable> source_type;
52 typedef rxu::decay_t<TriggerObservable> trigger_source_type;
53 typedef rxu::decay_t<Coordination> coordination_type;
54 typedef typename coordination_type::coordinator_type coordinator_type;
57 values(source_type s, trigger_source_type t, coordination_type sf)
58 : source(std::move(s))
59 , trigger(std::move(t))
60 , coordination(std::move(sf))
64 trigger_source_type trigger;
65 coordination_type coordination;
69 skip_until(source_type s, trigger_source_type t, coordination_type sf)
70 : initial(std::move(s), std::move(t), std::move(sf))
85 template<
class Subscriber>
86 void on_subscribe(Subscriber s)
const {
88 typedef Subscriber output_type;
90 :
public std::enable_shared_from_this<state_type>
93 state_type(
const values& i, coordinator_type coor,
const output_type& oarg)
95 , mode_value(mode::skipping)
96 , coordinator(std::move(coor))
99 out.add(trigger_lifetime);
100 out.add(source_lifetime);
102 typename mode::type mode_value;
103 composite_subscription trigger_lifetime;
104 composite_subscription source_lifetime;
105 coordinator_type coordinator;
109 auto coordinator = initial.coordination.create_coordinator();
112 auto state = std::make_shared<state_type>(initial, std::move(coordinator), std::move(s));
115 [&](){
return state->coordinator.in(state->trigger);},
117 if (trigger.empty()) {
122 [&](){
return state->coordinator.in(state->source);},
124 if (source.empty()) {
128 auto sinkTrigger = make_subscriber<typename trigger_source_type::value_type>(
132 state->trigger_lifetime,
134 [state](
const typename trigger_source_type::value_type&) {
135 if (state->mode_value != mode::skipping) {
138 state->mode_value = mode::triggered;
139 state->trigger_lifetime.unsubscribe();
142 [state](std::exception_ptr e) {
143 if (state->mode_value != mode::skipping) {
146 state->mode_value = mode::errored;
147 state->out.on_error(e);
151 if (state->mode_value != mode::skipping) {
154 state->mode_value = mode::clear;
155 state->trigger_lifetime.unsubscribe();
159 [&](){
return state->coordinator.out(sinkTrigger);},
161 if (selectedSinkTrigger.empty()) {
164 trigger->subscribe(std::move(selectedSinkTrigger.get()));
166 source.get().subscribe(
168 state->source_lifetime,
171 if (state->mode_value != mode::triggered) {
174 state->out.on_next(t);
177 [state](std::exception_ptr e) {
178 if (state->mode_value > mode::triggered) {
181 state->mode_value = mode::errored;
182 state->out.on_error(e);
186 if (state->mode_value != mode::triggered) {
189 state->mode_value = mode::stopped;
190 state->out.on_completed();
200 template<
class...
AN>
203 return operator_factory<skip_until_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
211 template<
class Observable,
class TimePoint,
214 std::is_convertible<TimePoint, rxsc::scheduler::clock_type::time_point>>,
219 class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, TriggerObservable,
identity_one_worker>,
222 static Result
member(Observable&& o, TimePoint&& when) {
224 return Result(SkipUntil(std::forward<Observable>(o),
rxs::timer(std::forward<TimePoint>(when), cn), cn));
227 template<
class Observable,
class TimePoint,
class Coordination,
229 is_observable<Observable>,
231 std::is_convertible<TimePoint, rxsc::scheduler::clock_type::time_point>>,
236 class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, TriggerObservable,
rxu::decay_t<Coordination>>,
239 static Result
member(Observable&& o, TimePoint&& when, Coordination cn) {
240 return Result(SkipUntil(std::forward<Observable>(o),
rxs::timer(std::forward<TimePoint>(when), cn), cn));
243 template<
class Observable,
class TriggerObservable,
250 static Result
member(Observable&& o, TriggerObservable&& t) {
251 return Result(SkipUntil(std::forward<Observable>(o), std::forward<TriggerObservable>(t),
identity_current_thread()));
254 template<
class Observable,
class TriggerObservable,
class Coordination,
256 all_observables<Observable, TriggerObservable>,
257 is_coordination<Coordination>>,
259 class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<TriggerObservable>, rxu::decay_t<Coordination>>,
262 static Result
member(Observable&& o, TriggerObservable&& t, Coordination&& cn) {
263 return Result(SkipUntil(std::forward<Observable>(o), std::forward<TriggerObservable>(t), std::forward<Coordination>(cn)));
266 template<
class...
AN>
267 static operators::detail::skip_until_invalid_t<
AN...>
member(
AN...) {
270 static_assert(
sizeof...(
AN) == 10000,
"skip_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination)");
Definition: rx-util.hpp:100
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
auto skip_until(AN &&...an) -> operator_factory< skip_until_tag, AN... >
Make new observable with items skipped until on_next occurs on the trigger observable or until the sp...
Definition: rx-skip_until.hpp:201
Definition: rx-operators.hpp:69
Definition: rx-operators.hpp:381
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
static Result member(Observable &&o, TimePoint &&when)
Definition: rx-skip_until.hpp:222
Definition: rx-util.hpp:325
auto timer(TimePointOrDuration when) -> typename std::enable_if< detail::defer_timer< TimePointOrDuration, identity_one_worker >::value, typename detail::defer_timer< TimePointOrDuration, identity_one_worker >::observable_type >::type
Returns an observable that emits an integer at the specified time point.
Definition: rx-timer.hpp:114
static Result member(Observable &&o, TimePoint &&when, Coordination cn)
Definition: rx-skip_until.hpp:239
static Result member(Observable &&o, TriggerObservable &&t, Coordination &&cn)
Definition: rx-skip_until.hpp:262
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
static Result member(Observable &&o, TriggerObservable &&t)
Definition: rx-skip_until.hpp:250
static operators::detail::skip_until_invalid_t< AN... > member(AN...)
Definition: rx-skip_until.hpp:267
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37