RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-error.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_SOURCES_RX_ERROR_HPP)
6 #define RXCPP_SOURCES_RX_ERROR_HPP
7 
8 #include "../rx-includes.hpp"
9 
32 namespace rxcpp {
33 
34 namespace sources {
35 
36 namespace detail {
37 
38 template<class T, class Coordination>
39 struct error : public source_base<T>
40 {
41  typedef error<T, Coordination> this_type;
42 
43  typedef rxu::decay_t<Coordination> coordination_type;
44 
45  typedef typename coordination_type::coordinator_type coordinator_type;
46 
47  struct error_initial_type
48  {
49  error_initial_type(std::exception_ptr e, coordination_type cn)
50  : exception(e)
51  , coordination(std::move(cn))
52  {
53  }
54  std::exception_ptr exception;
55  coordination_type coordination;
56  };
57  error_initial_type initial;
58 
59  error(std::exception_ptr e, coordination_type cn)
60  : initial(e, std::move(cn))
61  {
62  }
63 
64  template<class Subscriber>
65  void on_subscribe(Subscriber o) const {
66 
67  // creates a worker whose lifetime is the same as this subscription
68  auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
69  auto controller = coordinator.get_worker();
70  auto exception = initial.exception;
71 
72  auto producer = [=](const rxsc::schedulable&){
73  auto& dest = o;
74  if (!dest.is_subscribed()) {
75  // terminate loop
76  return;
77  }
78 
79  dest.on_error(exception);
80  // o is unsubscribed
81  };
82  auto selectedProducer = on_exception(
83  [&](){return coordinator.act(producer);},
84  o);
85  if (selectedProducer.empty()) {
86  return;
87  }
88  controller.schedule(selectedProducer.get());
89  }
90 };
91 
/// Dispatch tag: selects the make_error overload that receives an
/// already-built std::exception_ptr.
struct throw_ptr_tag
{
};

/// Dispatch tag: selects the make_error overload that receives an exception
/// object and converts it to a std::exception_ptr.
struct throw_instance_tag
{
};
94 
95 template <class T, class Coordination>
96 auto make_error(throw_ptr_tag&&, std::exception_ptr exception, Coordination cn)
97  -> observable<T, error<T, Coordination>> {
98  return observable<T, error<T, Coordination>>(error<T, Coordination>(std::move(exception), std::move(cn)));
99 }
100 
101 template <class T, class E, class Coordination>
102 auto make_error(throw_instance_tag&&, E e, Coordination cn)
103  -> observable<T, error<T, Coordination>> {
104  std::exception_ptr exception;
105  try {throw e;} catch(...) {exception = std::current_exception();}
106  return observable<T, error<T, Coordination>>(error<T, Coordination>(std::move(exception), std::move(cn)));
107 }
108 
109 }
110 
113 template<class T, class E>
114 auto error(E e)
115  -> decltype(detail::make_error<T>(typename std::conditional<std::is_same<std::exception_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), identity_immediate())) {
116  return detail::make_error<T>(typename std::conditional<std::is_same<std::exception_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), identity_immediate());
117 }
120 template<class T, class E, class Coordination>
121 auto error(E e, Coordination cn)
122  -> decltype(detail::make_error<T>(typename std::conditional<std::is_same<std::exception_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), std::move(cn))) {
123  return detail::make_error<T>(typename std::conditional<std::is_same<std::exception_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), std::move(cn));
124 }
125 
126 }
127 
128 }
129 
130 #endif
Definition: rx-all.hpp:26
auto error(E e) -> decltype(detail::make_error< T >(typename std::conditional< std::is_same< std::exception_ptr, rxu::decay_t< E >>::value, detail::throw_ptr_tag, detail::throw_instance_tag >::type(), std::move(e), identity_immediate()))
Returns an observable that sends no items to the observer and immediately generates an error...
Definition: rx-error.hpp:114
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
identity_one_worker identity_immediate()
Definition: rx-coordination.hpp:170
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639