RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-debounce.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
22 #if !defined(RXCPP_OPERATORS_RX_DEBOUNCE_HPP)
23 #define RXCPP_OPERATORS_RX_DEBOUNCE_HPP
24 
25 #include "../rx-includes.hpp"
26 
27 namespace rxcpp {
28 
29 namespace operators {
30 
31 namespace detail {
32 
33 template<class... AN>
34 struct debounce_invalid_arguments {};
35 
36 template<class... AN>
37 struct debounce_invalid : public rxo::operator_base<debounce_invalid_arguments<AN...>> {
38  using type = observable<debounce_invalid_arguments<AN...>, debounce_invalid<AN...>>;
39 };
40 template<class... AN>
41 using debounce_invalid_t = typename debounce_invalid<AN...>::type;
42 
43 template<class T, class Duration, class Coordination>
44 struct debounce
45 {
46  typedef rxu::decay_t<T> source_value_type;
47  typedef rxu::decay_t<Coordination> coordination_type;
48  typedef typename coordination_type::coordinator_type coordinator_type;
49  typedef rxu::decay_t<Duration> duration_type;
50 
51  struct debounce_values
52  {
53  debounce_values(duration_type p, coordination_type c)
54  : period(p)
55  , coordination(c)
56  {
57  }
58 
59  duration_type period;
60  coordination_type coordination;
61  };
62  debounce_values initial;
63 
64  debounce(duration_type period, coordination_type coordination)
65  : initial(period, coordination)
66  {
67  }
68 
69  template<class Subscriber>
70  struct debounce_observer
71  {
72  typedef debounce_observer<Subscriber> this_type;
73  typedef rxu::decay_t<T> value_type;
74  typedef rxu::decay_t<Subscriber> dest_type;
75  typedef observer<T, this_type> observer_type;
76 
77  struct debounce_subscriber_values : public debounce_values
78  {
79  debounce_subscriber_values(composite_subscription cs, dest_type d, debounce_values v, coordinator_type c)
80  : debounce_values(v)
81  , cs(std::move(cs))
82  , dest(std::move(d))
83  , coordinator(std::move(c))
84  , worker(coordinator.get_worker())
85  , index(0)
86  {
87  }
88 
89  composite_subscription cs;
90  dest_type dest;
91  coordinator_type coordinator;
92  rxsc::worker worker;
93  mutable std::size_t index;
94  mutable rxu::maybe<value_type> value;
95  };
96  typedef std::shared_ptr<debounce_subscriber_values> state_type;
97  state_type state;
98 
99  debounce_observer(composite_subscription cs, dest_type d, debounce_values v, coordinator_type c)
100  : state(std::make_shared<debounce_subscriber_values>(debounce_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
101  {
102  auto localState = state;
103 
104  auto disposer = [=](const rxsc::schedulable&){
105  localState->cs.unsubscribe();
106  localState->dest.unsubscribe();
107  localState->worker.unsubscribe();
108  };
109  auto selectedDisposer = on_exception(
110  [&](){ return localState->coordinator.act(disposer); },
111  localState->dest);
112  if (selectedDisposer.empty()) {
113  return;
114  }
115 
116  localState->dest.add([=](){
117  localState->worker.schedule(selectedDisposer.get());
118  });
119  localState->cs.add([=](){
120  localState->worker.schedule(selectedDisposer.get());
121  });
122  }
123 
124  static std::function<void(const rxsc::schedulable&)> produce_item(std::size_t id, state_type state) {
125  auto produce = [id, state](const rxsc::schedulable&) {
126  if(id != state->index)
127  return;
128 
129  state->dest.on_next(*state->value);
130  state->value.reset();
131  };
132 
133  auto selectedProduce = on_exception(
134  [&](){ return state->coordinator.act(produce); },
135  state->dest);
136  if (selectedProduce.empty()) {
137  return std::function<void(const rxsc::schedulable&)>();
138  }
139 
140  return std::function<void(const rxsc::schedulable&)>(selectedProduce.get());
141  }
142 
143  void on_next(T v) const {
144  auto localState = state;
145  auto work = [v, localState](const rxsc::schedulable&) {
146  auto new_id = ++localState->index;
147  auto produce_time = localState->worker.now() + localState->period;
148 
149  localState->value.reset(v);
150  localState->worker.schedule(produce_time, produce_item(new_id, localState));
151  };
152  auto selectedWork = on_exception(
153  [&](){return localState->coordinator.act(work);},
154  localState->dest);
155  if (selectedWork.empty()) {
156  return;
157  }
158  localState->worker.schedule(selectedWork.get());
159  }
160 
161  void on_error(std::exception_ptr e) const {
162  auto localState = state;
163  auto work = [e, localState](const rxsc::schedulable&) {
164  localState->dest.on_error(e);
165  localState->value.reset();
166  };
167  auto selectedWork = on_exception(
168  [&](){ return localState->coordinator.act(work); },
169  localState->dest);
170  if (selectedWork.empty()) {
171  return;
172  }
173  localState->worker.schedule(selectedWork.get());
174  }
175 
176  void on_completed() const {
177  auto localState = state;
178  auto work = [localState](const rxsc::schedulable&) {
179  if(!localState->value.empty()) {
180  localState->dest.on_next(*localState->value);
181  }
182  localState->dest.on_completed();
183  };
184  auto selectedWork = on_exception(
185  [&](){ return localState->coordinator.act(work); },
186  localState->dest);
187  if (selectedWork.empty()) {
188  return;
189  }
190  localState->worker.schedule(selectedWork.get());
191  }
192 
193  static subscriber<T, observer_type> make(dest_type d, debounce_values v) {
194  auto cs = composite_subscription();
195  auto coordinator = v.coordination.create_coordinator();
196 
197  return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
198  }
199  };
200 
201  template<class Subscriber>
202  auto operator()(Subscriber dest) const
203  -> decltype(debounce_observer<Subscriber>::make(std::move(dest), initial)) {
204  return debounce_observer<Subscriber>::make(std::move(dest), initial);
205  }
206 };
207 
208 }
209 
212 template<class... AN>
213 auto debounce(AN&&... an)
215  return operator_factory<debounce_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
216 }
217 
218 }
219 
220 template<>
222 {
223  template<class Observable, class Duration,
224  class Enabled = rxu::enable_if_all_true_type_t<
227  class SourceValue = rxu::value_type_t<Observable>,
228  class Debounce = rxo::detail::debounce<SourceValue, rxu::decay_t<Duration>, identity_one_worker>>
229  static auto member(Observable&& o, Duration&& d)
230  -> decltype(o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), identity_current_thread()))) {
231  return o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), identity_current_thread()));
232  }
233 
234  template<class Observable, class Coordination, class Duration,
235  class Enabled = rxu::enable_if_all_true_type_t<
236  is_observable<Observable>,
238  rxu::is_duration<Duration>>,
239  class SourceValue = rxu::value_type_t<Observable>,
240  class Debounce = rxo::detail::debounce<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
241  static auto member(Observable&& o, Coordination&& cn, Duration&& d)
242  -> decltype(o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
243  return o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), std::forward<Coordination>(cn)));
244  }
245 
246  template<class Observable, class Coordination, class Duration,
247  class Enabled = rxu::enable_if_all_true_type_t<
248  is_observable<Observable>,
249  is_coordination<Coordination>,
250  rxu::is_duration<Duration>>,
251  class SourceValue = rxu::value_type_t<Observable>,
252  class Debounce = rxo::detail::debounce<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
253  static auto member(Observable&& o, Duration&& d, Coordination&& cn)
254  -> decltype(o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
255  return o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), std::forward<Coordination>(cn)));
256  }
257 
258  template<class... AN>
259  static operators::detail::debounce_invalid_t<AN...> member(const AN&...) {
260  std::terminate();
261  return {};
262  static_assert(sizeof...(AN) == 10000, "debounce takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
263  }
264 };
265 
266 }
267 
268 #endif
static operators::detail::debounce_invalid_t< AN... > member(const AN &...)
Definition: rx-debounce.hpp:259
Definition: rx-util.hpp:791
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static auto member(Observable &&o, Duration &&d, Coordination &&cn) -> decltype(o.template lift< SourceValue >(Debounce(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-debounce.hpp:253
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
auto debounce(AN &&...an) -> operator_factory< debounce_tag, AN... >
Return an observable that emits an item if a particular timespan has passed without emitting another ...
Definition: rx-debounce.hpp:213
static auto member(Observable &&o, Duration &&d) -> decltype(o.template lift< SourceValue >(Debounce(std::forward< Duration >(d), identity_current_thread())))
Definition: rx-debounce.hpp:229
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
Definition: rx-operators.hpp:178
static auto member(Observable &&o, Coordination &&cn, Duration &&d) -> decltype(o.template lift< SourceValue >(Debounce(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-debounce.hpp:241
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37