RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-window_time_count.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
27 #if !defined(RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_OR_COUNT_HPP)
28 #define RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_OR_COUNT_HPP
29 
30 #include "../rx-includes.hpp"
31 
32 namespace rxcpp {
33 
34 namespace operators {
35 
36 namespace detail {
37 
38 template<class... AN>
39 struct window_with_time_or_count_invalid_arguments {};
40 
41 template<class... AN>
42 struct window_with_time_or_count_invalid : public rxo::operator_base<window_with_time_or_count_invalid_arguments<AN...>> {
43  using type = observable<window_with_time_or_count_invalid_arguments<AN...>, window_with_time_or_count_invalid<AN...>>;
44 };
45 template<class... AN>
46 using window_with_time_or_count_invalid_t = typename window_with_time_or_count_invalid<AN...>::type;
47 
48 template<class T, class Duration, class Coordination>
50 {
51  typedef rxu::decay_t<T> source_value_type;
52  typedef observable<source_value_type> value_type;
53  typedef rxu::decay_t<Coordination> coordination_type;
54  typedef typename coordination_type::coordinator_type coordinator_type;
55  typedef rxu::decay_t<Duration> duration_type;
56 
57  struct window_with_time_or_count_values
58  {
59  window_with_time_or_count_values(duration_type p, int n, coordination_type c)
60  : period(p)
61  , count(n)
62  , coordination(c)
63  {
64  }
65  duration_type period;
66  int count;
67  coordination_type coordination;
68  };
69  window_with_time_or_count_values initial;
70 
71  window_with_time_or_count(duration_type period, int count, coordination_type coordination)
72  : initial(period, count, coordination)
73  {
74  }
75 
76  template<class Subscriber>
77  struct window_with_time_or_count_observer
78  {
79  typedef window_with_time_or_count_observer<Subscriber> this_type;
80  typedef rxu::decay_t<T> value_type;
81  typedef rxu::decay_t<Subscriber> dest_type;
82  typedef observer<T, this_type> observer_type;
83 
84  struct window_with_time_or_count_subscriber_values : public window_with_time_or_count_values
85  {
86  window_with_time_or_count_subscriber_values(composite_subscription cs, dest_type d, window_with_time_or_count_values v, coordinator_type c)
87  : window_with_time_or_count_values(std::move(v))
88  , cs(std::move(cs))
89  , dest(std::move(d))
90  , coordinator(std::move(c))
91  , worker(coordinator.get_worker())
92  , cursor(0)
93  , subj_id(0)
94  {
95  }
96  composite_subscription cs;
97  dest_type dest;
98  coordinator_type coordinator;
99  rxsc::worker worker;
100  mutable int cursor;
101  mutable int subj_id;
102  mutable rxcpp::subjects::subject<T> subj;
103  };
104  typedef std::shared_ptr<window_with_time_or_count_subscriber_values> state_type;
105  state_type state;
106 
107  window_with_time_or_count_observer(composite_subscription cs, dest_type d, window_with_time_or_count_values v, coordinator_type c)
108  : state(std::make_shared<window_with_time_or_count_subscriber_values>(window_with_time_or_count_subscriber_values(std::move(cs), std::move(d), std::move(v), std::move(c))))
109  {
110  auto new_id = state->subj_id;
111  auto produce_time = state->worker.now();
112  auto localState = state;
113 
114  auto disposer = [=](const rxsc::schedulable&){
115  localState->cs.unsubscribe();
116  localState->dest.unsubscribe();
117  localState->worker.unsubscribe();
118  };
119  auto selectedDisposer = on_exception(
120  [&](){return localState->coordinator.act(disposer);},
121  localState->dest);
122  if (selectedDisposer.empty()) {
123  return;
124  }
125 
126  localState->dest.add([=](){
127  localState->worker.schedule(selectedDisposer.get());
128  });
129  localState->cs.add([=](){
130  localState->worker.schedule(selectedDisposer.get());
131  });
132 
133  //
134  // The scheduler is FIFO for any time T. Since the observer is scheduling
135  // on_next/on_error/oncompleted the timed schedule calls must be resheduled
136  // when they occur to ensure that production happens after on_next/on_error/oncompleted
137  //
138 
139  localState->worker.schedule(produce_time, [new_id, produce_time, localState](const rxsc::schedulable&){
140  localState->worker.schedule(release_window(new_id, produce_time, localState));
141  });
142  }
143 
144  static std::function<void(const rxsc::schedulable&)> release_window(int id, rxsc::scheduler::clock_type::time_point expected, state_type state) {
145  auto release = [id, expected, state](const rxsc::schedulable&) {
146  if (id != state->subj_id)
147  return;
148 
149  state->subj.get_subscriber().on_completed();
150  state->subj = rxcpp::subjects::subject<T>();
151  state->dest.on_next(state->subj.get_observable().as_dynamic());
152  state->cursor = 0;
153  auto new_id = ++state->subj_id;
154  auto produce_time = expected + state->period;
155  state->worker.schedule(produce_time, [new_id, produce_time, state](const rxsc::schedulable&){
156  state->worker.schedule(release_window(new_id, produce_time, state));
157  });
158  };
159  auto selectedRelease = on_exception(
160  [&](){return state->coordinator.act(release);},
161  state->dest);
162  if (selectedRelease.empty()) {
163  return std::function<void(const rxsc::schedulable&)>();
164  }
165 
166  return std::function<void(const rxsc::schedulable&)>(selectedRelease.get());
167  }
168 
169  void on_next(T v) const {
170  auto localState = state;
171  auto work = [v, localState](const rxsc::schedulable& self){
172  localState->subj.get_subscriber().on_next(v);
173  if (++localState->cursor == localState->count) {
174  release_window(localState->subj_id, localState->worker.now(), localState)(self);
175  }
176  };
177  auto selectedWork = on_exception(
178  [&](){return localState->coordinator.act(work);},
179  localState->dest);
180  if (selectedWork.empty()) {
181  return;
182  }
183  localState->worker.schedule(selectedWork.get());
184  }
185 
186  void on_error(std::exception_ptr e) const {
187  auto localState = state;
188  auto work = [e, localState](const rxsc::schedulable&){
189  localState->subj.get_subscriber().on_error(e);
190  localState->dest.on_error(e);
191  };
192  auto selectedWork = on_exception(
193  [&](){return localState->coordinator.act(work);},
194  localState->dest);
195  if (selectedWork.empty()) {
196  return;
197  }
198  localState->worker.schedule(selectedWork.get());
199  }
200 
201  void on_completed() const {
202  auto localState = state;
203  auto work = [localState](const rxsc::schedulable&){
204  localState->subj.get_subscriber().on_completed();
205  localState->dest.on_completed();
206  };
207  auto selectedWork = on_exception(
208  [&](){return localState->coordinator.act(work);},
209  localState->dest);
210  if (selectedWork.empty()) {
211  return;
212  }
213  localState->worker.schedule(selectedWork.get());
214  }
215 
216  static subscriber<T, observer_type> make(dest_type d, window_with_time_or_count_values v) {
217  auto cs = composite_subscription();
218  auto coordinator = v.coordination.create_coordinator();
219 
220  return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
221  }
222  };
223 
224  template<class Subscriber>
225  auto operator()(Subscriber dest) const
226  -> decltype(window_with_time_or_count_observer<Subscriber>::make(std::move(dest), initial)) {
227  return window_with_time_or_count_observer<Subscriber>::make(std::move(dest), initial);
228  }
229 };
230 
231 }
232 
235 template<class... AN>
238  return operator_factory<window_with_time_or_count_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
239 }
240 
241 }
242 
243 template<>
245 {
246  template<class Observable, class Duration,
247  class Enabled = rxu::enable_if_all_true_type_t<
249  std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
250  class SourceValue = rxu::value_type_t<Observable>,
251  class WindowTimeCount = rxo::detail::window_with_time_or_count<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
253  static auto member(Observable&& o, Duration&& period, int count)
254  -> decltype(o.template lift<Value>(WindowTimeCount(std::forward<Duration>(period), count, identity_current_thread()))) {
255  return o.template lift<Value>(WindowTimeCount(std::forward<Duration>(period), count, identity_current_thread()));
256  }
257 
258  template<class Observable, class Duration, class Coordination,
259  class Enabled = rxu::enable_if_all_true_type_t<
260  is_observable<Observable>,
261  std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
263  class SourceValue = rxu::value_type_t<Observable>,
264  class WindowTimeCount = rxo::detail::window_with_time_or_count<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
266  static auto member(Observable&& o, Duration&& period, int count, Coordination&& cn)
267  -> decltype(o.template lift<Value>(WindowTimeCount(std::forward<Duration>(period), count, std::forward<Coordination>(cn)))) {
268  return o.template lift<Value>(WindowTimeCount(std::forward<Duration>(period), count, std::forward<Coordination>(cn)));
269  }
270 
271  template<class... AN>
272  static operators::detail::window_with_time_or_count_invalid_t<AN...> member(AN...) {
273  std::terminate();
274  return {};
275  static_assert(sizeof...(AN) == 10000, "window_with_time_or_count takes (Duration, Count, optional Coordination)");
276  }
277 };
278 
279 }
280 
281 #endif
static operators::detail::window_with_time_or_count_invalid_t< AN... > member(AN...)
Definition: rx-window_time_count.hpp:272
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
auto window_with_time_or_count(AN &&...an) -> operator_factory< window_with_time_or_count_tag, AN... >
Return an observable that emits connected, non-overlapping windows of items from the source observabl...
Definition: rx-window_time_count.hpp:236
Definition: rx-operators.hpp:69
Definition: rx-subject.hpp:237
Definition: rx-operators.hpp:487
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
static auto member(Observable &&o, Duration &&period, int count, Coordination &&cn) -> decltype(o.template lift< Value >(WindowTimeCount(std::forward< Duration >(period), count, std::forward< Coordination >(cn))))
Definition: rx-window_time_count.hpp:266
static auto member(Observable &&o, Duration &&period, int count) -> decltype(o.template lift< Value >(WindowTimeCount(std::forward< Duration >(period), count, identity_current_thread())))
Definition: rx-window_time_count.hpp:253
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37