5 #if !defined(RXCPP_RX_REPLAYSUBJECT_HPP) 6 #define RXCPP_RX_REPLAYSUBJECT_HPP 8 #include "../rx-includes.hpp" 16 template<
class Coordination>
19 typedef rxu::maybe<std::size_t> count_type;
20 typedef rxu::maybe<rxsc::scheduler::clock_type::duration> period_type;
21 typedef rxsc::scheduler::clock_type::time_point time_point_type;
22 typedef rxu::decay_t<Coordination> coordination_type;
23 typedef typename coordination_type::coordinator_type coordinator_type;
26 template<
class T,
class Coordination>
27 class replay_observer :
public detail::multicast_observer<T>
29 typedef replay_observer<T, Coordination> this_type;
30 typedef detail::multicast_observer<T> base_type;
32 typedef replay_traits<Coordination> traits;
33 typedef typename traits::count_type count_type;
34 typedef typename traits::period_type period_type;
35 typedef typename traits::time_point_type time_point_type;
36 typedef typename traits::coordination_type coordination_type;
37 typedef typename traits::coordinator_type coordinator_type;
39 class replay_observer_state :
public std::enable_shared_from_this<replay_observer_state>
41 mutable std::mutex lock;
42 mutable std::list<T> values;
43 mutable std::list<time_point_type> time_points;
44 mutable count_type
count;
45 mutable period_type period;
47 mutable coordination_type coordination;
48 mutable coordinator_type coordinator;
51 void remove_oldest()
const {
53 if (!period.empty()) {
54 time_points.pop_front();
59 explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator)
62 , coordination(std::move(_coordination))
63 , coordinator(std::move(_coordinator))
68 std::unique_lock<std::mutex> guard(lock);
70 if (values.size() == count.get())
74 if (!period.empty()) {
75 auto now = coordination.now();
76 while (!time_points.empty() && (now - time_points.front() > period.get()))
78 time_points.push_back(now);
81 values.push_back(std::move(v));
83 std::list<T>
get()
const {
84 std::unique_lock<std::mutex> guard(lock);
89 std::shared_ptr<replay_observer_state> state;
92 replay_observer(count_type
count, period_type period, coordination_type coordination, composite_subscription cs)
95 auto coordinator = coordination.create_coordinator(cs);
96 state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator));
99 subscriber<T> get_subscriber()
const {
100 return make_subscriber<T>(this->get_id(), this->get_subscription(), observer<T, detail::replay_observer<T, Coordination>>(*this)).
as_dynamic();
103 std::list<T> get_values()
const {
107 coordinator_type& get_coordinator()
const {
108 return state->coordinator;
112 void on_next(V v)
const {
114 base_type::on_next(std::move(v));
120 template<
class T,
class Coordination>
123 typedef detail::replay_traits<Coordination> traits;
124 typedef typename traits::count_type count_type;
125 typedef typename traits::period_type period_type;
126 typedef typename traits::time_point_type time_point_type;
128 detail::replay_observer<T, Coordination> s;
132 : s(count_type(), period_type(), cn, cs)
137 : s(count_type(std::move(count)), period_type(), cn, cs)
142 : s(count_type(), period_type(period), cn, cs)
147 : s(count_type(count), period_type(period), cn, cs)
152 return s.has_observers();
156 return s.get_values();
160 return s.get_subscriber();
166 if (keepAlive.get_subscription().is_subscribed()) {
167 for (
auto&& value: get_values())
170 keepAlive.add(keepAlive.get_subscriber(), std::move(o));
subscriber< T > get_subscriber() const
Definition: rx-replaysubject.hpp:159
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
replay(rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs=composite_subscription())
Definition: rx-replaysubject.hpp:141
replay(std::size_t count, Coordination cn, composite_subscription cs=composite_subscription())
Definition: rx-replaysubject.hpp:136
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
replay(Coordination cn, composite_subscription cs=composite_subscription())
Definition: rx-replaysubject.hpp:131
std::list< T > get_values() const
Definition: rx-replaysubject.hpp:155
observable< T > get_observable() const
Definition: rx-replaysubject.hpp:163
bool has_observers() const
Definition: rx-replaysubject.hpp:151
Definition: rx-replaysubject.hpp:121
auto as_dynamic() -> detail::dynamic_factory
Definition: rx-subscribe.hpp:117
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
replay(std::size_t count, rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs=composite_subscription())
Definition: rx-replaysubject.hpp:146