21 #if !defined(RXCPP_OPERATORS_RX_TIME_INTERVAL_HPP) 22 #define RXCPP_OPERATORS_RX_TIME_INTERVAL_HPP 24 #include "../rx-includes.hpp" 33 struct time_interval_invalid_arguments {};
// Placeholder operator type for unsupported argument lists.
// Derives from rxo::operator_base over time_interval_invalid_arguments<AN...>
// and exposes `type`, an observable parameterised on those same placeholder
// arguments and on this struct.
// NOTE(review): the template header and closing brace are not visible in this listing.
36 struct time_interval_invalid :
public rxo::operator_base<time_interval_invalid_arguments<AN...>> {
37 using type = observable<time_interval_invalid_arguments<
AN...>, time_interval_invalid<
AN...>>;
// Shorthand alias for the nested `type` of time_interval_invalid<AN...>.
40 using time_interval_invalid_t =
typename time_interval_invalid<
AN...>::type;
// Operator implementation, parameterised on the source value type T and a
// Coordination type.
// NOTE(review): the struct header and several body lines are missing from
// this listing (the embedded line numbers skip) - check the original header.
42 template<
class T,
class Coordination>
// Both template arguments are stored in decayed form.
45 typedef rxu::decay_t<T> source_value_type;
46 typedef rxu::decay_t<Coordination> coordination_type;
// Bundle of state handed to each observer; it holds the coordination.
48 struct time_interval_values {
49 time_interval_values(coordination_type c)
54 coordination_type coordination;
// Initial state, passed to time_interval_observer::make by operator().
56 time_interval_values initial;
// Constructor initialiser: `initial` is built from the supplied coordination.
59 : initial(coordination)
// Observer that, on each source item, reports a time difference to `dest`.
// NOTE(review): the `dest` member declaration, parts of the constructor and
// the on_error/on_completed bodies are not visible in this listing.
63 template<
class Subscriber>
64 struct time_interval_observer
66 typedef time_interval_observer<Subscriber> this_type;
67 typedef source_value_type value_type;
68 typedef rxu::decay_t<Subscriber> dest_type;
69 typedef observer<value_type, this_type> observer_type;
// Time points come from the scheduler clock.
70 typedef rxsc::scheduler::clock_type::time_point time_point;
// Coordination used to read the current time (see coord.now() in on_next).
72 coordination_type coord;
// Reference time point subtracted from `now` in on_next; declared mutable,
// which allows the const-qualified callbacks to modify it.
73 mutable time_point
last;
// Takes the destination subscriber and the coordination; the coordination
// is moved into `coord`.
75 time_interval_observer(dest_type d, coordination_type coordination)
77 coord(std::move(coordination)),
// The incoming value is ignored (unnamed parameter); what is sent downstream
// is the duration `now - last`, with `now` read from the coordination.
82 void on_next(source_value_type)
const {
83 time_point now = coord.now();
84 dest.on_next(now - last);
// Body not visible here; presumably forwards the error to dest - TODO confirm.
87 void on_error(std::exception_ptr e)
const {
// Body not visible here; presumably forwards completion to dest - TODO confirm.
90 void on_completed()
const {
// Factory: wraps a new observer (built from `d` and the coordination stored
// in `v`) in a subscriber created by make_subscriber<value_type>.
94 static subscriber<value_type, observer_type> make(dest_type d, time_interval_values v) {
95 return make_subscriber<value_type>(d, this_type(d, v.coordination));
// Call operator: given a destination subscriber, returns the subscriber
// produced by time_interval_observer<Subscriber>::make, passing the moved
// `dest` and the stored `initial` state.
99 template<
class Subscriber>
100 auto operator()(Subscriber dest)
const 101 -> decltype(time_interval_observer<Subscriber>::make(std::move(dest), initial)) {
102 return time_interval_observer<Subscriber>::make(std::move(dest), initial);
// Factory function for the time_interval operator: forwards all arguments
// into a tuple and wraps it in operator_factory<time_interval_tag, AN...>.
// NOTE(review): the function signature line is missing from this listing.
110 template<
class...
AN>
113 return operator_factory<time_interval_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
// Template parameters of the `member` overload that takes only an Observable.
// TimeInterval defaults to the operator instantiated with identity_one_worker;
// Value is the duration type of the scheduler clock's time_point.
// NOTE(review): the remaining template parameters and the function itself
// are not visible in this listing.
121 template<
class Observable,
125 class TimeInterval = rxo::detail::time_interval<SourceValue, identity_one_worker>,
126 class Value =
typename rxsc::scheduler::clock_type::time_point::duration>
// `member` overload taking an Observable and an explicit Coordination.
// TimeInterval is the operator instantiated with the decayed Coordination;
// Value is the duration type of the scheduler clock's time_point.
// NOTE(review): some template-parameter lines are missing from this listing.
132 template<
class Observable,
class Coordination,
134 is_observable<Observable>,
137 class TimeInterval = rxo::detail::time_interval<SourceValue, rxu::decay_t<Coordination>>,
138 class Value =
typename rxsc::scheduler::clock_type::time_point::duration>
// Lifts a TimeInterval built from the forwarded coordination onto `o`,
// yielding an observable whose value type is Value.
139 static auto member(Observable&& o, Coordination&& cn)
140 -> decltype(o.template lift<Value>(TimeInterval(std::forward<Coordination>(cn)))) {
141 return o.template lift<Value>(TimeInterval(std::forward<Coordination>(cn)));
// Catch-all `member` overload accepting any argument list; its return type
// is the time_interval_invalid_t placeholder.
144 template<
class...
AN>
145 static operators::detail::time_interval_invalid_t<
AN...>
member(
AN...) {
// The condition depends on the parameter pack, so it is only checked when
// this overload is instantiated; it fails for every pack that does not have
// exactly 10000 elements (presumably intended to always fail with the
// usage message below).
148 static_assert(
sizeof...(
AN) == 10000,
"time_interval takes (optional Coordination)");
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
static auto member(Observable &&o) -> decltype(o.template lift< Value >(TimeInterval(identity_current_thread())))
Definition: rx-time_interval.hpp:127
Definition: rx-operators.hpp:459
auto AN
Definition: rx-finally.hpp:105
Definition: rx-operators.hpp:47
auto last() -> operator_factory< last_tag >
For each item from this observable reduce it by sending only the last item.
Definition: rx-reduce.hpp:395
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
auto time_interval(AN &&...an) -> operator_factory< time_interval_tag, AN... >
Returns an observable that emits indications of the amount of time lapsed between consecutive emissions of the source observable.
Definition: rx-time_interval.hpp:111
static auto member(Observable &&o, Coordination &&cn) -> decltype(o.template lift< Value >(TimeInterval(std::forward< Coordination >(cn))))
Definition: rx-time_interval.hpp:139
static operators::detail::time_interval_invalid_t< AN... > member(AN...)
Definition: rx-time_interval.hpp:145
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37