37 #if !defined(RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_HPP) 38 #define RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_HPP 40 #include "../rx-includes.hpp" 49 struct window_with_time_invalid_arguments {};
52 struct window_with_time_invalid :
public rxo::operator_base<window_with_time_invalid_arguments<AN...>> {
53 using type = observable<window_with_time_invalid_arguments<
AN...>, window_with_time_invalid<
AN...>>;
56 using window_with_time_invalid_t =
typename window_with_time_invalid<
AN...>::type;
58 template<
class T,
class Duration,
class Coordination>
61 typedef rxu::decay_t<T> source_value_type;
62 typedef observable<source_value_type> value_type;
63 typedef rxu::decay_t<Coordination> coordination_type;
64 typedef typename coordination_type::coordinator_type coordinator_type;
65 typedef rxu::decay_t<Duration> duration_type;
67 struct window_with_time_values
69 window_with_time_values(duration_type p, duration_type s, coordination_type c)
77 coordination_type coordination;
79 window_with_time_values initial;
82 : initial(period, skip, coordination)
86 template<
class Subscriber>
87 struct window_with_time_observer
89 typedef window_with_time_observer<Subscriber> this_type;
90 typedef rxu::decay_t<T> value_type;
91 typedef rxu::decay_t<Subscriber> dest_type;
92 typedef observer<T, this_type> observer_type;
94 struct window_with_time_subscriber_values :
public window_with_time_values
96 window_with_time_subscriber_values(composite_subscription cs, dest_type d, window_with_time_values v, coordinator_type c)
97 : window_with_time_values(v)
100 , coordinator(std::move(c))
101 , worker(coordinator.get_worker())
102 , expected(worker.now())
105 composite_subscription cs;
107 coordinator_type coordinator;
109 mutable std::deque<rxcpp::subjects::subject<T>> subj;
110 rxsc::scheduler::clock_type::time_point expected;
112 std::shared_ptr<window_with_time_subscriber_values> state;
114 window_with_time_observer(composite_subscription cs, dest_type d, window_with_time_values v, coordinator_type c)
115 : state(std::make_shared<window_with_time_subscriber_values>(window_with_time_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
117 auto localState = state;
119 auto disposer = [=](
const rxsc::schedulable&){
120 localState->cs.unsubscribe();
121 localState->dest.unsubscribe();
122 localState->worker.unsubscribe();
125 [&](){
return localState->coordinator.act(disposer);},
127 if (selectedDisposer.empty()) {
131 localState->dest.add([=](){
132 localState->worker.schedule(selectedDisposer.get());
134 localState->cs.add([=](){
135 localState->worker.schedule(selectedDisposer.get());
144 auto release_window = [localState](
const rxsc::schedulable&) {
145 localState->worker.schedule([localState](
const rxsc::schedulable&) {
146 localState->subj[0].get_subscriber().on_completed();
147 localState->subj.pop_front();
151 [&](){
return localState->coordinator.act(release_window);},
153 if (selectedRelease.empty()) {
157 auto create_window = [localState, selectedRelease](
const rxsc::schedulable&) {
159 localState->dest.on_next(localState->subj[localState->subj.size() - 1].get_observable().as_dynamic());
161 auto produce_at = localState->expected + localState->period;
162 localState->expected += localState->skip;
163 localState->worker.schedule(produce_at, [localState, selectedRelease](
const rxsc::schedulable&) {
164 localState->worker.schedule(selectedRelease.get());
168 [&](){
return localState->coordinator.act(create_window);},
170 if (selectedCreate.empty()) {
174 state->worker.schedule_periodically(
177 [localState, selectedCreate](
const rxsc::schedulable&) {
178 localState->worker.schedule(selectedCreate.get());
182 void on_next(T v)
const {
183 auto localState = state;
184 auto work = [v, localState](
const rxsc::schedulable&){
185 for (
auto s : localState->subj) {
186 s.get_subscriber().on_next(v);
190 [&](){
return localState->coordinator.act(work);},
192 if (selectedWork.empty()) {
195 localState->worker.schedule(selectedWork.get());
198 void on_error(std::exception_ptr e)
const {
199 auto localState = state;
200 auto work = [e, localState](
const rxsc::schedulable&){
201 for (
auto s : localState->subj) {
202 s.get_subscriber().on_error(e);
204 localState->dest.on_error(e);
207 [&](){
return localState->coordinator.act(work);},
209 if (selectedWork.empty()) {
212 localState->worker.schedule(selectedWork.get());
215 void on_completed()
const {
216 auto localState = state;
217 auto work = [localState](
const rxsc::schedulable&){
218 for (
auto s : localState->subj) {
219 s.get_subscriber().on_completed();
221 localState->dest.on_completed();
224 [&](){
return localState->coordinator.act(work);},
226 if (selectedWork.empty()) {
229 localState->worker.schedule(selectedWork.get());
232 static subscriber<T, observer_type> make(dest_type d, window_with_time_values v) {
233 auto cs = composite_subscription();
234 auto coordinator = v.coordination.create_coordinator();
236 return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
240 template<
class Subscriber>
241 auto operator()(Subscriber dest)
const 242 -> decltype(window_with_time_observer<Subscriber>::make(std::move(dest), initial)) {
243 return window_with_time_observer<Subscriber>::make(std::move(dest), initial);
251 template<
class...
AN>
254 return operator_factory<window_with_time_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
262 template<
class Observable,
class Duration,
265 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
267 class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>,
identity_one_worker>,
269 static auto member(Observable&& o, Duration period)
274 template<
class Observable,
class Duration,
class Coordination,
276 is_observable<Observable>,
277 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
282 static auto member(Observable&& o, Duration period, Coordination&& cn)
283 -> decltype(o.template lift<Value>(WindowWithTime(period, period, std::forward<Coordination>(cn)))) {
284 return o.template lift<Value>(WindowWithTime(period, period, std::forward<Coordination>(cn)));
287 template<
class Observable,
class Duration,
289 is_observable<Observable>,
290 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
292 class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
294 static auto member(Observable&& o, Duration&& period, Duration&&
skip)
295 -> decltype(o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(
skip),
identity_current_thread()))) {
296 return o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(
skip),
identity_current_thread()));
299 template<
class Observable,
class Duration,
class Coordination,
301 is_observable<Observable>,
302 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
303 is_coordination<Coordination>>,
305 class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
307 static auto member(Observable&& o, Duration&& period, Duration&&
skip, Coordination&& cn)
308 -> decltype(o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(
skip), std::forward<Coordination>(cn)))) {
309 return o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(
skip), std::forward<Coordination>(cn)));
312 template<
class...
AN>
313 static operators::detail::window_with_time_invalid_t<
AN...>
member(
AN...) {
316 static_assert(
sizeof...(
AN) == 10000,
"window_with_time takes (Duration, optional Duration, optional Coordination)");
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
Definition: rx-subject.hpp:237
auto AN
Definition: rx-finally.hpp:105
static auto member(Observable &&o, Duration &&period, Duration &&skip, Coordination &&cn) -> decltype(o.template lift< Value >(WindowWithTime(std::forward< Duration >(period), std::forward< Duration >(skip), std::forward< Coordination >(cn))))
Definition: rx-window_time.hpp:307
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
static auto member(Observable &&o, Duration period, Coordination &&cn) -> decltype(o.template lift< Value >(WindowWithTime(period, period, std::forward< Coordination >(cn))))
Definition: rx-window_time.hpp:282
static operators::detail::window_with_time_invalid_t< AN... > member(AN...)
Definition: rx-window_time.hpp:313
Definition: rx-operators.hpp:480
auto skip(AN &&...an) -> operator_factory< skip_tag, AN... >
Make new observable with skipped first count items from this observable.
Definition: rx-skip.hpp:130
static auto member(Observable &&o, Duration period) -> decltype(o.template lift< Value >(WindowWithTime(period, period, identity_current_thread())))
Definition: rx-window_time.hpp:269
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
auto window_with_time(AN &&...an) -> operator_factory< window_with_time_tag, AN... >
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-window_time.hpp:252
Definition: rx-coordination.hpp:114
static auto member(Observable &&o, Duration &&period, Duration &&skip) -> decltype(o.template lift< Value >(WindowWithTime(std::forward< Duration >(period), std::forward< Duration >(skip), identity_current_thread())))
Definition: rx-window_time.hpp:294
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37