RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-window.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
25 #if !defined(RXCPP_OPERATORS_RX_WINDOW_HPP)
26 #define RXCPP_OPERATORS_RX_WINDOW_HPP
27 
28 #include "../rx-includes.hpp"
29 
30 namespace rxcpp {
31 
32 namespace operators {
33 
34 namespace detail {
35 
36 template<class... AN>
37 struct window_invalid_arguments {};
38 
39 template<class... AN>
40 struct window_invalid : public rxo::operator_base<window_invalid_arguments<AN...>> {
41  using type = observable<window_invalid_arguments<AN...>, window_invalid<AN...>>;
42 };
43 template<class... AN>
44 using window_invalid_t = typename window_invalid<AN...>::type;
45 
46 template<class T>
47 struct window
48 {
49  typedef rxu::decay_t<T> source_value_type;
50  typedef observable<source_value_type> value_type;
51 
52  struct window_values
53  {
54  window_values(int c, int s)
55  : count(c)
56  , skip(s)
57  {
58  }
59  int count;
60  int skip;
61  };
62 
63  window_values initial;
64 
65  window(int count, int skip)
66  : initial(count, skip)
67  {
68  }
69 
70  template<class Subscriber>
71  struct window_observer : public window_values
72  {
73  typedef window_observer<Subscriber> this_type;
74  typedef rxu::decay_t<T> value_type;
75  typedef rxu::decay_t<Subscriber> dest_type;
76  typedef observer<T, this_type> observer_type;
77  dest_type dest;
78  mutable int cursor;
79  mutable std::deque<rxcpp::subjects::subject<T>> subj;
80 
81  window_observer(dest_type d, window_values v)
82  : window_values(v)
83  , dest(std::move(d))
84  , cursor(0)
85  {
86  subj.push_back(rxcpp::subjects::subject<T>());
87  dest.on_next(subj[0].get_observable().as_dynamic());
88  }
89  void on_next(T v) const {
90  for (auto s : subj) {
91  s.get_subscriber().on_next(v);
92  }
93 
94  int c = cursor - this->count + 1;
95  if (c >= 0 && c % this->skip == 0) {
96  subj[0].get_subscriber().on_completed();
97  subj.pop_front();
98  }
99 
100  if (++cursor % this->skip == 0) {
101  subj.push_back(rxcpp::subjects::subject<T>());
102  dest.on_next(subj[subj.size() - 1].get_observable().as_dynamic());
103  }
104  }
105 
106  void on_error(std::exception_ptr e) const {
107  for (auto s : subj) {
108  s.get_subscriber().on_error(e);
109  }
110  dest.on_error(e);
111  }
112 
113  void on_completed() const {
114  for (auto s : subj) {
115  s.get_subscriber().on_completed();
116  }
117  dest.on_completed();
118  }
119 
120  static subscriber<T, observer_type> make(dest_type d, window_values v) {
121  auto cs = d.get_subscription();
122  return make_subscriber<T>(std::move(cs), observer_type(this_type(std::move(d), std::move(v))));
123  }
124  };
125 
126  template<class Subscriber>
127  auto operator()(Subscriber dest) const
128  -> decltype(window_observer<Subscriber>::make(std::move(dest), initial)) {
129  return window_observer<Subscriber>::make(std::move(dest), initial);
130  }
131 };
132 
133 }
134 
137 template<class... AN>
138 auto window(AN&&... an)
139  -> operator_factory<window_tag, AN...> {
140  return operator_factory<window_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
141 }
142 
143 }
144 
// Overload set behind window(): each static member() below shows one accepted
// argument list and what it lifts over the source observable.
// NOTE(review): this listing jumps from line 145 to 147, so the declaration
// that the `template<>` below specializes is not visible here - restore it
// from the original header before compiling.
145 template<>
147 {
// window(count, skip): lifts Window(count, skip) over the source observable.
// NOTE(review): listing line 150 is missing, so the argument list opened by
// `enable_if_all_true_type_t<` below is never closed. Presumably it is the
// same `is_observable<Observable>>,` line the next overload has - TODO confirm.
148  template<class Observable,
149  class Enabled = rxu::enable_if_all_true_type_t<
151  class SourceValue = rxu::value_type_t<Observable>,
152  class Window = rxo::detail::window<SourceValue>,
153  class Value = rxu::value_type_t<Window>>
154  static auto member(Observable&& o, int count, int skip)
155  -> decltype(o.template lift<Value>(Window(count, skip))) {
156  return o.template lift<Value>(Window(count, skip));
157  }
158 
// window(count): same as the overload above, with skip set equal to count.
159  template<class Observable,
160  class Enabled = rxu::enable_if_all_true_type_t<
161  is_observable<Observable>>,
162  class SourceValue = rxu::value_type_t<Observable>,
163  class Window = rxo::detail::window<SourceValue>,
164  class Value = rxu::value_type_t<Window>>
165  static auto member(Observable&& o, int count)
166  -> decltype(o.template lift<Value>(Window(count, count))) {
167  return o.template lift<Value>(Window(count, count));
168  }
169 
// Catch-all for any other argument list. The static_assert condition depends
// on the pack AN, so it is only evaluated when this overload is instantiated;
// it then fails (unless exactly 10000 arguments were passed) with the usage
// message. std::terminate() and the return precede it in the body.
170  template<class... AN>
171  static operators::detail::window_invalid_t<AN...> member(AN...) {
172  std::terminate();
173  return {};
174  static_assert(sizeof...(AN) == 10000, "window takes (Count, optional Skip)");
175  }
176 };
177 
178 }
179 
180 #endif
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
Definition: rx-subject.hpp:237
static auto member(Observable &&o, int count, int skip) -> decltype(o.template lift< Value >(Window(count, skip)))
Definition: rx-window.hpp:154
auto AN
Definition: rx-finally.hpp:105
Definition: rx-operators.hpp:47
static auto member(Observable &&o, int count) -> decltype(o.template lift< Value >(Window(count, count)))
Definition: rx-window.hpp:165
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
auto window(AN &&...an) -> operator_factory< window_tag, AN... >
Return an observable that emits connected, non-overlapping windows, each containing at most count items.
Definition: rx-window.hpp:138
Definition: rx-operators.hpp:473
auto skip(AN &&...an) -> operator_factory< skip_tag, AN... >
Make a new observable that skips the first count items from this observable.
Definition: rx-skip.hpp:130
auto as_dynamic() -> detail::dynamic_factory
Definition: rx-subscribe.hpp:117
static operators::detail::window_invalid_t< AN... > member(AN...)
Definition: rx-window.hpp:171
Definition: rx-predef.hpp:177