RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-time_interval.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
21 #if !defined(RXCPP_OPERATORS_RX_TIME_INTERVAL_HPP)
22 #define RXCPP_OPERATORS_RX_TIME_INTERVAL_HPP
23 
24 #include "../rx-includes.hpp"
25 
26 namespace rxcpp {
27 
28 namespace operators {
29 
30 namespace detail {
31 
32 template<class... AN>
33 struct time_interval_invalid_arguments {};
34 
35 template<class... AN>
36 struct time_interval_invalid : public rxo::operator_base<time_interval_invalid_arguments<AN...>> {
37  using type = observable<time_interval_invalid_arguments<AN...>, time_interval_invalid<AN...>>;
38 };
39 template<class... AN>
40 using time_interval_invalid_t = typename time_interval_invalid<AN...>::type;
41 
42 template<class T, class Coordination>
43 struct time_interval
44 {
45  typedef rxu::decay_t<T> source_value_type;
46  typedef rxu::decay_t<Coordination> coordination_type;
47 
48  struct time_interval_values {
49  time_interval_values(coordination_type c)
50  : coordination(c)
51  {
52  }
53 
54  coordination_type coordination;
55  };
56  time_interval_values initial;
57 
58  time_interval(coordination_type coordination)
59  : initial(coordination)
60  {
61  }
62 
63  template<class Subscriber>
64  struct time_interval_observer
65  {
66  typedef time_interval_observer<Subscriber> this_type;
67  typedef source_value_type value_type;
68  typedef rxu::decay_t<Subscriber> dest_type;
69  typedef observer<value_type, this_type> observer_type;
70  typedef rxsc::scheduler::clock_type::time_point time_point;
71  dest_type dest;
72  coordination_type coord;
73  mutable time_point last;
74 
75  time_interval_observer(dest_type d, coordination_type coordination)
76  : dest(std::move(d)),
77  coord(std::move(coordination)),
78  last(coord.now())
79  {
80  }
81 
82  void on_next(source_value_type) const {
83  time_point now = coord.now();
84  dest.on_next(now - last);
85  last = now;
86  }
87  void on_error(std::exception_ptr e) const {
88  dest.on_error(e);
89  }
90  void on_completed() const {
91  dest.on_completed();
92  }
93 
94  static subscriber<value_type, observer_type> make(dest_type d, time_interval_values v) {
95  return make_subscriber<value_type>(d, this_type(d, v.coordination));
96  }
97  };
98 
99  template<class Subscriber>
100  auto operator()(Subscriber dest) const
101  -> decltype(time_interval_observer<Subscriber>::make(std::move(dest), initial)) {
102  return time_interval_observer<Subscriber>::make(std::move(dest), initial);
103  }
104 };
105 
106 }
107 
110 template<class... AN>
111 auto time_interval(AN&&... an)
113  return operator_factory<time_interval_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
114 }
115 
116 }
117 
118 template<>
120 {
121  template<class Observable,
122  class Enabled = rxu::enable_if_all_true_type_t<
124  class SourceValue = rxu::value_type_t<Observable>,
125  class TimeInterval = rxo::detail::time_interval<SourceValue, identity_one_worker>,
126  class Value = typename rxsc::scheduler::clock_type::time_point::duration>
127  static auto member(Observable&& o)
128  -> decltype(o.template lift<Value>(TimeInterval(identity_current_thread()))) {
129  return o.template lift<Value>(TimeInterval(identity_current_thread()));
130  }
131 
132  template<class Observable, class Coordination,
133  class Enabled = rxu::enable_if_all_true_type_t<
134  is_observable<Observable>,
136  class SourceValue = rxu::value_type_t<Observable>,
137  class TimeInterval = rxo::detail::time_interval<SourceValue, rxu::decay_t<Coordination>>,
138  class Value = typename rxsc::scheduler::clock_type::time_point::duration>
139  static auto member(Observable&& o, Coordination&& cn)
140  -> decltype(o.template lift<Value>(TimeInterval(std::forward<Coordination>(cn)))) {
141  return o.template lift<Value>(TimeInterval(std::forward<Coordination>(cn)));
142  }
143 
144  template<class... AN>
145  static operators::detail::time_interval_invalid_t<AN...> member(AN...) {
146  std::terminate();
147  return {};
148  static_assert(sizeof...(AN) == 10000, "time_interval takes (optional Coordination)");
149  }
150 };
151 
152 }
153 
154 #endif
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
static auto member(Observable &&o) -> decltype(o.template lift< Value >(TimeInterval(identity_current_thread())))
Definition: rx-time_interval.hpp:127
Definition: rx-operators.hpp:459
auto AN
Definition: rx-finally.hpp:105
Definition: rx-operators.hpp:47
auto last() -> operator_factory< last_tag >
For each item from this observable reduce it by sending only the last item.
Definition: rx-reduce.hpp:395
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
auto time_interval(AN &&...an) -> operator_factory< time_interval_tag, AN... >
Returns an observable that emits indications of the amount of time lapsed between consecutive emissio...
Definition: rx-time_interval.hpp:111
static auto member(Observable &&o, Coordination &&cn) -> decltype(o.template lift< Value >(TimeInterval(std::forward< Coordination >(cn))))
Definition: rx-time_interval.hpp:139
static operators::detail::time_interval_invalid_t< AN... > member(AN...)
Definition: rx-time_interval.hpp:145
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37