5 #if !defined(RXCPP_RX_SUBJECT_HPP) 6 #define RXCPP_RX_SUBJECT_HPP 8 #include "../rx-includes.hpp" 17 class multicast_observer
19 typedef subscriber<T> observer_type;
20 typedef std::vector<observer_type> list_type;
34 :
public std::enable_shared_from_this<state_type>
36 explicit state_type(composite_subscription cs)
37 : current(mode::Casting)
42 typename mode::type current;
43 std::exception_ptr
error;
44 composite_subscription lifetime;
48 :
public std::enable_shared_from_this<completer_type>
53 completer_type(std::shared_ptr<state_type> s,
const std::shared_ptr<completer_type>& old, observer_type o)
57 observers.push_back(o);
59 completer_type(std::shared_ptr<state_type> s,
const std::shared_ptr<completer_type>& old)
64 void retain(
const std::shared_ptr<completer_type>& old) {
66 observers.reserve(old->observers.size() + 1);
68 old->observers.begin(), old->observers.end(),
69 std::inserter(observers, observers.end()),
70 [](
const observer_type& o){
71 return o.is_subscribed();
75 std::shared_ptr<state_type> state;
81 :
public std::enable_shared_from_this<binder_type>
83 explicit binder_type(composite_subscription cs)
84 : state(std::make_shared<state_type>(cs))
89 std::shared_ptr<state_type> state;
94 mutable std::weak_ptr<completer_type> current_completer;
97 mutable std::shared_ptr<completer_type> completer;
100 std::shared_ptr<binder_type> b;
103 typedef subscriber<T, observer<T, detail::multicast_observer<T>>> input_subscriber_type;
105 explicit multicast_observer(composite_subscription cs)
106 : b(std::make_shared<binder_type>(cs))
108 std::weak_ptr<binder_type> binder = b;
109 b->state->lifetime.add([binder](){
110 auto b = binder.lock();
111 if (b && b->state->current == mode::Casting){
112 b->state->current = mode::Disposed;
113 b->current_completer.reset();
114 b->completer.reset();
118 trace_id get_id()
const {
121 composite_subscription get_subscription()
const {
122 return b->state->lifetime;
124 input_subscriber_type get_subscriber()
const {
125 return make_subscriber<T>(get_id(), get_subscription(), observer<T, detail::multicast_observer<T>>(*this));
127 bool has_observers()
const {
128 std::unique_lock<std::mutex> guard(b->state->lock);
129 return b->completer && !b->completer->observers.empty();
131 template<
class SubscriberFrom>
132 void add(
const SubscriberFrom& sf, observer_type o)
const {
134 std::unique_lock<std::mutex> guard(b->state->lock);
135 switch (b->state->current) {
138 if (o.is_subscribed()) {
139 std::weak_ptr<binder_type> binder = b;
141 auto b = binder.lock();
143 std::unique_lock<std::mutex> guard(b->state->lock);
144 b->completer = std::make_shared<completer_type>(b->state, b->completer);
147 b->completer = std::make_shared<completer_type>(b->state, b->completer, o);
151 case mode::Completed:
160 auto e = b->state->error;
178 void on_next(V v)
const {
179 auto current_completer = b->current_completer.lock();
180 if (!current_completer) {
181 std::unique_lock<std::mutex> guard(b->state->lock);
182 b->current_completer = b->completer;
183 current_completer = b->current_completer.lock();
185 if (!current_completer || current_completer->observers.empty()) {
188 for (
auto& o : current_completer->observers) {
189 if (o.is_subscribed()) {
194 void on_error(std::exception_ptr e)
const {
195 std::unique_lock<std::mutex> guard(b->state->lock);
196 if (b->state->current == mode::Casting) {
198 b->state->current = mode::Errored;
199 auto s = b->state->lifetime;
200 auto c = std::move(b->completer);
201 b->current_completer.reset();
204 for (
auto& o : c->observers) {
205 if (o.is_subscribed()) {
213 void on_completed()
const {
214 std::unique_lock<std::mutex> guard(b->state->lock);
215 if (b->state->current == mode::Casting) {
216 b->state->current = mode::Completed;
217 auto s = b->state->lifetime;
218 auto c = std::move(b->completer);
219 b->current_completer.reset();
222 for (
auto& o : c->observers) {
223 if (o.is_subscribed()) {
239 detail::multicast_observer<T> s;
254 return s.has_observers();
258 return s.get_subscriber();
264 keepAlive.add(keepAlive.get_subscriber(), std::move(o));
observable< T > get_observable() const
Definition: rx-subject.hpp:261
Definition: rx-all.hpp:26
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
Definition: rx-subject.hpp:237
auto error(E e) -> decltype(detail::make_error< T >(typename std::conditional< std::is_same< std::exception_ptr, rxu::decay_t< E >>::value, detail::throw_ptr_tag, detail::throw_instance_tag >::type(), std::move(e), identity_immediate()))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-error.hpp:114
subject(composite_subscription cs)
Definition: rx-subject.hpp:248
subscriber_type get_subscriber() const
Definition: rx-subject.hpp:257
subject()
Definition: rx-subject.hpp:244
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
bool has_observers() const
Definition: rx-subject.hpp:253
observable< T > observable_type
Definition: rx-subject.hpp:243
static trace_id make_next_id_subscriber()
Definition: rx-trace.hpp:16
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
subscriber< T, observer< T, detail::multicast_observer< T > > > subscriber_type
Definition: rx-subject.hpp:242