25 #if !defined(RXCPP_OPERATORS_RX_BUFFER_COUNT_HPP) 26 #define RXCPP_OPERATORS_RX_BUFFER_COUNT_HPP 28 #include "../rx-includes.hpp" 37 struct buffer_count_invalid_arguments {};
40 struct buffer_count_invalid :
public rxo::operator_base<buffer_count_invalid_arguments<AN...>> {
41 using type = observable<buffer_count_invalid_arguments<
AN...>, buffer_count_invalid<
AN...>>;
44 using buffer_count_invalid_t =
typename buffer_count_invalid<
AN...>::type;
49 typedef rxu::decay_t<T> source_value_type;
50 typedef std::vector<source_value_type> value_type;
52 struct buffer_count_values
54 buffer_count_values(
int c,
int s)
63 buffer_count_values initial;
66 : initial(count, skip)
70 template<
class Subscriber>
71 struct buffer_count_observer :
public buffer_count_values
73 typedef buffer_count_observer<Subscriber> this_type;
74 typedef std::vector<T> value_type;
75 typedef rxu::decay_t<Subscriber> dest_type;
76 typedef observer<value_type, this_type> observer_type;
79 mutable std::deque<value_type> chunks;
81 buffer_count_observer(dest_type d, buffer_count_values v)
82 : buffer_count_values(v)
87 void on_next(T v)
const {
88 if (cursor++ % this->skip == 0) {
89 chunks.emplace_back();
91 for(
auto& chunk : chunks) {
94 while (!chunks.empty() && int(chunks.front().size()) == this->count) {
95 dest.on_next(std::move(chunks.front()));
99 void on_error(std::exception_ptr e)
const {
102 void on_completed()
const {
105 while (!chunks.empty()) {
106 dest.on_next(std::move(chunks.front()));
118 static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_count_values v) {
119 auto cs = d.get_subscription();
120 return make_subscriber<T>(std::move(cs), this_type(std::move(d), std::move(v)));
124 template<
class Subscriber>
125 auto operator()(Subscriber dest)
const 126 -> decltype(buffer_count_observer<Subscriber>::make(std::move(dest), initial)) {
127 return buffer_count_observer<Subscriber>::make(std::move(dest), initial);
135 template<
class...
AN>
138 return operator_factory<buffer_count_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
146 template<
class Observable,
150 class BufferCount = rxo::detail::buffer_count<SourceValue>,
153 -> decltype(o.template lift<Value>(BufferCount(
count,
skip))) {
154 return o.template lift<Value>(BufferCount(
count,
skip));
157 template<
class Observable,
159 is_observable<Observable>>,
161 class BufferCount = rxo::detail::buffer_count<SourceValue>,
164 -> decltype(o.template lift<Value>(BufferCount(
count,
count))) {
165 return o.template lift<Value>(BufferCount(
count,
count));
168 template<
class...
AN>
169 static operators::detail::buffer_count_invalid_t<
AN...>
member(
AN...) {
172 static_assert(
sizeof...(
AN) == 10000,
"buffer takes (Count, optional Skip)");
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable, reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
static operators::detail::buffer_count_invalid_t< AN... > member(AN...)
Definition: rx-buffer_count.hpp:169
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
static auto member(Observable &&o, int count, int skip) -> decltype(o.template lift< Value >(BufferCount(count, skip)))
Definition: rx-buffer_count.hpp:152
Definition: rx-operators.hpp:47
static auto member(Observable &&o, int count) -> decltype(o.template lift< Value >(BufferCount(count, count)))
Definition: rx-buffer_count.hpp:163
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
Definition: rx-operators.hpp:129
auto buffer(AN &&...an) -> operator_factory< buffer_count_tag, AN... >
Return an observable that emits connected, non-overlapping buffers, each containing at most count items...
Definition: rx-buffer_count.hpp:136
auto skip(AN &&...an) -> operator_factory< skip_tag, AN... >
Make a new observable that skips the first count items from this observable.
Definition: rx-skip.hpp:130
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-predef.hpp:177