RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-replaysubject.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_REPLAYSUBJECT_HPP)
6 #define RXCPP_RX_REPLAYSUBJECT_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace subjects {
13 
14 namespace detail {
15 
16 template<class Coordination>
17 struct replay_traits
18 {
19  typedef rxu::maybe<std::size_t> count_type;
20  typedef rxu::maybe<rxsc::scheduler::clock_type::duration> period_type;
21  typedef rxsc::scheduler::clock_type::time_point time_point_type;
22  typedef rxu::decay_t<Coordination> coordination_type;
23  typedef typename coordination_type::coordinator_type coordinator_type;
24 };
25 
26 template<class T, class Coordination>
27 class replay_observer : public detail::multicast_observer<T>
28 {
29  typedef replay_observer<T, Coordination> this_type;
30  typedef detail::multicast_observer<T> base_type;
31 
32  typedef replay_traits<Coordination> traits;
33  typedef typename traits::count_type count_type;
34  typedef typename traits::period_type period_type;
35  typedef typename traits::time_point_type time_point_type;
36  typedef typename traits::coordination_type coordination_type;
37  typedef typename traits::coordinator_type coordinator_type;
38 
39  class replay_observer_state : public std::enable_shared_from_this<replay_observer_state>
40  {
41  mutable std::mutex lock;
42  mutable std::list<T> values;
43  mutable std::list<time_point_type> time_points;
44  mutable count_type count;
45  mutable period_type period;
46  public:
47  mutable coordination_type coordination;
48  mutable coordinator_type coordinator;
49 
50  private:
51  void remove_oldest() const {
52  values.pop_front();
53  if (!period.empty()) {
54  time_points.pop_front();
55  }
56  }
57 
58  public:
59  explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator)
60  : count(_count)
61  , period(_period)
62  , coordination(std::move(_coordination))
63  , coordinator(std::move(_coordinator))
64  {
65  }
66 
67  void add(T v) const {
68  std::unique_lock<std::mutex> guard(lock);
69  if (!count.empty()) {
70  if (values.size() == count.get())
71  remove_oldest();
72  }
73 
74  if (!period.empty()) {
75  auto now = coordination.now();
76  while (!time_points.empty() && (now - time_points.front() > period.get()))
77  remove_oldest();
78  time_points.push_back(now);
79  }
80 
81  values.push_back(std::move(v));
82  }
83  std::list<T> get() const {
84  std::unique_lock<std::mutex> guard(lock);
85  return values;
86  }
87  };
88 
89  std::shared_ptr<replay_observer_state> state;
90 
91 public:
92  replay_observer(count_type count, period_type period, coordination_type coordination, composite_subscription cs)
93  : base_type(cs)
94  {
95  auto coordinator = coordination.create_coordinator(cs);
96  state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator));
97  }
98 
99  subscriber<T> get_subscriber() const {
100  return make_subscriber<T>(this->get_id(), this->get_subscription(), observer<T, detail::replay_observer<T, Coordination>>(*this)).as_dynamic();
101  }
102 
103  std::list<T> get_values() const {
104  return state->get();
105  }
106 
107  coordinator_type& get_coordinator() const {
108  return state->coordinator;
109  }
110 
111  template<class V>
112  void on_next(V v) const {
113  state->add(v);
114  base_type::on_next(std::move(v));
115  }
116 };
117 
118 }
119 
120 template<class T, class Coordination>
121 class replay
122 {
123  typedef detail::replay_traits<Coordination> traits;
124  typedef typename traits::count_type count_type;
125  typedef typename traits::period_type period_type;
126  typedef typename traits::time_point_type time_point_type;
127 
128  detail::replay_observer<T, Coordination> s;
129 
130 public:
131  explicit replay(Coordination cn, composite_subscription cs = composite_subscription())
132  : s(count_type(), period_type(), cn, cs)
133  {
134  }
135 
136  replay(std::size_t count, Coordination cn, composite_subscription cs = composite_subscription())
137  : s(count_type(std::move(count)), period_type(), cn, cs)
138  {
139  }
140 
141  replay(rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription())
142  : s(count_type(), period_type(period), cn, cs)
143  {
144  }
145 
146  replay(std::size_t count, rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription())
147  : s(count_type(count), period_type(period), cn, cs)
148  {
149  }
150 
151  bool has_observers() const {
152  return s.has_observers();
153  }
154 
155  std::list<T> get_values() const {
156  return s.get_values();
157  }
158 
160  return s.get_subscriber();
161  }
162 
164  auto keepAlive = s;
165  auto observable = make_observable_dynamic<T>([=](subscriber<T> o){
166  if (keepAlive.get_subscription().is_subscribed()) {
167  for (auto&& value: get_values())
168  o.on_next(value);
169  }
170  keepAlive.add(keepAlive.get_subscriber(), std::move(o));
171  });
172  return s.get_coordinator().in(observable);
173  }
174 };
175 
176 }
177 
178 }
179 
180 #endif
subscriber< T > get_subscriber() const
Definition: rx-replaysubject.hpp:159
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
replay(rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs=composite_subscription())
Definition: rx-replaysubject.hpp:141
replay(std::size_t count, Coordination cn, composite_subscription cs=composite_subscription())
Definition: rx-replaysubject.hpp:136
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
replay(Coordination cn, composite_subscription cs=composite_subscription())
Definition: rx-replaysubject.hpp:131
std::list< T > get_values() const
Definition: rx-replaysubject.hpp:155
observable< T > get_observable() const
Definition: rx-replaysubject.hpp:163
bool has_observers() const
Definition: rx-replaysubject.hpp:151
rxcpp::subjects::replay
Definition: rx-replaysubject.hpp:121
auto as_dynamic() -> detail::dynamic_factory
Definition: rx-subscribe.hpp:117
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
replay(std::size_t count, rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs=composite_subscription())
Definition: rx-replaysubject.hpp:146