27 #if !defined(RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_OR_COUNT_HPP) 28 #define RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_OR_COUNT_HPP 30 #include "../rx-includes.hpp" 39 struct window_with_time_or_count_invalid_arguments {};
// Operator type used for unsupported argument lists. It derives publicly from
// rxo::operator_base instantiated over
// window_with_time_or_count_invalid_arguments<AN...>.
42 struct window_with_time_or_count_invalid :
public rxo::operator_base<window_with_time_or_count_invalid_arguments<AN...>> {
// `type`: observable over the invalid-arguments marker, with this struct as
// the second template argument.
43 using type = observable<window_with_time_or_count_invalid_arguments<
AN...>, window_with_time_or_count_invalid<
AN...>>;
// Shorthand for window_with_time_or_count_invalid<AN...>::type.
46 using window_with_time_or_count_invalid_t =
typename window_with_time_or_count_invalid<
AN...>::type;
// Operator implementation parameterized on the source value type T, the
// window Duration and the Coordination.
// NOTE(review): the struct header and several member lines are not visible in
// this listing; the comments below cover only the visible lines.
48 template<
class T,
class Duration,
class Coordination>
// Decayed source item type.
51 typedef rxu::decay_t<T> source_value_type;
// Each emitted value is itself an observable of source items.
52 typedef observable<source_value_type> value_type;
53 typedef rxu::decay_t<Coordination> coordination_type;
54 typedef typename coordination_type::coordinator_type coordinator_type;
55 typedef rxu::decay_t<Duration> duration_type;
// Operator parameters, built from a duration, an int and a coordination.
57 struct window_with_time_or_count_values
59 window_with_time_or_count_values(duration_type p,
int n, coordination_type c)
67 coordination_type coordination;
// Parameter set stored by the operator; it is passed to make() from operator().
69 window_with_time_or_count_values initial;
72 : initial(period, count, coordination)
// Observer for the given Subscriber type; its handlers appear further down.
// NOTE(review): braces and some member declarations are missing from this
// listing; the comments below cover only the visible lines.
76 template<
class Subscriber>
77 struct window_with_time_or_count_observer
79 typedef window_with_time_or_count_observer<Subscriber> this_type;
80 typedef rxu::decay_t<T> value_type;
81 typedef rxu::decay_t<Subscriber> dest_type;
82 typedef observer<T, this_type> observer_type;
// Per-subscription state: extends the operator parameters with the
// subscription, the coordinator and a worker.
84 struct window_with_time_or_count_subscriber_values :
public window_with_time_or_count_values
86 window_with_time_or_count_subscriber_values(composite_subscription cs, dest_type d, window_with_time_or_count_values v, coordinator_type c)
// The parameter set is moved into the base sub-object.
87 : window_with_time_or_count_values(std::move(v))
90 , coordinator(std::move(c))
// The worker is obtained from the coordinator member.
91 , worker(coordinator.get_worker())
96 composite_subscription cs;
98 coordinator_type coordinator;
// The state is held through a shared_ptr; the lambdas below capture copies of it.
104 typedef std::shared_ptr<window_with_time_or_count_subscriber_values> state_type;
// Constructor: allocates the shared state, registers a disposer with both the
// destination and the subscription, and schedules the first timed release.
107 window_with_time_or_count_observer(composite_subscription cs, dest_type d, window_with_time_or_count_values v, coordinator_type c)
// All four arguments are moved into a freshly allocated shared state object.
108 : state(std::make_shared<window_with_time_or_count_subscriber_values>(window_with_time_or_count_subscriber_values(std::move(cs), std::move(d), std::move(v), std::move(c))))
// Take the current subject id and the worker's current time for the first timer.
110 auto new_id = state->subj_id;
111 auto produce_time = state->worker.now();
112 auto localState = state;
// The disposer unsubscribes the subscription, the destination and the worker.
114 auto disposer = [=](
const rxsc::schedulable&){
115 localState->cs.unsubscribe();
116 localState->dest.unsubscribe();
117 localState->worker.unsubscribe();
// The disposer is passed through coordinator.act(); the call receiving this
// lambda is not visible in this listing.
120 [&](){
return localState->coordinator.act(disposer);},
// NOTE(review): the body of the empty() branch is not visible here.
122 if (selectedDisposer.empty()) {
// Both the destination and the subscription get a callback that schedules the
// disposer on the worker.
126 localState->dest.add([=](){
127 localState->worker.schedule(selectedDisposer.get());
129 localState->cs.add([=](){
130 localState->worker.schedule(selectedDisposer.get());
// At produce_time, schedule the release action for the captured id.
139 localState->worker.schedule(produce_time, [new_id, produce_time, localState](
const rxsc::schedulable&){
140 localState->worker.schedule(release_window(new_id, produce_time, localState));
// Builds the action that completes the current subject, emits the subject's
// observable to the destination and arms the next timer.
// Parameters: `id` is compared against state->subj_id; `expected` is the time
// point from which the next release time is computed; `state` is the shared
// subscriber state.
// Returns the action as a std::function, or an empty std::function when
// selectedRelease is empty.
144 static std::function<void(const rxsc::schedulable&)> release_window(
int id, rxsc::scheduler::clock_type::time_point expected, state_type state) {
145 auto release = [id, expected, state](
const rxsc::schedulable&) {
// Compare the captured id with the current subject id.
// NOTE(review): the branch body is not visible; presumably it returns early
// for a stale id - confirm against the full file.
146 if (
id != state->subj_id)
// Complete the subscriber side of the current subject.
149 state->subj.get_subscriber().on_completed();
// Emit the subject's observable, type-erased with as_dynamic(), to the destination.
151 state->dest.on_next(state->subj.get_observable().as_dynamic());
// Advance the id and schedule the next release one period after `expected`.
153 auto new_id = ++state->subj_id;
154 auto produce_time = expected + state->period;
155 state->worker.schedule(produce_time, [new_id, produce_time, state](
const rxsc::schedulable&){
156 state->worker.schedule(release_window(new_id, produce_time, state));
// The release lambda is passed through coordinator.act(); the call receiving
// this lambda is not visible in this listing.
160 [&](){
return state->coordinator.act(release);},
// An empty selection produces an empty std::function.
162 if (selectedRelease.empty()) {
163 return std::function<void(const rxsc::schedulable&)>();
166 return std::function<void(const rxsc::schedulable&)>(selectedRelease.get());
// Forwards the item to the current subject and runs the release action when
// the incremented cursor equals the configured count. The work is scheduled on
// the worker.
169 void on_next(T v)
const {
170 auto localState = state;
171 auto work = [v, localState](
const rxsc::schedulable&
self){
// Deliver the item to the subscriber side of the current subject.
172 localState->subj.get_subscriber().on_next(v);
// When the pre-incremented cursor equals count, invoke the release action
// directly with `self`, using the worker's current time.
// NOTE(review): release_window can return an empty std::function (its
// selectedRelease.empty() branch); calling an empty std::function throws
// std::bad_function_call. Confirm whether that path can be reached here.
173 if (++localState->cursor == localState->count) {
174 release_window(localState->subj_id, localState->worker.now(), localState)(
self);
// The work lambda is passed through coordinator.act(); the call receiving this
// lambda is not visible in this listing.
178 [&](){
return localState->coordinator.act(work);},
// NOTE(review): the body of the empty() branch is not visible here.
180 if (selectedWork.empty()) {
183 localState->worker.schedule(selectedWork.get());
// Propagates the error to the current subject's subscriber and then to the
// destination. The work is scheduled on the worker.
186 void on_error(std::exception_ptr e)
const {
187 auto localState = state;
188 auto work = [e, localState](
const rxsc::schedulable&){
// The subject's subscriber is notified first, then the destination.
189 localState->subj.get_subscriber().on_error(e);
190 localState->dest.on_error(e);
// The work lambda is passed through coordinator.act(); the call receiving this
// lambda is not visible in this listing.
193 [&](){
return localState->coordinator.act(work);},
// NOTE(review): the body of the empty() branch is not visible here.
195 if (selectedWork.empty()) {
198 localState->worker.schedule(selectedWork.get());
// Completes the current subject's subscriber and then the destination. The
// work is scheduled on the worker.
201 void on_completed()
const {
202 auto localState = state;
203 auto work = [localState](
const rxsc::schedulable&){
// The subject's subscriber is completed first, then the destination.
204 localState->subj.get_subscriber().on_completed();
205 localState->dest.on_completed();
// The work lambda is passed through coordinator.act(); the call receiving this
// lambda is not visible in this listing.
208 [&](){
return localState->coordinator.act(work);},
// NOTE(review): the body of the empty() branch is not visible here.
210 if (selectedWork.empty()) {
213 localState->worker.schedule(selectedWork.get());
// Factory: wraps a new observer for destination `d` and parameters `v` in a
// subscriber<T, observer_type>.
216 static subscriber<T, observer_type> make(dest_type d, window_with_time_or_count_values v) {
// A new composite_subscription is created; it is passed to both the observer
// and make_subscriber below.
217 auto cs = composite_subscription();
// The coordinator is created from the coordination stored in the parameters.
218 auto coordinator = v.coordination.create_coordinator();
220 return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
// Call operator: given a destination subscriber, returns the result of
// window_with_time_or_count_observer<Subscriber>::make for it, passing the
// stored `initial` parameters.
224 template<
class Subscriber>
225 auto operator()(Subscriber dest)
const 226 -> decltype(window_with_time_or_count_observer<Subscriber>::make(std::move(dest), initial)) {
227 return window_with_time_or_count_observer<Subscriber>::make(std::move(dest), initial);
// Operator factory entry point: forwards the arguments into a tuple carried by
// an operator_factory tagged with window_with_time_or_count_tag.
// NOTE(review): the function signature line is not visible in this listing.
235 template<
class...
AN>
238 return operator_factory<window_with_time_or_count_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
// Template parameter fragment of the `member` overload that takes no explicit
// coordination.
// NOTE(review): the enclosing constraint expression and the function itself
// are not visible in this listing.
246 template<
class Observable,
class Duration,
// Duration is tested for convertibility to the scheduler clock's duration type.
249 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
// The operator type is instantiated with identity_one_worker as coordination.
251 class WindowTimeCount = rxo::detail::window_with_time_or_count<SourceValue, rxu::decay_t<Duration>,
identity_one_worker>,
// `member` overload taking an explicit coordination: constructs the operator
// from (period, count, coordination) and lifts it onto the source observable.
// NOTE(review): part of the template parameter list is not visible here.
258 template<
class Observable,
class Duration,
class Coordination,
// Visible conditions: Observable is an observable, and Duration is convertible
// to the scheduler clock's duration type.
260 is_observable<Observable>,
261 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
// The operator type is instantiated with the decayed Duration and Coordination.
264 class WindowTimeCount = rxo::detail::window_with_time_or_count<SourceValue, rxu::decay_t<Duration>,
rxu::decay_t<Coordination>>,
266 static auto member(Observable&& o, Duration&& period,
int count, Coordination&& cn)
267 -> decltype(o.template lift<Value>(WindowTimeCount(std::forward<Duration>(period),
count, std::forward<Coordination>(cn)))) {
// period and cn are perfectly forwarded; count is passed as a plain int.
268 return o.template lift<Value>(WindowTimeCount(std::forward<Duration>(period),
count, std::forward<Coordination>(cn)));
// Catch-all `member` overload accepting any argument pack. Its return type is
// window_with_time_or_count_invalid_t<AN...>.
271 template<
class...
AN>
272 static operators::detail::window_with_time_or_count_invalid_t<
AN...>
member(
AN...) {
// The condition depends on the pack size and holds only for exactly 10000
// arguments, so any other instantiation of this overload fails to compile
// with the usage message below.
275 static_assert(
sizeof...(
AN) == 10000,
"window_with_time_or_count takes (Duration, Count, optional Coordination)");
static operators::detail::window_with_time_or_count_invalid_t< AN... > member(AN...)
Definition: rx-window_time_count.hpp:272
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
auto window_with_time_or_count(AN &&...an) -> operator_factory< window_with_time_or_count_tag, AN... >
Return an observable that emits connected, non-overlapping windows of items from the source observable...
Definition: rx-window_time_count.hpp:236
Definition: rx-operators.hpp:69
Definition: rx-subject.hpp:237
Definition: rx-operators.hpp:487
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
static auto member(Observable &&o, Duration &&period, int count, Coordination &&cn) -> decltype(o.template lift< Value >(WindowTimeCount(std::forward< Duration >(period), count, std::forward< Coordination >(cn))))
Definition: rx-window_time_count.hpp:266
static auto member(Observable &&o, Duration &&period, int count) -> decltype(o.template lift< Value >(WindowTimeCount(std::forward< Duration >(period), count, identity_current_thread())))
Definition: rx-window_time_count.hpp:253
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37