22 #if !defined(RXCPP_OPERATORS_RX_SAMPLE_WITH_TIME_HPP) 23 #define RXCPP_OPERATORS_RX_SAMPLE_WITH_TIME_HPP 25 #include "../rx-includes.hpp" 34 struct sample_with_time_invalid_arguments {};
37 struct sample_with_time_invalid :
public rxo::operator_base<sample_with_time_invalid_arguments<AN...>> {
38 using type = observable<sample_with_time_invalid_arguments<
AN...>, sample_with_time_invalid<
AN...>>;
41 using sample_with_time_invalid_t =
typename sample_with_time_invalid<
AN...>::type;
43 template<
class T,
class Duration,
class Coordination>
46 typedef rxu::decay_t<T> source_value_type;
47 typedef rxu::decay_t<Coordination> coordination_type;
48 typedef typename coordination_type::coordinator_type coordinator_type;
49 typedef rxu::decay_t<Duration> duration_type;
51 struct sample_with_time_value
53 sample_with_time_value(duration_type p, coordination_type c)
59 coordination_type coordination;
61 sample_with_time_value initial;
64 : initial(period, coordination)
68 template<
class Subscriber>
69 struct sample_with_time_observer
71 typedef sample_with_time_observer<Subscriber> this_type;
73 typedef rxu::decay_t<Subscriber> dest_type;
74 typedef observer<value_type, this_type> observer_type;
76 struct sample_with_time_subscriber_value :
public sample_with_time_value
78 sample_with_time_subscriber_value(composite_subscription cs, dest_type d, sample_with_time_value v, coordinator_type c)
79 : sample_with_time_value(v)
82 , coordinator(std::move(c))
83 , worker(coordinator.get_worker())
86 composite_subscription cs;
88 coordinator_type coordinator;
90 mutable rxu::maybe<value_type> value;
92 std::shared_ptr<sample_with_time_subscriber_value> state;
94 sample_with_time_observer(composite_subscription cs, dest_type d, sample_with_time_value v, coordinator_type c)
95 : state(std::make_shared<sample_with_time_subscriber_value>(sample_with_time_subscriber_value(std::move(cs), std::move(d), v, std::move(c))))
97 auto localState = state;
99 auto disposer = [=](
const rxsc::schedulable&){
100 localState->cs.unsubscribe();
101 localState->dest.unsubscribe();
102 localState->worker.unsubscribe();
105 [&](){
return localState->coordinator.act(disposer); },
107 if (selectedDisposer.empty()) {
111 localState->dest.add([=](){
112 localState->worker.schedule(selectedDisposer.get());
114 localState->cs.add([=](){
115 localState->worker.schedule(selectedDisposer.get());
118 auto produce_sample = [localState](
const rxsc::schedulable&) {
119 if(!localState->value.empty()) {
120 localState->dest.on_next(*localState->value);
121 localState->value.reset();
125 [&](){
return localState->coordinator.act(produce_sample); },
127 if (selectedProduce.empty()) {
131 state->worker.schedule_periodically(
132 localState->worker.now(),
134 [localState, selectedProduce](
const rxsc::schedulable&) {
135 localState->worker.schedule(selectedProduce.get());
139 void on_next(T v)
const {
140 auto localState = state;
141 auto work = [v, localState](
const rxsc::schedulable&) {
142 localState->value.reset(v);
145 [&](){
return localState->coordinator.act(work); },
147 if (selectedWork.empty()) {
150 localState->worker.schedule(selectedWork.get());
153 void on_error(std::exception_ptr e)
const {
154 auto localState = state;
155 auto work = [e, localState](
const rxsc::schedulable&) {
156 localState->dest.on_error(e);
159 [&](){
return localState->coordinator.act(work); },
161 if (selectedWork.empty()) {
164 localState->worker.schedule(selectedWork.get());
167 void on_completed()
const {
168 auto localState = state;
169 auto work = [localState](
const rxsc::schedulable&) {
170 localState->dest.on_completed();
173 [&](){
return localState->coordinator.act(work); },
175 if (selectedWork.empty()) {
178 localState->worker.schedule(selectedWork.get());
181 static subscriber<T, observer<T, this_type>> make(dest_type d, sample_with_time_value v) {
182 auto cs = composite_subscription();
183 auto coordinator = v.coordination.create_coordinator();
185 return make_subscriber<T>(cs, this_type(cs, std::move(d), std::move(v), std::move(coordinator)));
189 template<
class Subscriber>
190 auto operator()(Subscriber dest)
const 191 -> decltype(sample_with_time_observer<Subscriber>::make(std::move(dest), initial)) {
192 return sample_with_time_observer<Subscriber>::make(std::move(dest), initial);
200 template<
class...
AN>
203 return operator_factory<sample_with_time_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
211 template<
class Observable,
class Duration,
216 class SampleWithTime = rxo::detail::sample_with_time<SourceValue, rxu::decay_t<Duration>,
identity_one_worker>>
217 static auto member(Observable&& o, Duration&& d)
218 -> decltype(o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d),
identity_current_thread()))) {
222 template<
class Observable,
class Coordination,
class Duration,
224 is_observable<Observable>,
226 rxu::is_duration<Duration>>,
229 static auto member(Observable&& o, Coordination&& cn, Duration&& d)
230 -> decltype(o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
231 return o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), std::forward<Coordination>(cn)));
234 template<
class Observable,
class Coordination,
class Duration,
236 is_observable<Observable>,
237 is_coordination<Coordination>,
238 rxu::is_duration<Duration>>,
241 static auto member(Observable&& o, Duration&& d, Coordination&& cn)
242 -> decltype(o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
243 return o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), std::forward<Coordination>(cn)));
246 template<
class...
AN>
247 static operators::detail::sample_with_time_invalid_t<
AN...>
member(
const AN&...) {
250 static_assert(
sizeof...(
AN) == 10000,
"sample_with_time takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
Definition: rx-util.hpp:791
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
Definition: rx-operators.hpp:346
auto AN
Definition: rx-finally.hpp:105
static auto member(Observable &&o, Coordination &&cn, Duration &&d) -> decltype(o.template lift< SourceValue >(SampleWithTime(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-sample_time.hpp:229
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
static operators::detail::sample_with_time_invalid_t< AN... > member(const AN &...)
Definition: rx-sample_time.hpp:247
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
static auto member(Observable &&o, Duration &&d) -> decltype(o.template lift< SourceValue >(SampleWithTime(std::forward< Duration >(d), identity_current_thread())))
Definition: rx-sample_time.hpp:217
static auto member(Observable &&o, Duration &&d, Coordination &&cn) -> decltype(o.template lift< SourceValue >(SampleWithTime(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-sample_time.hpp:241
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
auto sample_with_time(AN &&...an) -> operator_factory< sample_with_time_tag, AN... >
Return an Observable that emits the most recent items emitted by the source Observable within periodi...
Definition: rx-sample_time.hpp:201
Definition: rx-coordination.hpp:114
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37