9 #include "../rx-includes.hpp" 15 namespace retry_repeat_common {
// state_type: per-subscription state object used by the retry/repeat
// operators in this namespace (finite and infinite both instantiate it in
// their on_subscribe).
// NOTE(review): this listing is an extract -- the embedded line numbers jump,
// so the constructor's full initializer list, the enclosing method header and
// the closing braces are not visible. Comments below describe only the
// statements that are shown.
//
// Template parameters:
//   Values        - passed to the constructor by const reference as `i`.
//   Subscriber    - the downstream subscriber type (aliased as output_type).
//   EventHandlers - supplies static on_error(state, e) / on_completed(state).
//   T             - presumably the element type; its use is not visible here.
17 template <
class Values,
class Subscriber,
class EventHandlers,
class T>
18 struct state_type :
// enable_shared_from_this lets the code below obtain a shared_ptr to this
// object via shared_from_this() and hand copies of it to the callbacks.
public std::enable_shared_from_this<state_type<Values, Subscriber, EventHandlers, T>>,
21 typedef Subscriber output_type;
22 state_type(
const Values& i,
const output_type& oarg)
// source_lifetime starts out as composite_subscription::empty(); it is
// replaced with a fresh composite_subscription before each subscribe below.
24 source_lifetime(composite_subscription::
empty()),
// NOTE(review): the following statements are presumably the body of
// do_subscribe() (the method finite/infinite call) -- header not visible.
29 auto state = this->shared_from_this();
// Detach the previous source lifetime from the output subscriber and
// unsubscribe it before installing a new one.
31 state->out.remove(state->lifetime_token);
32 state->source_lifetime.unsubscribe();
// Create a fresh lifetime for the next subscription to the source and
// register it with the output, remembering the token for later removal.
34 state->source_lifetime = composite_subscription();
35 state->lifetime_token = state->out.add(state->source_lifetime);
// Subscribe to the source under the new lifetime.
37 state->source.subscribe(
39 state->source_lifetime,
// on_next: forward the value to the output subscriber.
42 state->out.on_next(t);
// on_error: delegated to EventHandlers; the lambda captures `state` by value.
45 [state](std::exception_ptr e) {
46 EventHandlers::on_error(state, e);
// on_completed: delegated to EventHandlers.
50 EventHandlers::on_completed(state);
// Lifetime of the current subscription to the source (see above).
55 composite_subscription source_lifetime;
// finite: operator whose nested values carry a countdown (remaining_) in
// addition to the source.
// NOTE(review): this listing is an extract with lines missing (nested
// `values` struct header, update() body, closing braces); only the visible
// statements are described.
//
// Template parameters:
//   EventHandlers - forwarded to state_type.
//   T             - forwarded to operator_base and state_type.
//   Observable    - source observable type (decayed into source_type).
//   Count         - counter type (decayed into count_type).
61 template <
class EventHandlers,
class T,
class Observable,
class Count>
62 struct finite :
public operator_base<T> {
63 typedef rxu::decay_t<Observable> source_type;
64 typedef rxu::decay_t<Count> count_type;
// Constructor of the nested `values` type (its struct header is not visible
// in this extract): stores the source and the initial remaining count.
67 values(source_type s, count_type t)
68 : source(std::move(s)),
69 remaining_(std::move(t)) {
// True once the remaining count is zero or negative.
72 inline bool completed_predicate()
const {
74 return remaining_ <= 0;
// NOTE(review): body not visible here; presumably adjusts remaining_ -- confirm.
77 inline void update() {
86 count_type remaining_;
// Builds the initial values from the source and the count.
89 finite(source_type s, count_type t)
90 : initial_(std::move(s), std::move(t)) {
// Called with the downstream subscriber: creates a fresh shared state_type
// from initial_ and the subscriber.
93 template<
class Subscriber>
94 void on_subscribe(
const Subscriber& s)
const {
95 typedef state_type<values, Subscriber, EventHandlers, T> state_t;
97 auto state = std::make_shared<state_t>(initial_, s);
// If the initial values already satisfy completed_predicate(), complete
// the output.
98 if (initial_.completed_predicate()) {
100 state->out.on_completed();
// NOTE(review): the lines between are missing; presumably this is the
// else-branch, i.e. only subscribe when not already completed -- confirm.
103 state->do_subscribe();
// infinite: counterpart of finite whose nested values hold only the source
// (no count); completed_predicate() and update() are static here.
// NOTE(review): this listing is an extract with lines missing (nested
// `values` struct header, the bodies of completed_predicate()/update(),
// closing braces); only the visible statements are described.
//
// Template parameters:
//   EventHandlers - forwarded to state_type.
//   T             - forwarded to operator_base and state_type.
//   Observable    - source observable type (decayed into source_type).
112 template <
class EventHandlers,
class T,
class Observable>
113 struct infinite :
public operator_base<T> {
114 typedef rxu::decay_t<Observable> source_type;
// Constructor of the nested `values` type (struct header not visible in
// this extract): stores the source.
117 values(source_type s)
118 : source(std::move(s)) {
// NOTE(review): body not visible; presumably always reports "not completed"
// -- confirm against the full file.
121 static inline bool completed_predicate() {
// NOTE(review): body not visible; presumably a no-op since there is no
// counter to adjust -- confirm.
126 static inline void update() {
// Builds the initial values from the source.
133 infinite(source_type s) : initial_(std::move(s)) {
// Called with the downstream subscriber: creates a fresh shared state_type
// from initial_ and the subscriber, then starts the first subscription to
// the source. No completed_predicate() check is made in the visible lines.
136 template<
class Subscriber>
137 void on_subscribe(
const Subscriber& s)
const {
138 typedef state_type<values, Subscriber, EventHandlers, T> state_t;
140 auto state = std::make_shared<state_t>(initial_, s);
142 state->do_subscribe();
Definition: rx-all.hpp:26
subscription::weak_state_type weak_subscription
Definition: rx-subscription.hpp:370
auto empty() -> decltype(from< T >())
Returns an observable that sends no items to observer and immediately completes, on the specified scheduler.
Definition: rx-empty.hpp:37