5 #if !defined(RXCPP_RX_BEHAVIOR_HPP) 6 #define RXCPP_RX_BEHAVIOR_HPP 8 #include "../rx-includes.hpp" 17 class behavior_observer :
public detail::multicast_observer<T>
19 typedef behavior_observer<T> this_type;
20 typedef detail::multicast_observer<T> base_type;
22 class behavior_observer_state :
public std::enable_shared_from_this<behavior_observer_state>
24 mutable std::mutex lock;
28 behavior_observer_state(T
first)
33 void reset(T v)
const {
34 std::unique_lock<std::mutex> guard(lock);
38 std::unique_lock<std::mutex> guard(lock);
43 std::shared_ptr<behavior_observer_state> state;
46 behavior_observer(T f, composite_subscription l)
48 , state(std::make_shared<behavior_observer_state>(std::move(f)))
52 subscriber<T> get_subscriber()
const {
53 return make_subscriber<T>(this->get_id(), this->get_subscription(), observer<T, detail::behavior_observer<T>>(*this)).
as_dynamic();
61 void on_next(V v)
const {
63 base_type::on_next(std::move(v));
72 detail::behavior_observer<T> s;
81 return s.has_observers();
89 return s.get_subscriber();
95 if (keepAlive.get_subscription().is_subscribed()) {
96 o.on_next(get_value());
98 keepAlive.add(s.get_subscriber(), std::move(o));
Definition: rx-behavior.hpp:70
Definition: rx-all.hpp:26
Controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
T get_value() const
Definition: rx-behavior.hpp:84
A source of values. Subscribe, or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
auto first() -> operator_factory< first_tag >
For each item from this observable, reduce it by sending only the first item.
Definition: rx-reduce.hpp:378
behavior(T f, composite_subscription cs=composite_subscription())
Definition: rx-behavior.hpp:75
observable< T > get_observable() const
Definition: rx-behavior.hpp:92
bool has_observers() const
Definition: rx-behavior.hpp:80
auto as_dynamic() -> detail::dynamic_factory
Definition: rx-subscribe.hpp:117
Binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
subscriber< T > get_subscriber() const
Definition: rx-behavior.hpp:88