RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-window_time.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
37 #if !defined(RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_HPP)
38 #define RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_HPP
39 
40 #include "../rx-includes.hpp"
41 
42 namespace rxcpp {
43 
44 namespace operators {
45 
46 namespace detail {
47 
48 template<class... AN>
49 struct window_with_time_invalid_arguments {};
50 
51 template<class... AN>
52 struct window_with_time_invalid : public rxo::operator_base<window_with_time_invalid_arguments<AN...>> {
53  using type = observable<window_with_time_invalid_arguments<AN...>, window_with_time_invalid<AN...>>;
54 };
55 template<class... AN>
56 using window_with_time_invalid_t = typename window_with_time_invalid<AN...>::type;
57 
58 template<class T, class Duration, class Coordination>
59 struct window_with_time
60 {
61  typedef rxu::decay_t<T> source_value_type;
62  typedef observable<source_value_type> value_type;
63  typedef rxu::decay_t<Coordination> coordination_type;
64  typedef typename coordination_type::coordinator_type coordinator_type;
65  typedef rxu::decay_t<Duration> duration_type;
66 
67  struct window_with_time_values
68  {
69  window_with_time_values(duration_type p, duration_type s, coordination_type c)
70  : period(p)
71  , skip(s)
72  , coordination(c)
73  {
74  }
75  duration_type period;
76  duration_type skip;
77  coordination_type coordination;
78  };
79  window_with_time_values initial;
80 
81  window_with_time(duration_type period, duration_type skip, coordination_type coordination)
82  : initial(period, skip, coordination)
83  {
84  }
85 
86  template<class Subscriber>
87  struct window_with_time_observer
88  {
89  typedef window_with_time_observer<Subscriber> this_type;
90  typedef rxu::decay_t<T> value_type;
91  typedef rxu::decay_t<Subscriber> dest_type;
92  typedef observer<T, this_type> observer_type;
93 
94  struct window_with_time_subscriber_values : public window_with_time_values
95  {
96  window_with_time_subscriber_values(composite_subscription cs, dest_type d, window_with_time_values v, coordinator_type c)
97  : window_with_time_values(v)
98  , cs(std::move(cs))
99  , dest(std::move(d))
100  , coordinator(std::move(c))
101  , worker(coordinator.get_worker())
102  , expected(worker.now())
103  {
104  }
105  composite_subscription cs;
106  dest_type dest;
107  coordinator_type coordinator;
108  rxsc::worker worker;
109  mutable std::deque<rxcpp::subjects::subject<T>> subj;
110  rxsc::scheduler::clock_type::time_point expected;
111  };
112  std::shared_ptr<window_with_time_subscriber_values> state;
113 
114  window_with_time_observer(composite_subscription cs, dest_type d, window_with_time_values v, coordinator_type c)
115  : state(std::make_shared<window_with_time_subscriber_values>(window_with_time_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
116  {
117  auto localState = state;
118 
119  auto disposer = [=](const rxsc::schedulable&){
120  localState->cs.unsubscribe();
121  localState->dest.unsubscribe();
122  localState->worker.unsubscribe();
123  };
124  auto selectedDisposer = on_exception(
125  [&](){return localState->coordinator.act(disposer);},
126  localState->dest);
127  if (selectedDisposer.empty()) {
128  return;
129  }
130 
131  localState->dest.add([=](){
132  localState->worker.schedule(selectedDisposer.get());
133  });
134  localState->cs.add([=](){
135  localState->worker.schedule(selectedDisposer.get());
136  });
137 
138  //
139  // The scheduler is FIFO for any time T. Since the observer is scheduling
140  // on_next/on_error/oncompleted the timed schedule calls must be resheduled
141  // when they occur to ensure that production happens after on_next/on_error/oncompleted
142  //
143 
144  auto release_window = [localState](const rxsc::schedulable&) {
145  localState->worker.schedule([localState](const rxsc::schedulable&) {
146  localState->subj[0].get_subscriber().on_completed();
147  localState->subj.pop_front();
148  });
149  };
150  auto selectedRelease = on_exception(
151  [&](){return localState->coordinator.act(release_window);},
152  localState->dest);
153  if (selectedRelease.empty()) {
154  return;
155  }
156 
157  auto create_window = [localState, selectedRelease](const rxsc::schedulable&) {
158  localState->subj.push_back(rxcpp::subjects::subject<T>());
159  localState->dest.on_next(localState->subj[localState->subj.size() - 1].get_observable().as_dynamic());
160 
161  auto produce_at = localState->expected + localState->period;
162  localState->expected += localState->skip;
163  localState->worker.schedule(produce_at, [localState, selectedRelease](const rxsc::schedulable&) {
164  localState->worker.schedule(selectedRelease.get());
165  });
166  };
167  auto selectedCreate = on_exception(
168  [&](){return localState->coordinator.act(create_window);},
169  localState->dest);
170  if (selectedCreate.empty()) {
171  return;
172  }
173 
174  state->worker.schedule_periodically(
175  state->expected,
176  state->skip,
177  [localState, selectedCreate](const rxsc::schedulable&) {
178  localState->worker.schedule(selectedCreate.get());
179  });
180  }
181 
182  void on_next(T v) const {
183  auto localState = state;
184  auto work = [v, localState](const rxsc::schedulable&){
185  for (auto s : localState->subj) {
186  s.get_subscriber().on_next(v);
187  }
188  };
189  auto selectedWork = on_exception(
190  [&](){return localState->coordinator.act(work);},
191  localState->dest);
192  if (selectedWork.empty()) {
193  return;
194  }
195  localState->worker.schedule(selectedWork.get());
196  }
197 
198  void on_error(std::exception_ptr e) const {
199  auto localState = state;
200  auto work = [e, localState](const rxsc::schedulable&){
201  for (auto s : localState->subj) {
202  s.get_subscriber().on_error(e);
203  }
204  localState->dest.on_error(e);
205  };
206  auto selectedWork = on_exception(
207  [&](){return localState->coordinator.act(work);},
208  localState->dest);
209  if (selectedWork.empty()) {
210  return;
211  }
212  localState->worker.schedule(selectedWork.get());
213  }
214 
215  void on_completed() const {
216  auto localState = state;
217  auto work = [localState](const rxsc::schedulable&){
218  for (auto s : localState->subj) {
219  s.get_subscriber().on_completed();
220  }
221  localState->dest.on_completed();
222  };
223  auto selectedWork = on_exception(
224  [&](){return localState->coordinator.act(work);},
225  localState->dest);
226  if (selectedWork.empty()) {
227  return;
228  }
229  localState->worker.schedule(selectedWork.get());
230  }
231 
232  static subscriber<T, observer_type> make(dest_type d, window_with_time_values v) {
233  auto cs = composite_subscription();
234  auto coordinator = v.coordination.create_coordinator();
235 
236  return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
237  }
238  };
239 
240  template<class Subscriber>
241  auto operator()(Subscriber dest) const
242  -> decltype(window_with_time_observer<Subscriber>::make(std::move(dest), initial)) {
243  return window_with_time_observer<Subscriber>::make(std::move(dest), initial);
244  }
245 };
246 
247 }
248 
251 template<class... AN>
252 auto window_with_time(AN&&... an)
254  return operator_factory<window_with_time_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
255 }
256 
257 }
258 
259 template<>
261 {
262  template<class Observable, class Duration,
263  class Enabled = rxu::enable_if_all_true_type_t<
265  std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
266  class SourceValue = rxu::value_type_t<Observable>,
267  class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
268  class Value = rxu::value_type_t<WindowWithTime>>
269  static auto member(Observable&& o, Duration period)
270  -> decltype(o.template lift<Value>(WindowWithTime(period, period, identity_current_thread()))) {
271  return o.template lift<Value>(WindowWithTime(period, period, identity_current_thread()));
272  }
273 
274  template<class Observable, class Duration, class Coordination,
275  class Enabled = rxu::enable_if_all_true_type_t<
276  is_observable<Observable>,
277  std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
279  class SourceValue = rxu::value_type_t<Observable>,
280  class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
281  class Value = rxu::value_type_t<WindowWithTime>>
282  static auto member(Observable&& o, Duration period, Coordination&& cn)
283  -> decltype(o.template lift<Value>(WindowWithTime(period, period, std::forward<Coordination>(cn)))) {
284  return o.template lift<Value>(WindowWithTime(period, period, std::forward<Coordination>(cn)));
285  }
286 
287  template<class Observable, class Duration,
288  class Enabled = rxu::enable_if_all_true_type_t<
289  is_observable<Observable>,
290  std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
291  class SourceValue = rxu::value_type_t<Observable>,
292  class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
293  class Value = rxu::value_type_t<WindowWithTime>>
294  static auto member(Observable&& o, Duration&& period, Duration&& skip)
295  -> decltype(o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), identity_current_thread()))) {
296  return o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), identity_current_thread()));
297  }
298 
299  template<class Observable, class Duration, class Coordination,
300  class Enabled = rxu::enable_if_all_true_type_t<
301  is_observable<Observable>,
302  std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
303  is_coordination<Coordination>>,
304  class SourceValue = rxu::value_type_t<Observable>,
305  class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
306  class Value = rxu::value_type_t<WindowWithTime>>
307  static auto member(Observable&& o, Duration&& period, Duration&& skip, Coordination&& cn)
308  -> decltype(o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), std::forward<Coordination>(cn)))) {
309  return o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), std::forward<Coordination>(cn)));
310  }
311 
312  template<class... AN>
313  static operators::detail::window_with_time_invalid_t<AN...> member(AN...) {
314  std::terminate();
315  return {};
316  static_assert(sizeof...(AN) == 10000, "window_with_time takes (Duration, optional Duration, optional Coordination)");
317  }
318 };
319 
320 }
321 
322 #endif
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
Definition: rx-subject.hpp:237
auto AN
Definition: rx-finally.hpp:105
static auto member(Observable &&o, Duration &&period, Duration &&skip, Coordination &&cn) -> decltype(o.template lift< Value >(WindowWithTime(std::forward< Duration >(period), std::forward< Duration >(skip), std::forward< Coordination >(cn))))
Definition: rx-window_time.hpp:307
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
static auto member(Observable &&o, Duration period, Coordination &&cn) -> decltype(o.template lift< Value >(WindowWithTime(period, period, std::forward< Coordination >(cn))))
Definition: rx-window_time.hpp:282
static operators::detail::window_with_time_invalid_t< AN... > member(AN...)
Definition: rx-window_time.hpp:313
Definition: rx-operators.hpp:480
auto skip(AN &&...an) -> operator_factory< skip_tag, AN... >
Make new observable with skipped first count items from this observable.
Definition: rx-skip.hpp:130
static auto member(Observable &&o, Duration period) -> decltype(o.template lift< Value >(WindowWithTime(period, period, identity_current_thread())))
Definition: rx-window_time.hpp:269
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
auto window_with_time(AN &&...an) -> operator_factory< window_with_time_tag, AN... >
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-window_time.hpp:252
Definition: rx-coordination.hpp:114
static auto member(Observable &&o, Duration &&period, Duration &&skip) -> decltype(o.template lift< Value >(WindowWithTime(std::forward< Duration >(period), std::forward< Duration >(skip), identity_current_thread())))
Definition: rx-window_time.hpp:294
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37