20 #if !defined(RXCPP_OPERATORS_RX_TIMESTAMP_HPP) 21 #define RXCPP_OPERATORS_RX_TIMESTAMP_HPP 23 #include "../rx-includes.hpp" 32 struct timestamp_invalid_arguments {};
35 struct timestamp_invalid :
public rxo::operator_base<timestamp_invalid_arguments<AN...>> {
36 using type = observable<timestamp_invalid_arguments<
AN...>, timestamp_invalid<
AN...>>;
39 using timestamp_invalid_t =
typename timestamp_invalid<
AN...>::type;
41 template<
class T,
class Coordination>
44 typedef rxu::decay_t<T> source_value_type;
45 typedef rxu::decay_t<Coordination> coordination_type;
47 struct timestamp_values {
48 timestamp_values(coordination_type c)
53 coordination_type coordination;
55 timestamp_values initial;
58 : initial(coordination)
62 template<
class Subscriber>
63 struct timestamp_observer
65 typedef timestamp_observer<Subscriber> this_type;
66 typedef source_value_type value_type;
67 typedef rxu::decay_t<Subscriber> dest_type;
68 typedef observer<value_type, this_type> observer_type;
70 coordination_type coord;
72 timestamp_observer(dest_type d, coordination_type coordination)
74 coord(std::move(coordination))
// Handles one value from the source: pairs `v` with the time reported by
// `coord.now()` via std::make_pair and passes that pair to `dest.on_next`.
78 void on_next(source_value_type v)
const {
79 dest.on_next(std::make_pair(v, coord.now()));
81 void on_error(std::exception_ptr e)
const {
84 void on_completed()
const {
// Factory helper: constructs a timestamp_observer (this_type) from the
// destination `d` and `v.coordination`, then returns the result of
// make_subscriber<value_type> called with `d` and that observer.
88 static subscriber<value_type, observer_type> make(dest_type d, timestamp_values v) {
89 return make_subscriber<value_type>(d, this_type(d, v.coordination));
// Call operator: moves the destination subscriber `dest`, together with the
// stored `initial` values, into timestamp_observer<Subscriber>::make and
// returns whatever that produces. The trailing return type is the decltype of
// the same expression.
93 template<
class Subscriber>
94 auto operator()(Subscriber dest)
const 95 -> decltype(timestamp_observer<Subscriber>::make(std::move(dest), initial)) {
96 return timestamp_observer<Subscriber>::make(std::move(dest), initial);
104 template<
class...
AN>
107 return operator_factory<timestamp_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
115 template<
class Observable,
119 class Timestamp = rxo::detail::timestamp<SourceValue, identity_one_worker>,
120 class Clock =
typename rxsc::scheduler::clock_type::time_point,
121 class Value = std::pair<SourceValue, Clock>>
// Overload taking a source observable `o` and a coordination `cn`.
// Builds a Timestamp operator from the forwarded `cn` and applies it with
// `o.template lift<Value>(...)`, where Value defaults to
// std::pair<SourceValue, Clock>.
// NOTE(review): parts of the template parameter list (e.g. where SourceValue
// is introduced) are not visible in this listing — confirm against the header.
127 template<
class Observable,
class Coordination,
129 is_observable<Observable>,
132 class Timestamp = rxo::detail::timestamp<SourceValue, rxu::decay_t<Coordination>>,
133 class Clock =
typename rxsc::scheduler::clock_type::time_point,
134 class Value = std::pair<SourceValue, Clock>>
135 static auto member(Observable&& o, Coordination&& cn)
136 -> decltype(o.template lift<Value>(Timestamp(std::forward<Coordination>(cn)))) {
137 return o.template lift<Value>(Timestamp(std::forward<Coordination>(cn)));
// Catch-all overload accepting an arbitrary argument pack; its declared return
// type is operators::detail::timestamp_invalid_t<AN...>.
// The body is a static_assert on `sizeof...(AN) == 10000` carrying the message
// "timestamp takes (optional Coordination)".
// NOTE(review): presumably the condition is chosen so that it depends on the
// pack yet fails for any real call, turning unsupported argument lists into
// that diagnostic — confirm.
140 template<
class...
AN>
141 static operators::detail::timestamp_invalid_t<
AN...>
member(
AN...) {
144 static_assert(
sizeof...(
AN) == 10000,
"timestamp takes (optional Coordination)");
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
static auto member(Observable &&o) -> decltype(o.template lift< Value >(Timestamp(identity_current_thread())))
Definition: rx-timestamp.hpp:122
Definition: rx-operators.hpp:466
static operators::detail::timestamp_invalid_t< AN... > member(AN...)
Definition: rx-timestamp.hpp:141
static auto member(Observable &&o, Coordination &&cn) -> decltype(o.template lift< Value >(Timestamp(std::forward< Coordination >(cn))))
Definition: rx-timestamp.hpp:135
auto timestamp(AN &&...an) -> operator_factory< timestamp_tag, AN... >
Returns an observable that attaches a timestamp to each item emitted by the source observable indicating when it was emitted.
Definition: rx-timestamp.hpp:105
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37