25 #if !defined(RXCPP_OPERATORS_RX_WINDOW_HPP) 26 #define RXCPP_OPERATORS_RX_WINDOW_HPP 28 #include "../rx-includes.hpp" 37 struct window_invalid_arguments {};
40 struct window_invalid :
public rxo::operator_base<window_invalid_arguments<AN...>> {
41 using type = observable<window_invalid_arguments<
AN...>, window_invalid<
AN...>>;
44 using window_invalid_t =
typename window_invalid<
AN...>::type;
49 typedef rxu::decay_t<T> source_value_type;
50 typedef observable<source_value_type> value_type;
54 window_values(
int c,
int s)
63 window_values initial;
66 : initial(count, skip)
70 template<
class Subscriber>
71 struct window_observer :
public window_values
73 typedef window_observer<Subscriber> this_type;
74 typedef rxu::decay_t<T> value_type;
75 typedef rxu::decay_t<Subscriber> dest_type;
76 typedef observer<T, this_type> observer_type;
79 mutable std::deque<rxcpp::subjects::subject<T>> subj;
81 window_observer(dest_type d, window_values v)
87 dest.on_next(subj[0].get_observable().
as_dynamic());
89 void on_next(T v)
const {
91 s.get_subscriber().on_next(v);
94 int c = cursor - this->count + 1;
95 if (c >= 0 && c % this->skip == 0) {
96 subj[0].get_subscriber().on_completed();
100 if (++cursor % this->skip == 0) {
102 dest.on_next(subj[subj.size() - 1].get_observable().as_dynamic());
106 void on_error(std::exception_ptr e)
const {
107 for (
auto s : subj) {
108 s.get_subscriber().on_error(e);
113 void on_completed()
const {
114 for (
auto s : subj) {
115 s.get_subscriber().on_completed();
120 static subscriber<T, observer_type> make(dest_type d, window_values v) {
121 auto cs = d.get_subscription();
122 return make_subscriber<T>(std::move(cs), observer_type(this_type(std::move(d), std::move(v))));
126 template<
class Subscriber>
127 auto operator()(Subscriber dest)
const 128 -> decltype(window_observer<Subscriber>::make(std::move(dest), initial)) {
129 return window_observer<Subscriber>::make(std::move(dest), initial);
137 template<
class...
AN>
148 template<
class Observable,
152 class Window = rxo::detail::window<SourceValue>,
155 -> decltype(o.template lift<Value>(Window(
count,
skip))) {
156 return o.template lift<Value>(Window(
count,
skip));
159 template<
class Observable,
161 is_observable<Observable>>,
163 class Window = rxo::detail::window<SourceValue>,
166 -> decltype(o.template lift<Value>(Window(
count,
count))) {
167 return o.template lift<Value>(Window(
count,
count));
170 template<
class...
AN>
171 static operators::detail::window_invalid_t<
AN...>
member(
AN...) {
174 static_assert(
sizeof...(
AN) == 10000,
"window takes (Count, optional Skip)");
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable, reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
Definition: rx-subject.hpp:237
static auto member(Observable &&o, int count, int skip) -> decltype(o.template lift< Value >(Window(count, skip)))
Definition: rx-window.hpp:154
auto AN
Definition: rx-finally.hpp:105
Definition: rx-operators.hpp:47
static auto member(Observable &&o, int count) -> decltype(o.template lift< Value >(Window(count, count)))
Definition: rx-window.hpp:165
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
auto window(AN &&...an) -> operator_factory< window_tag, AN... >
Return an observable that emits connected, non-overlapping windows, each containing at most count items from the source observable.
Definition: rx-window.hpp:138
Definition: rx-operators.hpp:473
auto skip(AN &&...an) -> operator_factory< skip_tag, AN... >
Make a new observable that skips the first count items from this observable.
Definition: rx-skip.hpp:130
auto as_dynamic() -> detail::dynamic_factory
Definition: rx-subscribe.hpp:117
static operators::detail::window_invalid_t< AN... > member(AN...)
Definition: rx-window.hpp:171
Definition: rx-predef.hpp:177