// Include guard, the shared "../rx-includes.hpp" include, and the empty tag
// struct buffer_with_time_or_count_invalid_arguments. The tag is referenced
// below as buffer_with_time_or_count_invalid_arguments<AN...>, where it serves
// as the value type of the observable named by the invalid-arguments fallback.
// NOTE(review): the tag's template header is not visible in this listing.
27 #if !defined(RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_OR_COUNT_HPP)    28 #define RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_OR_COUNT_HPP    30 #include "../rx-includes.hpp"    39 struct buffer_with_time_or_count_invalid_arguments {};
    // Placeholder operator for argument packs that match no real overload.
    // It derives from rxo::operator_base over the invalid-arguments tag and
    // exposes `type`: an observable whose value type is that tag and whose
    // source operator is this struct itself.
    42 struct buffer_with_time_or_count_invalid : 
public rxo::operator_base<buffer_with_time_or_count_invalid_arguments<AN...>> {
    43     using type = observable<buffer_with_time_or_count_invalid_arguments<
AN...>, buffer_with_time_or_count_invalid<
AN...>>;
    // Shorthand for buffer_with_time_or_count_invalid<AN...>::type; this is the
    // declared return type of the catch-all member(AN...) overload.
    46 using buffer_with_time_or_count_invalid_t = 
typename buffer_with_time_or_count_invalid<
AN...>::type;
    // Operator implementation, parameterized on the source value type T, a
    // Duration type and a Coordination type. The aliases below strip cv/ref
    // qualifiers (rxu::decay_t) so that stored members are plain value types.
    48 template<
class T, 
class Duration, 
class Coordination>
    // Decayed element type taken from the source.
    51     typedef rxu::decay_t<T> source_value_type;
    // Type of one buffer: a vector of decayed source elements.
    52     typedef std::vector<source_value_type> value_type;
    53     typedef rxu::decay_t<Coordination> coordination_type;
    // Coordinator type published by the coordination (nested coordinator_type).
    54     typedef typename coordination_type::coordinator_type coordinator_type;
    55     typedef rxu::decay_t<Duration> duration_type;
    // Configuration bundle for the operator, constructed from a duration `p`,
    // an int `n` and a coordination `c`.
    // NOTE(review): only the `coordination` member is visible in this listing;
    // `period` and `count` are read elsewhere through this type (state->period,
    // localState->count), so they are presumably members initialized from p
    // and n -- confirm against the full header.
    57     struct buffer_with_time_or_count_values
    59         buffer_with_time_or_count_values(duration_type p, 
int n, coordination_type c)
    67         coordination_type coordination;
    // Configuration captured when the operator is constructed. operator()
    // passes this by value to make(), so each subscription works on a copy.
    69     buffer_with_time_or_count_values initial;
    // Constructor initializer: forwards (period, count, coordination) into
    // `initial`. The constructor's signature line is not visible here.
    72         : initial(period, count, coordination)
    // Observer that sits between the source and the downstream Subscriber.
    // It accumulates source items into `chunk` and hands complete chunks to
    // the destination.
    76     template<
class Subscriber>
    77     struct buffer_with_time_or_count_observer
    79         typedef buffer_with_time_or_count_observer<Subscriber> this_type;
    // NOTE(review): this is std::vector<T>, whereas the enclosing operator's
    // value_type is std::vector<source_value_type> (i.e. decay_t<T>). The two
    // agree only when T is already a decayed type -- confirm this is intended.
    80         typedef std::vector<T> value_type;
    81         typedef rxu::decay_t<Subscriber> dest_type;
    82         typedef observer<value_type, this_type> observer_type;
    // Per-subscription state: the operator configuration (inherited) plus the
    // subscription lifetime `cs`, the coordinator, a worker obtained from
    // coordinator.get_worker(), and the buffer being filled.
    84         struct buffer_with_time_or_count_subscriber_values : 
public buffer_with_time_or_count_values
    86             buffer_with_time_or_count_subscriber_values(composite_subscription cs, dest_type d, buffer_with_time_or_count_values v, coordinator_type c)
    87                 : buffer_with_time_or_count_values(std::move(v))
    90                 , coordinator(std::move(c))
    // Initialized from `coordinator`, so it relies on `coordinator` being
    // initialized first (the initializer order shown here matches that).
    91                 , worker(coordinator.get_worker())
    95             composite_subscription cs;
    97             coordinator_type coordinator;
    // `mutable` so the const on_next/on_completed paths can modify the buffer
    // through the shared state.
   100             mutable value_type chunk;
    // State is held by shared_ptr so that scheduled lambdas can copy the
    // pointer and keep the state alive until they run.
   102         typedef std::shared_ptr<buffer_with_time_or_count_subscriber_values> state_type;
   // Constructor: builds the shared state, wires up disposal, and schedules
   // the first timed flush.
   //   cs - subscription lifetime for this observer
   //   d  - downstream subscriber
   //   v  - operator configuration
   //   c  - coordinator from which the worker is obtained
   105         buffer_with_time_or_count_observer(composite_subscription cs, dest_type d, buffer_with_time_or_count_values v, coordinator_type c)
   106             : state(std::make_shared<buffer_with_time_or_count_subscriber_values>(buffer_with_time_or_count_subscriber_values(std::move(cs), std::move(d), std::move(v), std::move(c))))
   // Snapshot the current chunk id and compute when the first flush is due
   // (worker's current time plus the configured period).
   108             auto new_id = state->chunk_id;
   109             auto produce_time = state->worker.now() + state->period;
   110             auto localState = state;
   // Disposer tears down all three lifetimes: the source-side subscription,
   // the destination, and the worker.
   112             auto disposer = [=](
const rxsc::schedulable&){
   113                 localState->cs.unsubscribe();
   114                 localState->dest.unsubscribe();
   115                 localState->worker.unsubscribe();
   // The disposer is passed through coordinator.act(); the wrapper that
   // produces `selectedDisposer` from this lambda is not visible here.
   118                 [&](){
return localState->coordinator.act(disposer);},
   120             if (selectedDisposer.empty()) {
   // When either the destination or the subscription is unsubscribed, the
   // disposer is scheduled on the worker rather than invoked inline.
   124             localState->dest.add([=](){
   125                 localState->worker.schedule(selectedDisposer.get());
   127             localState->cs.add([=](){
   128                 localState->worker.schedule(selectedDisposer.get());
   // First timed flush: at produce_time, schedule the produce_buffer action
   // tagged with the chunk id captured above.
   137             localState->worker.schedule(produce_time, [new_id, produce_time, localState](
const rxsc::schedulable&){
   138                 localState->worker.schedule(produce_buffer(new_id, produce_time, localState));
   // Builds the action that flushes the current chunk downstream and arms the
   // next timed flush.
   //   id       - chunk id this action was created for
   //   expected - time point this flush was due; the next one is due at
   //              expected + period
   //   state    - shared per-subscription state
   // Returns an empty std::function when `selectedProduce` is empty,
   // otherwise a std::function wrapping selectedProduce.get().
   142         static std::function<void(const rxsc::schedulable&)> produce_buffer(
int id, rxsc::scheduler::clock_type::time_point expected, state_type state) {
   143             auto produce = [id, expected, state](
const rxsc::schedulable&) {
   // Guard against a stale action: the id no longer matches the current
   // chunk. NOTE(review): the guarded statement is not visible in this
   // listing; presumably an early return -- confirm.
   144                 if (
id != state->chunk_id)
   // Emit the accumulated chunk, then empty it for reuse.
   147                 state->dest.on_next(state->chunk);
   148                 state->chunk.resize(0);
   // Advance the chunk id so actions created for the previous id compare
   // unequal in the guard above.
   149                 auto new_id = ++state->chunk_id;
   150                 auto produce_time = expected + state->period;
   // Arm the next timed flush for the new chunk id.
   151                 state->worker.schedule(produce_time, [new_id, produce_time, state](
const rxsc::schedulable&){
   152                     state->worker.schedule(produce_buffer(new_id, produce_time, state));
   // `produce` is passed through coordinator.act(); the wrapper that yields
   // `selectedProduce` is not visible here.
   157                 [&](){
return state->coordinator.act(produce);},
   159             if (selectedProduce.empty()) {
   160                 return std::function<void(const rxsc::schedulable&)>();
   163             return std::function<void(const rxsc::schedulable&)>(selectedProduce.get());
   // Receives one source item. The append is not done inline: it is wrapped
   // in `work`, passed through coordinator.act(), and scheduled on the worker.
   166         void on_next(T v)
 const {
   167             auto localState = state;
   // `v` is captured by copy so the item stays valid until the work runs.
   168             auto work = [v, localState](
const rxsc::schedulable& 
self){
   169                 localState->chunk.push_back(v);
   // Count-triggered flush: once the chunk size equals `count`, run the
   // produce action immediately with the current chunk id, using the worker's
   // current time as the `expected` base for the next timed flush.
   170                 if (
int(localState->chunk.size()) == localState->count) {
   171                     produce_buffer(localState->chunk_id, localState->worker.now(), localState)(
self);
   175                 [&](){
return localState->coordinator.act(work);},
   // NOTE(review): the body of this empty() check is not visible here;
   // presumably it returns without scheduling -- confirm.
   177             if (selectedWork.empty()) {
   180             localState->worker.schedule(selectedWork.get());
   // Forwards a source error to the destination. The call is wrapped in
   // `work`, passed through coordinator.act(), and scheduled on the worker.
   // No visible line emits the pending chunk before the error.
   182         void on_error(std::exception_ptr e)
 const {
   183             auto localState = state;
   184             auto work = [e, localState](
const rxsc::schedulable&){
   185                 localState->dest.on_error(e);
   188                 [&](){
return localState->coordinator.act(work);},
   // NOTE(review): the body of this empty() check is not visible here;
   // presumably it returns without scheduling -- confirm.
   190             if (selectedWork.empty()) {
   193             localState->worker.schedule(selectedWork.get());
   // Handles source completion: emits whatever is currently in the chunk and
   // then completes the destination. Both calls run inside `work`, which is
   // passed through coordinator.act() and scheduled on the worker.
   195         void on_completed()
 const {
   196             auto localState = state;
   197             auto work = [localState](
const rxsc::schedulable&){
   // No emptiness check is visible before this call, so the final chunk
   // appears to be emitted even when it holds no items.
   198                 localState->dest.on_next(localState->chunk);
   199                 localState->dest.on_completed();
   202                 [&](){
return localState->coordinator.act(work);},
   // NOTE(review): the body of this empty() check is not visible here;
   // presumably it returns without scheduling -- confirm.
   204             if (selectedWork.empty()) {
   207             localState->worker.schedule(selectedWork.get());
   // Factory for the subscriber handed to the source.
   //   d - downstream subscriber
   //   v - operator configuration (taken by value, then moved into the observer)
   // Creates a fresh composite_subscription and a coordinator from
   // v.coordination. The same `cs` is given both to make_subscriber and to
   // the observer.
   210         static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_with_time_or_count_values v) {
   211             auto cs = composite_subscription();
   // The coordinator must be created before `v` is moved from on the
   // return line below.
   212             auto coordinator = v.coordination.create_coordinator();
   214             return make_subscriber<T>(cs, this_type(cs, std::move(d), std::move(v), std::move(coordinator)));
   // Call operator: given a downstream subscriber, returns the subscriber
   // built by buffer_with_time_or_count_observer<Subscriber>::make. `initial`
   // is passed by value, so each call works on its own copy of the
   // configuration. The trailing decltype repeats the return expression to
   // spell the return type.
   218     template<
class Subscriber>
   219     auto operator()(Subscriber dest) 
const   220         -> decltype(buffer_with_time_or_count_observer<Subscriber>::make(std::move(dest), initial)) {
   221         return      buffer_with_time_or_count_observer<Subscriber>::make(std::move(dest), initial);
   // Operator factory: perfect-forwards the arguments into a tuple and wraps
   // it in operator_factory tagged with buffer_with_time_or_count_tag.
   // The function's signature line is not visible in this listing.
   229 template<
class... 
AN>
   232      return operator_factory<buffer_with_time_or_count_tag, 
AN...>(std::make_tuple(std::forward<AN>(an)...));
   // member() overloads selected for this operator's tag. Three forms appear
   // in this listing:
   //   1. (Observable, Duration, count): BufferTimeCount is instantiated
   //      with identity_one_worker as the coordination type.
   //   2. (Observable, Duration, count, Coordination): BufferTimeCount is
   //      instantiated with the decayed Coordination type.
   //   3. (AN...): catch-all that fails compilation with a usage message.
   // Overloads 1 and 2 are constrained on Duration being convertible to
   // rxsc::scheduler::clock_type::duration; overload 2 also requires
   // is_observable<Observable>. Other constraint lines are not visible here.
   240     template<
class Observable, 
class Duration,
   243             std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
   245         class BufferTimeCount = rxo::detail::buffer_with_time_or_count<SourceValue, rxu::decay_t<Duration>, 
identity_one_worker>,
   252     template<
class Observable, 
class Duration, 
class Coordination,
   254             is_observable<Observable>,
   255             std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
   258         class BufferTimeCount = rxo::detail::buffer_with_time_or_count<SourceValue, rxu::decay_t<Duration>, 
rxu::decay_t<Coordination>>,
   // Lifts the source observable with a BufferTimeCount operator built from
   // the forwarded period, the count and the forwarded coordination.
   260     static auto member(Observable&& o, Duration&& period, 
int count, Coordination&& cn)
   261         -> decltype(o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), 
count, std::forward<Coordination>(cn)))) {
   262         return      o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), 
count, std::forward<Coordination>(cn)));
   // Catch-all overload. The assertion condition depends on the pack AN, so
   // it is only evaluated when this overload is instantiated; it then fails
   // for any pack that does not contain exactly 10000 types, surfacing the
   // usage message below.
   265     template<
class... 
AN>
   266     static operators::detail::buffer_with_time_or_count_invalid_t<
AN...> 
member(
AN...) {
   269         static_assert(
sizeof...(
AN) == 10000, 
"buffer_with_time_or_count takes (Duration, Count, optional Coordination)");
 Definition: rx-operators.hpp:143
static operators::detail::buffer_with_time_or_count_invalid_t< AN... > member(AN...)
Definition: rx-buffer_time_count.hpp:266
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count. 
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
static auto member(Observable &&o, Duration &&period, int count) -> decltype(o.template lift< Value >(BufferTimeCount(std::forward< Duration >(period), count, identity_current_thread())))
Definition: rx-buffer_time_count.hpp:247
Definition: rx-operators.hpp:69
static auto member(Observable &&o, Duration &&period, int count, Coordination &&cn) -> decltype(o.template lift< Value >(BufferTimeCount(std::forward< Duration >(period), count, std::forward< Coordination >(cn))))
Definition: rx-buffer_time_count.hpp:260
auto AN
Definition: rx-finally.hpp:105
auto buffer_with_time_or_count(AN &&...an) -> operator_factory< buffer_with_time_or_count_tag, AN... >
Return an observable that emits connected, non-overlapping buffers of items from the source observabl...
Definition: rx-buffer_time_count.hpp:230
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37