RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-retry-repeat-common.hpp
Go to the documentation of this file.
1 #pragma once
2 
9 #include "../rx-includes.hpp"
10 
11 namespace rxcpp {
12  namespace operators {
13  namespace detail {
14 
15  namespace retry_repeat_common {
16  // Structure to perform general retry/repeat operations on state
17  template <class Values, class Subscriber, class EventHandlers, class T>
18  struct state_type : public std::enable_shared_from_this<state_type<Values, Subscriber, EventHandlers, T>>,
19  public Values {
20 
21  typedef Subscriber output_type;
22  state_type(const Values& i, const output_type& oarg)
23  : Values(i),
24  source_lifetime(composite_subscription::empty()),
25  out(oarg) {
26  }
27 
28  void do_subscribe() {
29  auto state = this->shared_from_this();
30 
31  state->out.remove(state->lifetime_token);
32  state->source_lifetime.unsubscribe();
33 
34  state->source_lifetime = composite_subscription();
35  state->lifetime_token = state->out.add(state->source_lifetime);
36 
37  state->source.subscribe(
38  state->out,
39  state->source_lifetime,
40  // on_next
41  [state](T t) {
42  state->out.on_next(t);
43  },
44  // on_error
45  [state](std::exception_ptr e) {
46  EventHandlers::on_error(state, e);
47  },
48  // on_completed
49  [state]() {
50  EventHandlers::on_completed(state);
51  }
52  );
53  }
54 
55  composite_subscription source_lifetime;
56  output_type out;
58  };
59 
60  // Finite case (explicitely limited with the number of times)
61  template <class EventHandlers, class T, class Observable, class Count>
62  struct finite : public operator_base<T> {
63  typedef rxu::decay_t<Observable> source_type;
64  typedef rxu::decay_t<Count> count_type;
65 
66  struct values {
67  values(source_type s, count_type t)
68  : source(std::move(s)),
69  remaining_(std::move(t)) {
70  }
71 
72  inline bool completed_predicate() const {
73  // Return true if we are completed
74  return remaining_ <= 0;
75  }
76 
77  inline void update() {
78  // Decrement counter
79  --remaining_;
80  }
81 
82  source_type source;
83 
84  private:
85  // Counter to hold number of times remaining to complete
86  count_type remaining_;
87  };
88 
89  finite(source_type s, count_type t)
90  : initial_(std::move(s), std::move(t)) {
91  }
92 
93  template<class Subscriber>
94  void on_subscribe(const Subscriber& s) const {
95  typedef state_type<values, Subscriber, EventHandlers, T> state_t;
96  // take a copy of the values for each subscription
97  auto state = std::make_shared<state_t>(initial_, s);
98  if (initial_.completed_predicate()) {
99  // return completed
100  state->out.on_completed();
101  } else {
102  // start the first iteration
103  state->do_subscribe();
104  }
105  }
106 
107  private:
108  values initial_;
109  };
110 
111  // Infinite case
112  template <class EventHandlers, class T, class Observable>
113  struct infinite : public operator_base<T> {
114  typedef rxu::decay_t<Observable> source_type;
115 
116  struct values {
117  values(source_type s)
118  : source(std::move(s)) {
119  }
120 
121  static inline bool completed_predicate() {
122  // Infinite never completes
123  return false;
124  }
125 
126  static inline void update() {
127  // Infinite does not need to update state
128  }
129 
130  source_type source;
131  };
132 
133  infinite(source_type s) : initial_(std::move(s)) {
134  }
135 
136  template<class Subscriber>
137  void on_subscribe(const Subscriber& s) const {
138  typedef state_type<values, Subscriber, EventHandlers, T> state_t;
139  // take a copy of the values for each subscription
140  auto state = std::make_shared<state_t>(initial_, s);
141  // start the first iteration
142  state->do_subscribe();
143  }
144 
145  private:
146  values initial_;
147  };
148 
149 
150  }
151  }
152  }
153 }
Definition: rx-all.hpp:26
subscription::weak_state_type weak_subscription
Definition: rx-subscription.hpp:370
auto empty() -> decltype(from< T >())
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-empty.hpp:37