RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-buffer_time_count.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
27 #if !defined(RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_OR_COUNT_HPP)
28 #define RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_OR_COUNT_HPP
29 
30 #include "../rx-includes.hpp"
31 
32 namespace rxcpp {
33 
34 namespace operators {
35 
36 namespace detail {
37 
// Empty tag type that records the argument pack AN... of a rejected
// buffer_with_time_or_count call. It carries no data; it only gives each
// distinct argument list its own type.
template<class... AN>
struct buffer_with_time_or_count_invalid_arguments {};
40 
41 template<class... AN>
42 struct buffer_with_time_or_count_invalid : public rxo::operator_base<buffer_with_time_or_count_invalid_arguments<AN...>> {
43  using type = observable<buffer_with_time_or_count_invalid_arguments<AN...>, buffer_with_time_or_count_invalid<AN...>>;
44 };
45 template<class... AN>
46 using buffer_with_time_or_count_invalid_t = typename buffer_with_time_or_count_invalid<AN...>::type;
47 
48 template<class T, class Duration, class Coordination>
50 {
51  typedef rxu::decay_t<T> source_value_type;
52  typedef std::vector<source_value_type> value_type;
53  typedef rxu::decay_t<Coordination> coordination_type;
54  typedef typename coordination_type::coordinator_type coordinator_type;
55  typedef rxu::decay_t<Duration> duration_type;
56 
57  struct buffer_with_time_or_count_values
58  {
59  buffer_with_time_or_count_values(duration_type p, int n, coordination_type c)
60  : period(p)
61  , count(n)
62  , coordination(c)
63  {
64  }
65  duration_type period;
66  int count;
67  coordination_type coordination;
68  };
69  buffer_with_time_or_count_values initial;
70 
71  buffer_with_time_or_count(duration_type period, int count, coordination_type coordination)
72  : initial(period, count, coordination)
73  {
74  }
75 
76  template<class Subscriber>
77  struct buffer_with_time_or_count_observer
78  {
79  typedef buffer_with_time_or_count_observer<Subscriber> this_type;
80  typedef std::vector<T> value_type;
81  typedef rxu::decay_t<Subscriber> dest_type;
82  typedef observer<value_type, this_type> observer_type;
83 
84  struct buffer_with_time_or_count_subscriber_values : public buffer_with_time_or_count_values
85  {
86  buffer_with_time_or_count_subscriber_values(composite_subscription cs, dest_type d, buffer_with_time_or_count_values v, coordinator_type c)
87  : buffer_with_time_or_count_values(std::move(v))
88  , cs(std::move(cs))
89  , dest(std::move(d))
90  , coordinator(std::move(c))
91  , worker(coordinator.get_worker())
92  , chunk_id(0)
93  {
94  }
95  composite_subscription cs;
96  dest_type dest;
97  coordinator_type coordinator;
98  rxsc::worker worker;
99  mutable int chunk_id;
100  mutable value_type chunk;
101  };
102  typedef std::shared_ptr<buffer_with_time_or_count_subscriber_values> state_type;
103  state_type state;
104 
105  buffer_with_time_or_count_observer(composite_subscription cs, dest_type d, buffer_with_time_or_count_values v, coordinator_type c)
106  : state(std::make_shared<buffer_with_time_or_count_subscriber_values>(buffer_with_time_or_count_subscriber_values(std::move(cs), std::move(d), std::move(v), std::move(c))))
107  {
108  auto new_id = state->chunk_id;
109  auto produce_time = state->worker.now() + state->period;
110  auto localState = state;
111 
112  auto disposer = [=](const rxsc::schedulable&){
113  localState->cs.unsubscribe();
114  localState->dest.unsubscribe();
115  localState->worker.unsubscribe();
116  };
117  auto selectedDisposer = on_exception(
118  [&](){return localState->coordinator.act(disposer);},
119  localState->dest);
120  if (selectedDisposer.empty()) {
121  return;
122  }
123 
124  localState->dest.add([=](){
125  localState->worker.schedule(selectedDisposer.get());
126  });
127  localState->cs.add([=](){
128  localState->worker.schedule(selectedDisposer.get());
129  });
130 
131  //
132  // The scheduler is FIFO for any time T. Since the observer is scheduling
133  // on_next/on_error/oncompleted the timed schedule calls must be resheduled
134  // when they occur to ensure that production happens after on_next/on_error/oncompleted
135  //
136 
137  localState->worker.schedule(produce_time, [new_id, produce_time, localState](const rxsc::schedulable&){
138  localState->worker.schedule(produce_buffer(new_id, produce_time, localState));
139  });
140  }
141 
142  static std::function<void(const rxsc::schedulable&)> produce_buffer(int id, rxsc::scheduler::clock_type::time_point expected, state_type state) {
143  auto produce = [id, expected, state](const rxsc::schedulable&) {
144  if (id != state->chunk_id)
145  return;
146 
147  state->dest.on_next(state->chunk);
148  state->chunk.resize(0);
149  auto new_id = ++state->chunk_id;
150  auto produce_time = expected + state->period;
151  state->worker.schedule(produce_time, [new_id, produce_time, state](const rxsc::schedulable&){
152  state->worker.schedule(produce_buffer(new_id, produce_time, state));
153  });
154  };
155 
156  auto selectedProduce = on_exception(
157  [&](){return state->coordinator.act(produce);},
158  state->dest);
159  if (selectedProduce.empty()) {
160  return std::function<void(const rxsc::schedulable&)>();
161  }
162 
163  return std::function<void(const rxsc::schedulable&)>(selectedProduce.get());
164  }
165 
166  void on_next(T v) const {
167  auto localState = state;
168  auto work = [v, localState](const rxsc::schedulable& self){
169  localState->chunk.push_back(v);
170  if (int(localState->chunk.size()) == localState->count) {
171  produce_buffer(localState->chunk_id, localState->worker.now(), localState)(self);
172  }
173  };
174  auto selectedWork = on_exception(
175  [&](){return localState->coordinator.act(work);},
176  localState->dest);
177  if (selectedWork.empty()) {
178  return;
179  }
180  localState->worker.schedule(selectedWork.get());
181  }
182  void on_error(std::exception_ptr e) const {
183  auto localState = state;
184  auto work = [e, localState](const rxsc::schedulable&){
185  localState->dest.on_error(e);
186  };
187  auto selectedWork = on_exception(
188  [&](){return localState->coordinator.act(work);},
189  localState->dest);
190  if (selectedWork.empty()) {
191  return;
192  }
193  localState->worker.schedule(selectedWork.get());
194  }
195  void on_completed() const {
196  auto localState = state;
197  auto work = [localState](const rxsc::schedulable&){
198  localState->dest.on_next(localState->chunk);
199  localState->dest.on_completed();
200  };
201  auto selectedWork = on_exception(
202  [&](){return localState->coordinator.act(work);},
203  localState->dest);
204  if (selectedWork.empty()) {
205  return;
206  }
207  localState->worker.schedule(selectedWork.get());
208  }
209 
210  static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_with_time_or_count_values v) {
211  auto cs = composite_subscription();
212  auto coordinator = v.coordination.create_coordinator();
213 
214  return make_subscriber<T>(cs, this_type(cs, std::move(d), std::move(v), std::move(coordinator)));
215  }
216  };
217 
218  template<class Subscriber>
219  auto operator()(Subscriber dest) const
220  -> decltype(buffer_with_time_or_count_observer<Subscriber>::make(std::move(dest), initial)) {
221  return buffer_with_time_or_count_observer<Subscriber>::make(std::move(dest), initial);
222  }
223 };
224 
225 }
226 
229 template<class... AN>
232  return operator_factory<buffer_with_time_or_count_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
233 }
234 
235 }
236 
237 template<>
239 {
240  template<class Observable, class Duration,
241  class Enabled = rxu::enable_if_all_true_type_t<
243  std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
244  class SourceValue = rxu::value_type_t<Observable>,
245  class BufferTimeCount = rxo::detail::buffer_with_time_or_count<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
247  static auto member(Observable&& o, Duration&& period, int count)
248  -> decltype(o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, identity_current_thread()))) {
249  return o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, identity_current_thread()));
250  }
251 
252  template<class Observable, class Duration, class Coordination,
253  class Enabled = rxu::enable_if_all_true_type_t<
254  is_observable<Observable>,
255  std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
257  class SourceValue = rxu::value_type_t<Observable>,
258  class BufferTimeCount = rxo::detail::buffer_with_time_or_count<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
260  static auto member(Observable&& o, Duration&& period, int count, Coordination&& cn)
261  -> decltype(o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, std::forward<Coordination>(cn)))) {
262  return o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, std::forward<Coordination>(cn)));
263  }
264 
265  template<class... AN>
266  static operators::detail::buffer_with_time_or_count_invalid_t<AN...> member(AN...) {
267  std::terminate();
268  return {};
269  static_assert(sizeof...(AN) == 10000, "buffer_with_time_or_count takes (Duration, Count, optional Coordination)");
270  }
271 };
272 
273 }
274 
275 #endif
Definition: rx-operators.hpp:143
static operators::detail::buffer_with_time_or_count_invalid_t< AN... > member(AN...)
Definition: rx-buffer_time_count.hpp:266
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
static auto member(Observable &&o, Duration &&period, int count) -> decltype(o.template lift< Value >(BufferTimeCount(std::forward< Duration >(period), count, identity_current_thread())))
Definition: rx-buffer_time_count.hpp:247
Definition: rx-operators.hpp:69
static auto member(Observable &&o, Duration &&period, int count, Coordination &&cn) -> decltype(o.template lift< Value >(BufferTimeCount(std::forward< Duration >(period), count, std::forward< Coordination >(cn))))
Definition: rx-buffer_time_count.hpp:260
auto AN
Definition: rx-finally.hpp:105
auto buffer_with_time_or_count(AN &&...an) -> operator_factory< buffer_with_time_or_count_tag, AN... >
Return an observable that emits connected, non-overlapping buffers of items from the source observabl...
Definition: rx-buffer_time_count.hpp:230
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37