45 #if !defined(RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_HPP) 46 #define RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_HPP 48 #include "../rx-includes.hpp" 57 struct buffer_with_time_invalid_arguments {};
60 struct buffer_with_time_invalid :
public rxo::operator_base<buffer_with_time_invalid_arguments<AN...>> {
61 using type = observable<buffer_with_time_invalid_arguments<
AN...>, buffer_with_time_invalid<
AN...>>;
64 using buffer_with_time_invalid_t =
typename buffer_with_time_invalid<
AN...>::type;
66 template<
class T,
class Duration,
class Coordination>
69 typedef rxu::decay_t<T> source_value_type;
70 typedef std::vector<source_value_type> value_type;
71 typedef rxu::decay_t<Coordination> coordination_type;
72 typedef typename coordination_type::coordinator_type coordinator_type;
73 typedef rxu::decay_t<Duration> duration_type;
75 struct buffer_with_time_values
77 buffer_with_time_values(duration_type p, duration_type s, coordination_type c)
85 coordination_type coordination;
87 buffer_with_time_values initial;
90 : initial(period, skip, coordination)
94 template<
class Subscriber>
95 struct buffer_with_time_observer
97 typedef buffer_with_time_observer<Subscriber> this_type;
98 typedef std::vector<T> value_type;
99 typedef rxu::decay_t<Subscriber> dest_type;
100 typedef observer<value_type, this_type> observer_type;
102 struct buffer_with_time_subscriber_values :
public buffer_with_time_values
104 buffer_with_time_subscriber_values(composite_subscription cs, dest_type d, buffer_with_time_values v, coordinator_type c)
105 : buffer_with_time_values(v)
108 , coordinator(std::move(c))
109 , worker(coordinator.get_worker())
110 , expected(worker.now())
113 composite_subscription cs;
115 coordinator_type coordinator;
117 mutable std::deque<value_type> chunks;
118 rxsc::scheduler::clock_type::time_point expected;
120 std::shared_ptr<buffer_with_time_subscriber_values> state;
122 buffer_with_time_observer(composite_subscription cs, dest_type d, buffer_with_time_values v, coordinator_type c)
123 : state(std::make_shared<buffer_with_time_subscriber_values>(buffer_with_time_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
125 auto localState = state;
127 auto disposer = [=](
const rxsc::schedulable&){
128 localState->cs.unsubscribe();
129 localState->dest.unsubscribe();
130 localState->worker.unsubscribe();
133 [&](){
return localState->coordinator.act(disposer);},
135 if (selectedDisposer.empty()) {
139 localState->dest.add([=](){
140 localState->worker.schedule(selectedDisposer.get());
142 localState->cs.add([=](){
143 localState->worker.schedule(selectedDisposer.get());
152 auto produce_buffer = [localState](
const rxsc::schedulable&) {
153 localState->dest.on_next(std::move(localState->chunks.front()));
154 localState->chunks.pop_front();
157 [&](){
return localState->coordinator.act(produce_buffer);},
159 if (selectedProduce.empty()) {
163 auto create_buffer = [localState, selectedProduce](
const rxsc::schedulable&) {
164 localState->chunks.emplace_back();
165 auto produce_at = localState->expected + localState->period;
166 localState->expected += localState->skip;
167 localState->worker.schedule(produce_at, [localState, selectedProduce](
const rxsc::schedulable&) {
168 localState->worker.schedule(selectedProduce.get());
172 [&](){
return localState->coordinator.act(create_buffer);},
174 if (selectedCreate.empty()) {
178 state->worker.schedule_periodically(
181 [localState, selectedCreate](
const rxsc::schedulable&) {
182 localState->worker.schedule(selectedCreate.get());
185 void on_next(T v)
const {
186 auto localState = state;
187 auto work = [v, localState](
const rxsc::schedulable&){
188 for(
auto& chunk : localState->chunks) {
193 [&](){
return localState->coordinator.act(work);},
195 if (selectedWork.empty()) {
198 localState->worker.schedule(selectedWork.get());
200 void on_error(std::exception_ptr e)
const {
201 auto localState = state;
202 auto work = [e, localState](
const rxsc::schedulable&){
203 localState->dest.on_error(e);
206 [&](){
return localState->coordinator.act(work);},
208 if (selectedWork.empty()) {
211 localState->worker.schedule(selectedWork.get());
213 void on_completed()
const {
214 auto localState = state;
215 auto work = [localState](
const rxsc::schedulable&){
218 while (!localState->chunks.empty()) {
219 localState->dest.on_next(std::move(localState->chunks.front()));
220 localState->chunks.pop_front();
225 localState->dest.on_completed();
228 [&](){
return localState->coordinator.act(work);},
230 if (selectedWork.empty()) {
233 localState->worker.schedule(selectedWork.get());
236 static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_with_time_values v) {
237 auto cs = composite_subscription();
238 auto coordinator = v.coordination.create_coordinator();
240 return make_subscriber<T>(cs, this_type(cs, std::move(d), std::move(v), std::move(coordinator)));
244 template<
class Subscriber>
245 auto operator()(Subscriber dest)
const 246 -> decltype(buffer_with_time_observer<Subscriber>::make(std::move(dest), initial)) {
247 return buffer_with_time_observer<Subscriber>::make(std::move(dest), initial);
255 template<
class...
AN>
258 return operator_factory<buffer_with_time_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
266 template<
class Observable,
class Duration,
269 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
271 class BufferWithTime = rxo::detail::buffer_with_time<SourceValue, rxu::decay_t<Duration>,
identity_one_worker>,
273 static auto member(Observable&& o, Duration period)
278 template<
class Observable,
class Duration,
class Coordination,
280 is_observable<Observable>,
281 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
286 static auto member(Observable&& o, Duration period, Coordination&& cn)
287 -> decltype(o.template lift<Value>(BufferWithTime(period, period, std::forward<Coordination>(cn)))) {
288 return o.template lift<Value>(BufferWithTime(period, period, std::forward<Coordination>(cn)));
291 template<
class Observable,
class Duration,
293 is_observable<Observable>,
294 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
296 class BufferWithTime = rxo::detail::buffer_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
298 static auto member(Observable&& o, Duration&& period, Duration&&
skip)
299 -> decltype(o.template lift<Value>(BufferWithTime(std::forward<Duration>(period), std::forward<Duration>(
skip),
identity_current_thread()))) {
300 return o.template lift<Value>(BufferWithTime(std::forward<Duration>(period), std::forward<Duration>(
skip),
identity_current_thread()));
303 template<
class Observable,
class Duration,
class Coordination,
305 is_observable<Observable>,
306 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
307 is_coordination<Coordination>>,
309 class BufferWithTime = rxo::detail::buffer_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
311 static auto member(Observable&& o, Duration&& period, Duration&&
skip, Coordination&& cn)
312 -> decltype(o.template lift<Value>(BufferWithTime(std::forward<Duration>(period), std::forward<Duration>(
skip), std::forward<Coordination>(cn)))) {
313 return o.template lift<Value>(BufferWithTime(std::forward<Duration>(period), std::forward<Duration>(
skip), std::forward<Coordination>(cn)));
316 template<
class...
AN>
317 static operators::detail::buffer_with_time_invalid_t<
AN...>
member(
AN...) {
320 static_assert(
sizeof...(
AN) == 10000,
"buffer_with_time takes (Duration, optional Duration, optional Coordination)");
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
static auto member(Observable &&o, Duration period, Coordination &&cn) -> decltype(o.template lift< Value >(BufferWithTime(period, period, std::forward< Coordination >(cn))))
Definition: rx-buffer_time.hpp:286
Definition: rx-operators.hpp:69
static auto member(Observable &&o, Duration &&period, Duration &&skip, Coordination &&cn) -> decltype(o.template lift< Value >(BufferWithTime(std::forward< Duration >(period), std::forward< Duration >(skip), std::forward< Coordination >(cn))))
Definition: rx-buffer_time.hpp:311
auto buffer_with_time(AN &&...an) -> operator_factory< buffer_with_time_tag, AN... >
Return an observable that emits buffers every period time interval and collects items from this obser...
Definition: rx-buffer_time.hpp:256
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
static auto member(Observable &&o, Duration &&period, Duration &&skip) -> decltype(o.template lift< Value >(BufferWithTime(std::forward< Duration >(period), std::forward< Duration >(skip), identity_current_thread())))
Definition: rx-buffer_time.hpp:298
Definition: rx-operators.hpp:136
auto skip(AN &&...an) -> operator_factory< skip_tag, AN... >
Make new observable with skipped first count items from this observable.
Definition: rx-skip.hpp:130
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
static auto member(Observable &&o, Duration period) -> decltype(o.template lift< Value >(BufferWithTime(period, period, identity_current_thread())))
Definition: rx-buffer_time.hpp:273
static operators::detail::buffer_with_time_invalid_t< AN... > member(AN...)
Definition: rx-buffer_time.hpp:317
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37