RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-sample_time.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
22 #if !defined(RXCPP_OPERATORS_RX_SAMPLE_WITH_TIME_HPP)
23 #define RXCPP_OPERATORS_RX_SAMPLE_WITH_TIME_HPP
24 
25 #include "../rx-includes.hpp"
26 
27 namespace rxcpp {
28 
29 namespace operators {
30 
31 namespace detail {
32 
/// Empty marker type used as the value type of the observable that an
/// unsupported sample_with_time(...) argument list resolves to.
/// It only carries the offending argument types AN... in its name.
template<class... AN>
struct sample_with_time_invalid_arguments {};
35 
36 template<class... AN>
37 struct sample_with_time_invalid : public rxo::operator_base<sample_with_time_invalid_arguments<AN...>> {
38  using type = observable<sample_with_time_invalid_arguments<AN...>, sample_with_time_invalid<AN...>>;
39 };
40 template<class... AN>
41 using sample_with_time_invalid_t = typename sample_with_time_invalid<AN...>::type;
42 
43 template<class T, class Duration, class Coordination>
44 struct sample_with_time
45 {
46  typedef rxu::decay_t<T> source_value_type;
47  typedef rxu::decay_t<Coordination> coordination_type;
48  typedef typename coordination_type::coordinator_type coordinator_type;
49  typedef rxu::decay_t<Duration> duration_type;
50 
51  struct sample_with_time_value
52  {
53  sample_with_time_value(duration_type p, coordination_type c)
54  : period(p)
55  , coordination(c)
56  {
57  }
58  duration_type period;
59  coordination_type coordination;
60  };
61  sample_with_time_value initial;
62 
63  sample_with_time(duration_type period, coordination_type coordination)
64  : initial(period, coordination)
65  {
66  }
67 
68  template<class Subscriber>
69  struct sample_with_time_observer
70  {
71  typedef sample_with_time_observer<Subscriber> this_type;
72  typedef T value_type;
73  typedef rxu::decay_t<Subscriber> dest_type;
74  typedef observer<value_type, this_type> observer_type;
75 
76  struct sample_with_time_subscriber_value : public sample_with_time_value
77  {
78  sample_with_time_subscriber_value(composite_subscription cs, dest_type d, sample_with_time_value v, coordinator_type c)
79  : sample_with_time_value(v)
80  , cs(std::move(cs))
81  , dest(std::move(d))
82  , coordinator(std::move(c))
83  , worker(coordinator.get_worker())
84  {
85  }
86  composite_subscription cs;
87  dest_type dest;
88  coordinator_type coordinator;
89  rxsc::worker worker;
90  mutable rxu::maybe<value_type> value;
91  };
92  std::shared_ptr<sample_with_time_subscriber_value> state;
93 
94  sample_with_time_observer(composite_subscription cs, dest_type d, sample_with_time_value v, coordinator_type c)
95  : state(std::make_shared<sample_with_time_subscriber_value>(sample_with_time_subscriber_value(std::move(cs), std::move(d), v, std::move(c))))
96  {
97  auto localState = state;
98 
99  auto disposer = [=](const rxsc::schedulable&){
100  localState->cs.unsubscribe();
101  localState->dest.unsubscribe();
102  localState->worker.unsubscribe();
103  };
104  auto selectedDisposer = on_exception(
105  [&](){ return localState->coordinator.act(disposer); },
106  localState->dest);
107  if (selectedDisposer.empty()) {
108  return;
109  }
110 
111  localState->dest.add([=](){
112  localState->worker.schedule(selectedDisposer.get());
113  });
114  localState->cs.add([=](){
115  localState->worker.schedule(selectedDisposer.get());
116  });
117 
118  auto produce_sample = [localState](const rxsc::schedulable&) {
119  if(!localState->value.empty()) {
120  localState->dest.on_next(*localState->value);
121  localState->value.reset();
122  }
123  };
124  auto selectedProduce = on_exception(
125  [&](){ return localState->coordinator.act(produce_sample); },
126  localState->dest);
127  if (selectedProduce.empty()) {
128  return;
129  }
130 
131  state->worker.schedule_periodically(
132  localState->worker.now(),
133  localState->period,
134  [localState, selectedProduce](const rxsc::schedulable&) {
135  localState->worker.schedule(selectedProduce.get());
136  });
137  }
138 
139  void on_next(T v) const {
140  auto localState = state;
141  auto work = [v, localState](const rxsc::schedulable&) {
142  localState->value.reset(v);
143  };
144  auto selectedWork = on_exception(
145  [&](){ return localState->coordinator.act(work); },
146  localState->dest);
147  if (selectedWork.empty()) {
148  return;
149  }
150  localState->worker.schedule(selectedWork.get());
151  }
152 
153  void on_error(std::exception_ptr e) const {
154  auto localState = state;
155  auto work = [e, localState](const rxsc::schedulable&) {
156  localState->dest.on_error(e);
157  };
158  auto selectedWork = on_exception(
159  [&](){ return localState->coordinator.act(work); },
160  localState->dest);
161  if (selectedWork.empty()) {
162  return;
163  }
164  localState->worker.schedule(selectedWork.get());
165  }
166 
167  void on_completed() const {
168  auto localState = state;
169  auto work = [localState](const rxsc::schedulable&) {
170  localState->dest.on_completed();
171  };
172  auto selectedWork = on_exception(
173  [&](){ return localState->coordinator.act(work); },
174  localState->dest);
175  if (selectedWork.empty()) {
176  return;
177  }
178  localState->worker.schedule(selectedWork.get());
179  }
180 
181  static subscriber<T, observer<T, this_type>> make(dest_type d, sample_with_time_value v) {
182  auto cs = composite_subscription();
183  auto coordinator = v.coordination.create_coordinator();
184 
185  return make_subscriber<T>(cs, this_type(cs, std::move(d), std::move(v), std::move(coordinator)));
186  }
187  };
188 
189  template<class Subscriber>
190  auto operator()(Subscriber dest) const
191  -> decltype(sample_with_time_observer<Subscriber>::make(std::move(dest), initial)) {
192  return sample_with_time_observer<Subscriber>::make(std::move(dest), initial);
193  }
194 };
195 
196 }
197 
200 template<class... AN>
201 auto sample_with_time(AN&&... an)
203  return operator_factory<sample_with_time_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
204 }
205 
206 }
207 
208 template<>
210 {
211  template<class Observable, class Duration,
212  class Enabled = rxu::enable_if_all_true_type_t<
215  class SourceValue = rxu::value_type_t<Observable>,
216  class SampleWithTime = rxo::detail::sample_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>>
217  static auto member(Observable&& o, Duration&& d)
218  -> decltype(o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), identity_current_thread()))) {
219  return o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), identity_current_thread()));
220  }
221 
222  template<class Observable, class Coordination, class Duration,
223  class Enabled = rxu::enable_if_all_true_type_t<
224  is_observable<Observable>,
226  rxu::is_duration<Duration>>,
227  class SourceValue = rxu::value_type_t<Observable>,
228  class SampleWithTime = rxo::detail::sample_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
229  static auto member(Observable&& o, Coordination&& cn, Duration&& d)
230  -> decltype(o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
231  return o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), std::forward<Coordination>(cn)));
232  }
233 
234  template<class Observable, class Coordination, class Duration,
235  class Enabled = rxu::enable_if_all_true_type_t<
236  is_observable<Observable>,
237  is_coordination<Coordination>,
238  rxu::is_duration<Duration>>,
239  class SourceValue = rxu::value_type_t<Observable>,
240  class SampleWithTime = rxo::detail::sample_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
241  static auto member(Observable&& o, Duration&& d, Coordination&& cn)
242  -> decltype(o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
243  return o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), std::forward<Coordination>(cn)));
244  }
245 
246  template<class... AN>
247  static operators::detail::sample_with_time_invalid_t<AN...> member(const AN&...) {
248  std::terminate();
249  return {};
250  static_assert(sizeof...(AN) == 10000, "sample_with_time takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
251  }
252 };
253 
254 }
255 
256 #endif
Definition: rx-util.hpp:791
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
Definition: rx-operators.hpp:346
auto AN
Definition: rx-finally.hpp:105
static auto member(Observable &&o, Coordination &&cn, Duration &&d) -> decltype(o.template lift< SourceValue >(SampleWithTime(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-sample_time.hpp:229
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
static operators::detail::sample_with_time_invalid_t< AN... > member(const AN &...)
Definition: rx-sample_time.hpp:247
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
static auto member(Observable &&o, Duration &&d) -> decltype(o.template lift< SourceValue >(SampleWithTime(std::forward< Duration >(d), identity_current_thread())))
Definition: rx-sample_time.hpp:217
static auto member(Observable &&o, Duration &&d, Coordination &&cn) -> decltype(o.template lift< SourceValue >(SampleWithTime(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-sample_time.hpp:241
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
auto sample_with_time(AN &&...an) -> operator_factory< sample_with_time_tag, AN... >
Return an Observable that emits the most recent items emitted by the source Observable within periodic time intervals.
Definition: rx-sample_time.hpp:201
Definition: rx-coordination.hpp:114
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37