5 #if !defined(RXCPP_RX_NOTIFICATION_HPP) 6 #define RXCPP_RX_NOTIFICATION_HPP 12 namespace notifications {
21 : s(s), u(std::numeric_limits<long>::
max()) {
46 struct notification_base
47 :
public std::enable_shared_from_this<notification_base<T>>
50 typedef std::shared_ptr<notification_base<T>> type;
52 virtual ~notification_base() {}
54 virtual void out(std::ostream& out)
const =0;
55 virtual bool equals(
const type& other)
const = 0;
56 virtual void accept(
const observer_type& o)
const =0;
60 std::ostream& operator<< (std::ostream& out, const std::vector<T>& v);
63 auto to_stream(std::ostream& os,
const T& t,
int,
int)
64 -> decltype(os << t) {
70 std::ostream& to_stream(std::ostream& os,
const T&,
int, ...) {
71 return os <<
"< " <<
typeid(T).name() <<
" does not support ostream>";
76 std::ostream& to_stream(std::ostream& os,
const T&, ...) {
77 return os <<
"<the value does not support ostream>";
81 inline std::ostream& ostreamvector (std::ostream& os,
const std::vector<T>& v) {
90 to_stream(os, i, 0, 0);
97 inline std::ostream& operator<< (std::ostream& os, const std::vector<T>& v) {
98 return ostreamvector(os, v);
102 auto equals(
const T& lhs,
const T& rhs,
int)
103 -> decltype(
bool(lhs == rhs)) {
108 bool equals(
const T&,
const T&, ...) {
109 throw std::runtime_error(
"value does not support equality tests");
118 typedef typename detail::notification_base<T>::type
type;
122 typedef detail::notification_base<T> base;
124 struct on_next_notification :
public base {
125 on_next_notification(T value) : value(std::move(value)) {
// Copy constructor: initializes this notification's value as a copy of o.value.
127 on_next_notification(
const on_next_notification& o) : value(o.value) {}
// Constructor from a const rvalue reference; initializes value from std::move(o.value).
// NOTE(review): o is const, so std::move(o.value) yields a const rvalue, which
// ordinarily selects T's copy constructor rather than its move constructor.
// Confirm whether a non-const `on_next_notification&&` parameter was intended.
128 on_next_notification(
const on_next_notification&& o) : value(std::move(o.value)) {}
// Assignment operator. Takes the source by value, move-assigns the source's
// value member into this->value, and returns *this.
// NOTE(review): this requires `value` to be assignable; the member's declaration
// is not visible in this listing -- confirm it is not declared const.
129 on_next_notification& operator=(on_next_notification o) { value = std::move(o.value);
return *
this; }
130 virtual void out(std::ostream& os)
const {
132 detail::to_stream(os, value, 0, 0);
135 virtual bool equals(
const typename base::type& other)
const {
137 other->accept(make_subscriber<T>(make_observer_dynamic<T>([
this, &result](T v) {
138 result = detail::equals(this->value, v, 0);
148 struct on_error_notification :
public base {
149 on_error_notification(std::exception_ptr ep) : ep(ep) {
// Copy constructor: initializes ep as a copy of o.ep.
151 on_error_notification(
const on_error_notification& o) : ep(o.ep) {}
// Constructor from a const rvalue reference; initializes ep from std::move(o.ep).
// NOTE(review): o is const, so std::move(o.ep) yields a const rvalue and the
// std::exception_ptr is most likely copied, not moved. Confirm whether a
// non-const `on_error_notification&&` parameter was intended.
152 on_error_notification(
const on_error_notification&& o) : ep(std::move(o.ep)) {}
// Assignment operator. Takes the source by value, move-assigns the source's ep
// into this->ep, and returns *this.
// NOTE(review): the listing shows `const std::exception_ptr ep;` further down;
// if that is this struct's member, assigning to it here cannot compile once this
// operator is instantiated -- confirm against the full header.
153 on_error_notification& operator=(on_error_notification o) { ep = std::move(o.ep);
return *
this; }
154 virtual void out(std::ostream& os)
const {
157 std::rethrow_exception(ep);
158 }
catch (
const std::exception& e) {
161 os <<
"<not derived from std::exception>";
165 virtual bool equals(
const typename base::type& other)
const {
168 other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](std::exception_ptr){
176 const std::exception_ptr ep;
179 struct on_completed_notification :
public base {
180 on_completed_notification() {
182 virtual void out(std::ostream& os)
const {
183 os <<
"on_completed()";
185 virtual bool equals(
const typename base::type& other)
const {
187 other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](){
// Empty tag type. It is the first parameter of the make_on_error overload that
// receives an exception object by forwarding reference, throws it, and stores
// std::current_exception() in the resulting on_error_notification.
197 struct exception_tag {};
199 template<
typename Exception>
201 type make_on_error(exception_tag&&, Exception&& e) {
202 std::exception_ptr ep;
204 throw std::forward<Exception>(e);
207 ep = std::current_exception();
209 return std::make_shared<on_error_notification>(ep);
// Empty tag type. It is the first parameter of the make_on_error overload that
// receives a std::exception_ptr directly and wraps it in an on_error_notification.
212 struct exception_ptr_tag {};
215 type make_on_error(exception_ptr_tag&&, std::exception_ptr ep) {
216 return std::make_shared<on_error_notification>(ep);
222 return std::make_shared<on_next_notification>(std::move(value));
226 return std::make_shared<on_completed_notification>();
229 template<
typename Exception>
231 return make_on_error(
typename std::conditional<
233 exception_ptr_tag, exception_tag>::type(),
234 std::forward<Exception>(e));
239 bool operator == (
const std::shared_ptr<detail::notification_base<T>>& lhs,
const std::shared_ptr<detail::notification_base<T>>& rhs) {
240 if (!lhs && !rhs) {
return true;}
241 if (!lhs || !rhs) {
return false;}
242 return lhs->equals(rhs);
246 std::ostream& operator<< (std::ostream& os, const std::shared_ptr<detail::notification_base<T>>& n) {
275 std::ostream& operator<< (std::ostream& out, const recorded<T>& r) {
276 out <<
"@" << r.time() <<
"-" << r.value();
// Short alias `rxn` for the `notifications` namespace.
281 namespace rxn=notifications;
285 inline std::ostream& operator<< (std::ostream& out, const std::vector<rxcpp::notifications::subscription>& vs) {
286 return rxcpp::notifications::detail::ostreamvector(out, vs);
289 inline std::ostream& operator<< (std::ostream& out, const std::vector<rxcpp::notifications::recorded<T>>& vr) {
290 return rxcpp::notifications::detail::ostreamvector(out, vr);
Definition: rx-notification.hpp:253
long time() const
Definition: rx-notification.hpp:261
Definition: rx-all.hpp:26
const T & value() const
Definition: rx-notification.hpp:264
bool operator==(subscription lhs, subscription rhs)
Definition: rx-notification.hpp:34
auto max() -> operator_factory< max_tag >
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-reduce.hpp:496
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
static type on_completed()
Definition: rx-notification.hpp:225
Definition: rx-notification.hpp:116
subscription(long s)
Definition: rx-notification.hpp:20
void on_error(std::exception_ptr e) const
Definition: rx-subscriber.hpp:183
static type on_error(Exception &&e)
Definition: rx-notification.hpp:230
subscription(long s, long u)
Definition: rx-notification.hpp:23
static type on_next(U value)
Definition: rx-notification.hpp:221
void on_completed() const
Definition: rx-subscriber.hpp:190
Definition: rx-notification.hpp:14
detail::notification_base< T >::type type
Definition: rx-notification.hpp:118
detail::notification_base< T >::observer_type observer_type
Definition: rx-notification.hpp:119
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
std::ostream & operator<<(std::ostream &out, const subscription &s)
Definition: rx-notification.hpp:38
recorded(long t, T v)
Definition: rx-notification.hpp:258
long subscribe() const
Definition: rx-notification.hpp:26
void on_next(V &&v) const
Definition: rx-subscriber.hpp:176
long unsubscribe() const
Definition: rx-notification.hpp:29