15 #if !defined(RXCPP_RX_COROUTINE_HPP) 16 #define RXCPP_RX_COROUTINE_HPP 20 #ifdef _RESUMABLE_FUNCTIONS_SUPPORTED 24 #include <experimental/resumable> 30 using namespace std::chrono;
31 using namespace std::experimental;
// Forward declaration. co_observable_inc_awaiter::await_resume() names
// co_observable_iterator<Source> as its return type before the iterator's
// full definition appears further down.
33 template<
typename Source>
34 struct co_observable_iterator;
// State shared by a co_observable_iterator, the awaiters that copy its
// shared_ptr, and the subscription callbacks that hold a weak_ptr to it.
36 template<
typename Source>
37 struct co_observable_iterator_state : std::enable_shared_from_this<co_observable_iterator_state<Source>>
// Element type delivered by the source.
39 using value_type =
typename Source::value_type;
// Destroying the state unsubscribes `lifetime`.
41 ~co_observable_iterator_state() {
42 lifetime.unsubscribe();
// Copies the source into member `o`.
// NOTE(review): the declaration of `o` is not visible in this listing.
44 explicit co_observable_iterator_state(
const Source& o) : o(o) {}
// Handle of the coroutine currently waiting on this state. It is written by
// the await_suspend() functions and reset to nullptr by the callbacks.
46 coroutine_handle<> caller{};
// Subscription handle: unsubscribed by the destructor and queried with
// is_subscribed() by the iterator comparison and the increment awaiter.
47 composite_subscription lifetime{};
// Address of the value most recently passed to the value callback, or nullptr
// before any value has been stored. The pointee is the callback's argument.
// NOTE(review): presumably only valid while that callback is on the stack - confirm.
48 const value_type* value{
nullptr};
// Pending error; both await_resume() functions rethrow it when it is non-null.
// NOTE(review): the statement that assigns it is not visible in this listing.
49 exception_ptr
error{
nullptr};
// Awaiter returned by co_observable_iterator::operator++().
// NOTE(review): an await_ready() member is presumably defined here too, but it
// is not visible in this listing.
53 template<
typename Source>
54 struct co_observable_inc_awaiter
// If the subscription has already ended, returns false, so the awaiting
// coroutine is not suspended and continues straight into await_resume().
// Otherwise records `handle` in state->caller.
// NOTE(review): the return statement for the still-subscribed path is not
// visible in this listing.
60 bool await_suspend(coroutine_handle<> handle) {
61 if (!state->lifetime.is_subscribed()) {
return false;}
62 state->caller = handle;
// Declared here and defined after co_observable_iterator, because the return
// type must be complete at the point of definition.
66 co_observable_iterator<Source> await_resume();
// State shared with the iterator that created this awaiter.
68 shared_ptr<co_observable_iterator_state<Source>> state;
// Input-iterator-style handle over the values of a Source.
// Advancing it yields an awaiter rather than the iterator itself, so the
// increment has to be awaited.
71 template<
typename Source>
72 struct co_observable_iterator :
public iterator<input_iterator_tag, typename Source::value_type>
// Element type delivered by the source.
74 using value_type =
typename Source::value_type;
// Default construction leaves `state` empty. operator== treats such an
// iterator on its right-hand side as the end marker.
76 co_observable_iterator() {}
// Creates a fresh shared state holding a copy of the source `o`.
78 explicit co_observable_iterator(
const Source& o) : state(make_shared<co_observable_iterator_state<Source>>(o)) {}
// Shares an already existing state.
79 explicit co_observable_iterator(
const shared_ptr<co_observable_iterator_state<Source>>& o) : state(o) {}
// Only the move operations are declared, which suppresses the implicit copy
// operations: the iterator is move-only.
81 co_observable_iterator(co_observable_iterator&&)=
default;
82 co_observable_iterator& operator=(co_observable_iterator&&)=
default;
// Pre-increment: returns an awaiter that shares this iterator's state.
84 co_observable_inc_awaiter<Source> operator++()
86 return co_observable_inc_awaiter<Source>{state};
// Post-increment is deliberately unavailable.
89 co_observable_iterator& operator++(
int) =
delete;
// True only when this iterator has a state, `rhs` has none, and this
// iterator's subscription is no longer subscribed.
// The test is one-sided: with the operands swapped it yields false.
92 bool operator==(co_observable_iterator
const &rhs)
const 94 return !!state && !rhs.state && !state->lifetime.is_subscribed();
// Negation of operator==.
97 bool operator!=(co_observable_iterator
const &rhs)
const 99 return !(*
this == rhs);
// Returns the value that state->value points to.
// No null check is performed on `state` or on `state->value`.
102 value_type
const &operator*()
const 104 return *(state->value);
// Address of the value returned by operator*().
107 value_type
const *operator->()
const 109 return std::addressof(
operator*());
// Shared state; empty for a default-constructed iterator.
112 shared_ptr<co_observable_iterator_state<Source>> state;
// Result of awaiting an increment: rethrows the error stored in the shared
// state if there is one, otherwise returns a new iterator that shares the
// same state.
115 template<
typename Source>
116 co_observable_iterator<Source> co_observable_inc_awaiter<Source>::await_resume() {
117 if (!!state->error) {rethrow_exception(state->error);}
118 return co_observable_iterator<Source>{state};
// Awaiter that owns a co_observable_iterator for a Source and hands it to the
// awaiting coroutine from await_resume().
// NOTE(review): several lines of this definition (the await_ready() member,
// the start of the pipeline in await_suspend(), the statements following each
// `st->caller = nullptr`, and the declaration of member `it`) are not visible
// in this listing.
121 template<
typename Source>
122 struct co_observable_iterator_awaiter
124 using iterator=co_observable_iterator<Source>;
125 using value_type=
typename iterator::value_type;
// Builds the owned iterator, and with it a fresh shared state, from `o`.
127 explicit co_observable_iterator_awaiter(
const Source& o) : it(o) {
// Records `handle` as the waiting coroutine and installs the callbacks.
// The callbacks capture only a weak_ptr to the state, so they do not keep the
// state alive.
134 void await_suspend(coroutine_handle<> handle) {
135 weak_ptr<co_observable_iterator_state<Source>> wst=it.state;
136 it.state->caller = handle;
// Body of a callback whose header is not visible in this listing. It acts only
// when the state is still alive and a caller is pending: it takes a copy of
// the handle and clears st->caller.
// NOTE(review): presumably the copied handle is resumed next - confirm.
139 auto st = wst.lock();
140 if (st && !!st->caller) {
141 auto caller = st->caller;
142 st->caller =
nullptr;
146 rxo::subscribe<value_type>(
// Value callback: terminates the program if the state is gone or no coroutine
// is waiting. Otherwise it stores the address of `v`, takes a copy of the
// handle and clears st->caller.
149 [wst](
const value_type& v){
150 auto st = wst.lock();
151 if (!st || !st->caller) {terminate();}
152 st->value = addressof(v);
153 auto caller = st->caller;
154 st->caller =
nullptr;
// Error callback: applies the same liveness check, then takes a copy of the
// handle and clears st->caller.
// NOTE(review): the statement that stores `e` into the state is not visible
// in this listing.
158 [wst](exception_ptr e){
159 auto st = wst.lock();
160 if (!st || !st->caller) {terminate();}
162 auto caller = st->caller;
163 st->caller =
nullptr;
// Rethrows the stored error if there is one; otherwise moves the owned
// iterator out, leaving `it` in a moved-from state.
168 iterator await_resume() {
169 if (!!it.state->error) {rethrow_exception(it.state->error);}
170 return std::move(it);
// Free function template over rxcpp::observable<T, SourceOperator> that
// returns a co_observable_iterator_awaiter constructed from `o`.
// NOTE(review): the declarator line (function name and parameter list) is
// missing from this listing; presumably this is `begin` - confirm.
182 template<
typename T,
typename SourceOperator>
184 -> rxcpp::coroutine::co_observable_iterator_awaiter<rxcpp::observable<T, SourceOperator>> {
185 return rxcpp::coroutine::co_observable_iterator_awaiter<rxcpp::observable<T, SourceOperator>>{o};
// Free function template over rxcpp::observable<T, SourceOperator> that
// returns a default-constructed co_observable_iterator, i.e. one without
// shared state, which the iterator's operator== recognises as the end marker.
// NOTE(review): the declarator line (function name and parameter list) is
// missing from this listing; presumably this is `end` - confirm.
188 template<
typename T,
typename SourceOperator>
190 -> rxcpp::coroutine::co_observable_iterator<rxcpp::observable<T, SourceOperator>> {
191 return rxcpp::coroutine::co_observable_iterator<rxcpp::observable<T, SourceOperator>>{};
Definition: rx-all.hpp:26
auto error(E e) -> decltype(detail::make_error< T >(typename std::conditional< std::is_same< std::exception_ptr, rxu::decay_t< E >>::value, detail::throw_ptr_tag, detail::throw_instance_tag >::type(), std::move(e), identity_immediate()))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-error.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
bool operator!=(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:103
bool operator==(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:99
Add a new action at the end of the new observable that is returned.
auto finally(AN &&...an) -> operator_factory< finally_tag
Add a new action at the end of the new observable that is returned.