RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-buffer_time.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
45 #if !defined(RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_HPP)
46 #define RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_HPP
47 
48 #include "../rx-includes.hpp"
49 
50 namespace rxcpp {
51 
52 namespace operators {
53 
54 namespace detail {
55 
56 template<class... AN>
57 struct buffer_with_time_invalid_arguments {};
58 
59 template<class... AN>
60 struct buffer_with_time_invalid : public rxo::operator_base<buffer_with_time_invalid_arguments<AN...>> {
61  using type = observable<buffer_with_time_invalid_arguments<AN...>, buffer_with_time_invalid<AN...>>;
62 };
63 template<class... AN>
64 using buffer_with_time_invalid_t = typename buffer_with_time_invalid<AN...>::type;
65 
66 template<class T, class Duration, class Coordination>
67 struct buffer_with_time
68 {
69  typedef rxu::decay_t<T> source_value_type;
70  typedef std::vector<source_value_type> value_type;
71  typedef rxu::decay_t<Coordination> coordination_type;
72  typedef typename coordination_type::coordinator_type coordinator_type;
73  typedef rxu::decay_t<Duration> duration_type;
74 
75  struct buffer_with_time_values
76  {
77  buffer_with_time_values(duration_type p, duration_type s, coordination_type c)
78  : period(p)
79  , skip(s)
80  , coordination(c)
81  {
82  }
83  duration_type period;
84  duration_type skip;
85  coordination_type coordination;
86  };
87  buffer_with_time_values initial;
88 
89  buffer_with_time(duration_type period, duration_type skip, coordination_type coordination)
90  : initial(period, skip, coordination)
91  {
92  }
93 
94  template<class Subscriber>
95  struct buffer_with_time_observer
96  {
97  typedef buffer_with_time_observer<Subscriber> this_type;
98  typedef std::vector<T> value_type;
99  typedef rxu::decay_t<Subscriber> dest_type;
100  typedef observer<value_type, this_type> observer_type;
101 
102  struct buffer_with_time_subscriber_values : public buffer_with_time_values
103  {
104  buffer_with_time_subscriber_values(composite_subscription cs, dest_type d, buffer_with_time_values v, coordinator_type c)
105  : buffer_with_time_values(v)
106  , cs(std::move(cs))
107  , dest(std::move(d))
108  , coordinator(std::move(c))
109  , worker(coordinator.get_worker())
110  , expected(worker.now())
111  {
112  }
113  composite_subscription cs;
114  dest_type dest;
115  coordinator_type coordinator;
116  rxsc::worker worker;
117  mutable std::deque<value_type> chunks;
118  rxsc::scheduler::clock_type::time_point expected;
119  };
120  std::shared_ptr<buffer_with_time_subscriber_values> state;
121 
122  buffer_with_time_observer(composite_subscription cs, dest_type d, buffer_with_time_values v, coordinator_type c)
123  : state(std::make_shared<buffer_with_time_subscriber_values>(buffer_with_time_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
124  {
125  auto localState = state;
126 
127  auto disposer = [=](const rxsc::schedulable&){
128  localState->cs.unsubscribe();
129  localState->dest.unsubscribe();
130  localState->worker.unsubscribe();
131  };
132  auto selectedDisposer = on_exception(
133  [&](){return localState->coordinator.act(disposer);},
134  localState->dest);
135  if (selectedDisposer.empty()) {
136  return;
137  }
138 
139  localState->dest.add([=](){
140  localState->worker.schedule(selectedDisposer.get());
141  });
142  localState->cs.add([=](){
143  localState->worker.schedule(selectedDisposer.get());
144  });
145 
146  //
147  // The scheduler is FIFO for any time T. Since the observer is scheduling
148  // on_next/on_error/oncompleted the timed schedule calls must be resheduled
149  // when they occur to ensure that production happens after on_next/on_error/oncompleted
150  //
151 
152  auto produce_buffer = [localState](const rxsc::schedulable&) {
153  localState->dest.on_next(std::move(localState->chunks.front()));
154  localState->chunks.pop_front();
155  };
156  auto selectedProduce = on_exception(
157  [&](){return localState->coordinator.act(produce_buffer);},
158  localState->dest);
159  if (selectedProduce.empty()) {
160  return;
161  }
162 
163  auto create_buffer = [localState, selectedProduce](const rxsc::schedulable&) {
164  localState->chunks.emplace_back();
165  auto produce_at = localState->expected + localState->period;
166  localState->expected += localState->skip;
167  localState->worker.schedule(produce_at, [localState, selectedProduce](const rxsc::schedulable&) {
168  localState->worker.schedule(selectedProduce.get());
169  });
170  };
171  auto selectedCreate = on_exception(
172  [&](){return localState->coordinator.act(create_buffer);},
173  localState->dest);
174  if (selectedCreate.empty()) {
175  return;
176  }
177 
178  state->worker.schedule_periodically(
179  state->expected,
180  state->skip,
181  [localState, selectedCreate](const rxsc::schedulable&) {
182  localState->worker.schedule(selectedCreate.get());
183  });
184  }
185  void on_next(T v) const {
186  auto localState = state;
187  auto work = [v, localState](const rxsc::schedulable&){
188  for(auto& chunk : localState->chunks) {
189  chunk.push_back(v);
190  }
191  };
192  auto selectedWork = on_exception(
193  [&](){return localState->coordinator.act(work);},
194  localState->dest);
195  if (selectedWork.empty()) {
196  return;
197  }
198  localState->worker.schedule(selectedWork.get());
199  }
200  void on_error(std::exception_ptr e) const {
201  auto localState = state;
202  auto work = [e, localState](const rxsc::schedulable&){
203  localState->dest.on_error(e);
204  };
205  auto selectedWork = on_exception(
206  [&](){return localState->coordinator.act(work);},
207  localState->dest);
208  if (selectedWork.empty()) {
209  return;
210  }
211  localState->worker.schedule(selectedWork.get());
212  }
213  void on_completed() const {
214  auto localState = state;
215  auto work = [localState](const rxsc::schedulable&){
216  on_exception(
217  [&](){
218  while (!localState->chunks.empty()) {
219  localState->dest.on_next(std::move(localState->chunks.front()));
220  localState->chunks.pop_front();
221  }
222  return true;
223  },
224  localState->dest);
225  localState->dest.on_completed();
226  };
227  auto selectedWork = on_exception(
228  [&](){return localState->coordinator.act(work);},
229  localState->dest);
230  if (selectedWork.empty()) {
231  return;
232  }
233  localState->worker.schedule(selectedWork.get());
234  }
235 
236  static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_with_time_values v) {
237  auto cs = composite_subscription();
238  auto coordinator = v.coordination.create_coordinator();
239 
240  return make_subscriber<T>(cs, this_type(cs, std::move(d), std::move(v), std::move(coordinator)));
241  }
242  };
243 
244  template<class Subscriber>
245  auto operator()(Subscriber dest) const
246  -> decltype(buffer_with_time_observer<Subscriber>::make(std::move(dest), initial)) {
247  return buffer_with_time_observer<Subscriber>::make(std::move(dest), initial);
248  }
249 };
250 
251 }
252 
255 template<class... AN>
256 auto buffer_with_time(AN&&... an)
258  return operator_factory<buffer_with_time_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
259 }
260 
261 }
262 
263 template<>
265 {
266  template<class Observable, class Duration,
267  class Enabled = rxu::enable_if_all_true_type_t<
269  std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
270  class SourceValue = rxu::value_type_t<Observable>,
271  class BufferWithTime = rxo::detail::buffer_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
272  class Value = rxu::value_type_t<BufferWithTime>>
273  static auto member(Observable&& o, Duration period)
274  -> decltype(o.template lift<Value>(BufferWithTime(period, period, identity_current_thread()))) {
275  return o.template lift<Value>(BufferWithTime(period, period, identity_current_thread()));
276  }
277 
278  template<class Observable, class Duration, class Coordination,
279  class Enabled = rxu::enable_if_all_true_type_t<
280  is_observable<Observable>,
281  std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
283  class SourceValue = rxu::value_type_t<Observable>,
284  class BufferWithTime = rxo::detail::buffer_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
285  class Value = rxu::value_type_t<BufferWithTime>>
286  static auto member(Observable&& o, Duration period, Coordination&& cn)
287  -> decltype(o.template lift<Value>(BufferWithTime(period, period, std::forward<Coordination>(cn)))) {
288  return o.template lift<Value>(BufferWithTime(period, period, std::forward<Coordination>(cn)));
289  }
290 
291  template<class Observable, class Duration,
292  class Enabled = rxu::enable_if_all_true_type_t<
293  is_observable<Observable>,
294  std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
295  class SourceValue = rxu::value_type_t<Observable>,
296  class BufferWithTime = rxo::detail::buffer_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
297  class Value = rxu::value_type_t<BufferWithTime>>
298  static auto member(Observable&& o, Duration&& period, Duration&& skip)
299  -> decltype(o.template lift<Value>(BufferWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), identity_current_thread()))) {
300  return o.template lift<Value>(BufferWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), identity_current_thread()));
301  }
302 
303  template<class Observable, class Duration, class Coordination,
304  class Enabled = rxu::enable_if_all_true_type_t<
305  is_observable<Observable>,
306  std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
307  is_coordination<Coordination>>,
308  class SourceValue = rxu::value_type_t<Observable>,
309  class BufferWithTime = rxo::detail::buffer_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
310  class Value = rxu::value_type_t<BufferWithTime>>
311  static auto member(Observable&& o, Duration&& period, Duration&& skip, Coordination&& cn)
312  -> decltype(o.template lift<Value>(BufferWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), std::forward<Coordination>(cn)))) {
313  return o.template lift<Value>(BufferWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), std::forward<Coordination>(cn)));
314  }
315 
316  template<class... AN>
317  static operators::detail::buffer_with_time_invalid_t<AN...> member(AN...) {
318  std::terminate();
319  return {};
320  static_assert(sizeof...(AN) == 10000, "buffer_with_time takes (Duration, optional Duration, optional Coordination)");
321  }
322 };
323 
324 }
325 
326 #endif
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
static auto member(Observable &&o, Duration period, Coordination &&cn) -> decltype(o.template lift< Value >(BufferWithTime(period, period, std::forward< Coordination >(cn))))
Definition: rx-buffer_time.hpp:286
Definition: rx-operators.hpp:69
static auto member(Observable &&o, Duration &&period, Duration &&skip, Coordination &&cn) -> decltype(o.template lift< Value >(BufferWithTime(std::forward< Duration >(period), std::forward< Duration >(skip), std::forward< Coordination >(cn))))
Definition: rx-buffer_time.hpp:311
auto buffer_with_time(AN &&...an) -> operator_factory< buffer_with_time_tag, AN... >
Return an observable that emits buffers every period time interval and collects items from this obser...
Definition: rx-buffer_time.hpp:256
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
static auto member(Observable &&o, Duration &&period, Duration &&skip) -> decltype(o.template lift< Value >(BufferWithTime(std::forward< Duration >(period), std::forward< Duration >(skip), identity_current_thread())))
Definition: rx-buffer_time.hpp:298
Definition: rx-operators.hpp:136
auto skip(AN &&...an) -> operator_factory< skip_tag, AN... >
Make new observable with skipped first count items from this observable.
Definition: rx-skip.hpp:130
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
static auto member(Observable &&o, Duration period) -> decltype(o.template lift< Value >(BufferWithTime(period, period, identity_current_thread())))
Definition: rx-buffer_time.hpp:273
static operators::detail::buffer_with_time_invalid_t< AN... > member(AN...)
Definition: rx-buffer_time.hpp:317
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37