RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-behavior.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_BEHAVIOR_HPP)
6 #define RXCPP_RX_BEHAVIOR_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace subjects {
13 
14 namespace detail {
15 
16 template<class T>
17 class behavior_observer : public detail::multicast_observer<T>
18 {
19  typedef behavior_observer<T> this_type;
20  typedef detail::multicast_observer<T> base_type;
21 
22  class behavior_observer_state : public std::enable_shared_from_this<behavior_observer_state>
23  {
24  mutable std::mutex lock;
25  mutable T value;
26 
27  public:
28  behavior_observer_state(T first)
29  : value(first)
30  {
31  }
32 
33  void reset(T v) const {
34  std::unique_lock<std::mutex> guard(lock);
35  value = std::move(v);
36  }
37  T get() const {
38  std::unique_lock<std::mutex> guard(lock);
39  return value;
40  }
41  };
42 
43  std::shared_ptr<behavior_observer_state> state;
44 
45 public:
46  behavior_observer(T f, composite_subscription l)
47  : base_type(l)
48  , state(std::make_shared<behavior_observer_state>(std::move(f)))
49  {
50  }
51 
52  subscriber<T> get_subscriber() const {
53  return make_subscriber<T>(this->get_id(), this->get_subscription(), observer<T, detail::behavior_observer<T>>(*this)).as_dynamic();
54  }
55 
56  T get_value() const {
57  return state->get();
58  }
59 
60  template<class V>
61  void on_next(V v) const {
62  state->reset(v);
63  base_type::on_next(std::move(v));
64  }
65 };
66 
67 }
68 
69 template<class T>
70 class behavior
71 {
72  detail::behavior_observer<T> s;
73 
74 public:
76  : s(std::move(f), cs)
77  {
78  }
79 
80  bool has_observers() const {
81  return s.has_observers();
82  }
83 
84  T get_value() const {
85  return s.get_value();
86  }
87 
89  return s.get_subscriber();
90  }
91 
93  auto keepAlive = s;
94  return make_observable_dynamic<T>([=](subscriber<T> o){
95  if (keepAlive.get_subscription().is_subscribed()) {
96  o.on_next(get_value());
97  }
98  keepAlive.add(s.get_subscriber(), std::move(o));
99  });
100  }
101 };
102 
103 }
104 
105 }
106 
107 #endif
Definition: rx-behavior.hpp:70
Definition: rx-all.hpp:26
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
T get_value() const
Definition: rx-behavior.hpp:84
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
auto first() -> operator_factory< first_tag >
For each item from this observable reduce it by sending only the first item.
Definition: rx-reduce.hpp:378
behavior(T f, composite_subscription cs=composite_subscription())
Definition: rx-behavior.hpp:75
observable< T > get_observable() const
Definition: rx-behavior.hpp:92
bool has_observers() const
Definition: rx-behavior.hpp:80
auto as_dynamic() -> detail::dynamic_factory
Definition: rx-subscribe.hpp:117
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
subscriber< T > get_subscriber() const
Definition: rx-behavior.hpp:88