RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-buffer_count.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
25 #if !defined(RXCPP_OPERATORS_RX_BUFFER_COUNT_HPP)
26 #define RXCPP_OPERATORS_RX_BUFFER_COUNT_HPP
27 
28 #include "../rx-includes.hpp"
29 
30 namespace rxcpp {
31 
32 namespace operators {
33 
34 namespace detail {
35 
36 template<class... AN>
37 struct buffer_count_invalid_arguments {};
38 
39 template<class... AN>
40 struct buffer_count_invalid : public rxo::operator_base<buffer_count_invalid_arguments<AN...>> {
41  using type = observable<buffer_count_invalid_arguments<AN...>, buffer_count_invalid<AN...>>;
42 };
43 template<class... AN>
44 using buffer_count_invalid_t = typename buffer_count_invalid<AN...>::type;
45 
46 template<class T>
47 struct buffer_count
48 {
49  typedef rxu::decay_t<T> source_value_type;
50  typedef std::vector<source_value_type> value_type;
51 
52  struct buffer_count_values
53  {
54  buffer_count_values(int c, int s)
55  : count(c)
56  , skip(s)
57  {
58  }
59  int count;
60  int skip;
61  };
62 
63  buffer_count_values initial;
64 
65  buffer_count(int count, int skip)
66  : initial(count, skip)
67  {
68  }
69 
70  template<class Subscriber>
71  struct buffer_count_observer : public buffer_count_values
72  {
73  typedef buffer_count_observer<Subscriber> this_type;
74  typedef std::vector<T> value_type;
75  typedef rxu::decay_t<Subscriber> dest_type;
76  typedef observer<value_type, this_type> observer_type;
77  dest_type dest;
78  mutable int cursor;
79  mutable std::deque<value_type> chunks;
80 
81  buffer_count_observer(dest_type d, buffer_count_values v)
82  : buffer_count_values(v)
83  , dest(std::move(d))
84  , cursor(0)
85  {
86  }
87  void on_next(T v) const {
88  if (cursor++ % this->skip == 0) {
89  chunks.emplace_back();
90  }
91  for(auto& chunk : chunks) {
92  chunk.push_back(v);
93  }
94  while (!chunks.empty() && int(chunks.front().size()) == this->count) {
95  dest.on_next(std::move(chunks.front()));
96  chunks.pop_front();
97  }
98  }
99  void on_error(std::exception_ptr e) const {
100  dest.on_error(e);
101  }
102  void on_completed() const {
103  auto done = on_exception(
104  [&](){
105  while (!chunks.empty()) {
106  dest.on_next(std::move(chunks.front()));
107  chunks.pop_front();
108  }
109  return true;
110  },
111  dest);
112  if (done.empty()) {
113  return;
114  }
115  dest.on_completed();
116  }
117 
118  static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_count_values v) {
119  auto cs = d.get_subscription();
120  return make_subscriber<T>(std::move(cs), this_type(std::move(d), std::move(v)));
121  }
122  };
123 
124  template<class Subscriber>
125  auto operator()(Subscriber dest) const
126  -> decltype(buffer_count_observer<Subscriber>::make(std::move(dest), initial)) {
127  return buffer_count_observer<Subscriber>::make(std::move(dest), initial);
128  }
129 };
130 
131 }
132 
135 template<class... AN>
136 auto buffer(AN&&... an)
138  return operator_factory<buffer_count_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
139 }
140 
141 }
142 
143 template<>
145 {
146  template<class Observable,
147  class Enabled = rxu::enable_if_all_true_type_t<
149  class SourceValue = rxu::value_type_t<Observable>,
150  class BufferCount = rxo::detail::buffer_count<SourceValue>,
151  class Value = rxu::value_type_t<BufferCount>>
152  static auto member(Observable&& o, int count, int skip)
153  -> decltype(o.template lift<Value>(BufferCount(count, skip))) {
154  return o.template lift<Value>(BufferCount(count, skip));
155  }
156 
157  template<class Observable,
158  class Enabled = rxu::enable_if_all_true_type_t<
159  is_observable<Observable>>,
160  class SourceValue = rxu::value_type_t<Observable>,
161  class BufferCount = rxo::detail::buffer_count<SourceValue>,
162  class Value = rxu::value_type_t<BufferCount>>
163  static auto member(Observable&& o, int count)
164  -> decltype(o.template lift<Value>(BufferCount(count, count))) {
165  return o.template lift<Value>(BufferCount(count, count));
166  }
167 
168  template<class... AN>
169  static operators::detail::buffer_count_invalid_t<AN...> member(AN...) {
170  std::terminate();
171  return {};
172  static_assert(sizeof...(AN) == 10000, "buffer takes (Count, optional Skip)");
173  }
174 };
175 
176 }
177 
178 #endif
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
static operators::detail::buffer_count_invalid_t< AN... > member(AN...)
Definition: rx-buffer_count.hpp:169
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
static auto member(Observable &&o, int count, int skip) -> decltype(o.template lift< Value >(BufferCount(count, skip)))
Definition: rx-buffer_count.hpp:152
Definition: rx-operators.hpp:47
static auto member(Observable &&o, int count) -> decltype(o.template lift< Value >(BufferCount(count, count)))
Definition: rx-buffer_count.hpp:163
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
Definition: rx-operators.hpp:129
auto buffer(AN &&...an) -> operator_factory< buffer_count_tag, AN... >
Return an observable that emits connected, non-overlapping buffers, each containing at most count items...
Definition: rx-buffer_count.hpp:136
auto skip(AN &&...an) -> operator_factory< skip_tag, AN... >
Make new observable with skipped first count items from this observable.
Definition: rx-skip.hpp:130
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-predef.hpp:177