22 #if !defined(RXCPP_OPERATORS_RX_DEBOUNCE_HPP) 23 #define RXCPP_OPERATORS_RX_DEBOUNCE_HPP 25 #include "../rx-includes.hpp" 34 struct debounce_invalid_arguments {};
37 struct debounce_invalid :
public rxo::operator_base<debounce_invalid_arguments<AN...>> {
38 using type = observable<debounce_invalid_arguments<
AN...>, debounce_invalid<
AN...>>;
41 using debounce_invalid_t =
typename debounce_invalid<
AN...>::type;
43 template<
class T,
class Duration,
class Coordination>
46 typedef rxu::decay_t<T> source_value_type;
47 typedef rxu::decay_t<Coordination> coordination_type;
48 typedef typename coordination_type::coordinator_type coordinator_type;
49 typedef rxu::decay_t<Duration> duration_type;
51 struct debounce_values
53 debounce_values(duration_type p, coordination_type c)
60 coordination_type coordination;
62 debounce_values initial;
64 debounce(duration_type period, coordination_type coordination)
65 : initial(period, coordination)
69 template<
class Subscriber>
70 struct debounce_observer
72 typedef debounce_observer<Subscriber> this_type;
73 typedef rxu::decay_t<T> value_type;
74 typedef rxu::decay_t<Subscriber> dest_type;
75 typedef observer<T, this_type> observer_type;
77 struct debounce_subscriber_values :
public debounce_values
79 debounce_subscriber_values(composite_subscription cs, dest_type d, debounce_values v, coordinator_type c)
83 , coordinator(std::move(c))
84 , worker(coordinator.get_worker())
89 composite_subscription cs;
91 coordinator_type coordinator;
93 mutable std::size_t index;
94 mutable rxu::maybe<value_type> value;
96 typedef std::shared_ptr<debounce_subscriber_values> state_type;
99 debounce_observer(composite_subscription cs, dest_type d, debounce_values v, coordinator_type c)
100 : state(std::make_shared<debounce_subscriber_values>(debounce_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
102 auto localState = state;
104 auto disposer = [=](
const rxsc::schedulable&){
105 localState->cs.unsubscribe();
106 localState->dest.unsubscribe();
107 localState->worker.unsubscribe();
110 [&](){
return localState->coordinator.act(disposer); },
112 if (selectedDisposer.empty()) {
116 localState->dest.add([=](){
117 localState->worker.schedule(selectedDisposer.get());
119 localState->cs.add([=](){
120 localState->worker.schedule(selectedDisposer.get());
124 static std::function<void(const rxsc::schedulable&)> produce_item(std::size_t
id, state_type state) {
125 auto produce = [id, state](
const rxsc::schedulable&) {
126 if(
id != state->index)
129 state->dest.on_next(*state->value);
130 state->value.reset();
134 [&](){
return state->coordinator.act(produce); },
136 if (selectedProduce.empty()) {
137 return std::function<void(const rxsc::schedulable&)>();
140 return std::function<void(const rxsc::schedulable&)>(selectedProduce.get());
143 void on_next(T v)
const {
144 auto localState = state;
145 auto work = [v, localState](
const rxsc::schedulable&) {
146 auto new_id = ++localState->index;
147 auto produce_time = localState->worker.now() + localState->period;
149 localState->value.reset(v);
150 localState->worker.schedule(produce_time, produce_item(new_id, localState));
153 [&](){
return localState->coordinator.act(work);},
155 if (selectedWork.empty()) {
158 localState->worker.schedule(selectedWork.get());
161 void on_error(std::exception_ptr e)
const {
162 auto localState = state;
163 auto work = [e, localState](
const rxsc::schedulable&) {
164 localState->dest.on_error(e);
165 localState->value.reset();
168 [&](){
return localState->coordinator.act(work); },
170 if (selectedWork.empty()) {
173 localState->worker.schedule(selectedWork.get());
176 void on_completed()
const {
177 auto localState = state;
178 auto work = [localState](
const rxsc::schedulable&) {
179 if(!localState->value.empty()) {
180 localState->dest.on_next(*localState->value);
182 localState->dest.on_completed();
185 [&](){
return localState->coordinator.act(work); },
187 if (selectedWork.empty()) {
190 localState->worker.schedule(selectedWork.get());
193 static subscriber<T, observer_type> make(dest_type d, debounce_values v) {
194 auto cs = composite_subscription();
195 auto coordinator = v.coordination.create_coordinator();
197 return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
201 template<
class Subscriber>
202 auto operator()(Subscriber dest)
const 203 -> decltype(debounce_observer<Subscriber>::make(std::move(dest), initial)) {
204 return debounce_observer<Subscriber>::make(std::move(dest), initial);
212 template<
class...
AN>
215 return operator_factory<debounce_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
223 template<
class Observable,
class Duration,
228 class Debounce = rxo::detail::debounce<SourceValue, rxu::decay_t<Duration>,
identity_one_worker>>
229 static auto member(Observable&& o, Duration&& d)
234 template<
class Observable,
class Coordination,
class Duration,
236 is_observable<Observable>,
238 rxu::is_duration<Duration>>,
241 static auto member(Observable&& o, Coordination&& cn, Duration&& d)
242 -> decltype(o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
243 return o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), std::forward<Coordination>(cn)));
246 template<
class Observable,
class Coordination,
class Duration,
248 is_observable<Observable>,
249 is_coordination<Coordination>,
250 rxu::is_duration<Duration>>,
253 static auto member(Observable&& o, Duration&& d, Coordination&& cn)
254 -> decltype(o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
255 return o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), std::forward<Coordination>(cn)));
258 template<
class...
AN>
259 static operators::detail::debounce_invalid_t<
AN...>
member(
const AN&...) {
262 static_assert(
sizeof...(
AN) == 10000,
"debounce takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
static operators::detail::debounce_invalid_t< AN... > member(const AN &...)
Definition: rx-debounce.hpp:259
Definition: rx-util.hpp:791
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static auto member(Observable &&o, Duration &&d, Coordination &&cn) -> decltype(o.template lift< SourceValue >(Debounce(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-debounce.hpp:253
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
auto debounce(AN &&...an) -> operator_factory< debounce_tag, AN... >
Return an observable that emits an item if a particular timespan has passed without emitting another ...
Definition: rx-debounce.hpp:213
static auto member(Observable &&o, Duration &&d) -> decltype(o.template lift< SourceValue >(Debounce(std::forward< Duration >(d), identity_current_thread())))
Definition: rx-debounce.hpp:229
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
Definition: rx-operators.hpp:178
static auto member(Observable &&o, Coordination &&cn, Duration &&d) -> decltype(o.template lift< SourceValue >(Debounce(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-debounce.hpp:241
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37