5 #if !defined(RXCPP_RX_OBSERVABLE_HPP) 6 #define RXCPP_RX_OBSERVABLE_HPP 11 #define EXPLICIT_THIS this-> 20 template<
class Subscriber,
class T>
21 struct has_on_subscribe_for
24 template<
class CS,
class CT>
25 static auto check(
int) -> decltype((*(CT*)
nullptr).on_subscribe(*(CS*)
nullptr));
26 template<
class CS,
class CT>
27 static not_void check(...);
29 typedef decltype(check<rxu::decay_t<Subscriber>, T>(0)) detail_result;
30 static const bool value = std::is_same<detail_result, void>::value;
40 :
public std::enable_shared_from_this<state_type>
42 typedef std::function<void(subscriber<T>)> onsubscribe_type;
44 onsubscribe_type on_subscribe;
46 std::shared_ptr<state_type> state;
55 so.on_subscribe(std::move(o));
59 struct tag_function {};
61 void construct(F&& f, tag_function&&) {
62 state->on_subscribe = std::forward<F>(f);
75 : state(std::make_shared<state_type>())
77 construct(std::forward<SOF>(sof),
82 state->on_subscribe(std::move(o));
85 template<
class Subscriber>
86 typename std::enable_if<is_subscriber<Subscriber>::value,
void>::type
88 state->on_subscribe(o.as_dynamic());
94 return lhs.state == rhs.state;
101 template<
class T,
class Source>
107 template<
bool Selector,
class Default,
class SO>
108 struct resolve_observable;
110 template<
class Default,
class SO>
111 struct resolve_observable<true, Default, SO>
113 typedef typename SO::type type;
114 typedef typename type::value_type value_type;
115 static const bool value =
true;
117 template<
class...
AN>
118 static observable_type make(
const Default&,
AN&&... an) {
119 return observable_type(type(std::forward<AN>(an)...));
122 template<
class Default,
class SO>
123 struct resolve_observable<false, Default, SO>
125 static const bool value =
false;
126 typedef Default observable_type;
127 template<
class...
AN>
128 static observable_type make(
const observable_type& that,
const AN&...) {
133 struct resolve_observable<true, void, SO>
135 typedef typename SO::type type;
136 typedef typename type::value_type value_type;
137 static const bool value =
true;
139 template<
class...
AN>
140 static observable_type make(
AN&&... an) {
141 return observable_type(type(std::forward<AN>(an)...));
145 struct resolve_observable<false, void, SO>
147 static const bool value =
false;
148 typedef void observable_type;
149 template<
class...
AN>
150 static observable_type make(
const AN&...) {
156 template<
class Selector,
class Default,
template<
class... TN>
class SO, class... AN>
158 :
public detail::resolve_observable<Selector::value, Default, rxu::defer_type<SO, AN...>>
168 template<
class T,
class Observable>
171 template<
class Obsvbl,
class... ArgN>
172 static auto blocking_subscribe(
const Obsvbl& source,
bool do_rethrow, ArgN&&... an)
175 std::condition_variable wake;
176 std::exception_ptr
error;
182 if (!disposed || !wakened) std::terminate();
191 std::atomic_bool disposed;
192 std::atomic_bool wakened;
193 std::atomic_int false_wakes;
194 std::atomic_int true_wakes;
196 auto track = std::make_shared<tracking>();
198 auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
201 auto scbr = make_subscriber<T>(
203 [&](T t){dest.on_next(t);},
204 [&](std::exception_ptr e){
211 [&](){dest.on_completed();}
214 auto cs = scbr.get_subscription();
221 track->disposed =
true;
224 std::unique_lock<std::mutex> guard(lock);
225 source.subscribe(std::move(scbr));
233 while (!track->disposed) {
234 ++track->false_wakes;
239 track->wakened =
true;
240 if (!track->disposed || !track->wakened) std::terminate();
242 if (error) {std::rethrow_exception(error);}
268 template<
class... ArgN>
271 return blocking_subscribe(source,
false, std::forward<ArgN>(an)...);
293 template<
class... ArgN>
296 return blocking_subscribe(source,
true, std::forward<ArgN>(an)...);
314 template<
class... AN>
316 rxu::maybe<T> result;
318 subscribe_with_rethrow(
324 static_assert(
sizeof...(AN) == 0,
"first() was passed too many arguments.");
342 template<
class... AN>
344 rxu::maybe<T> result;
345 subscribe_with_rethrow(
346 [&](T v){result.reset(v);});
350 static_assert(
sizeof...(AN) == 0,
"last() was passed too many arguments.");
367 source.count().as_blocking().subscribe_with_rethrow(
368 [&](
int v){result = v;});
390 return source.sum().as_blocking().last();
411 return source.average().as_blocking().last();
432 return source.max().as_blocking().last();
453 return source.min().as_blocking().last();
459 template<
class SourceOperator,
class Subscriber>
460 struct safe_subscriber
462 safe_subscriber(SourceOperator& so, Subscriber& o) : so(std::addressof(so)), o(std::addressof(o)) {}
466 so->on_subscribe(*o);
469 if (!o->is_subscribed()) {
472 o->on_error(std::current_exception());
509 template<
class T,
class SourceOperator>
513 static_assert(std::is_same<T, typename SourceOperator::value_type>::value,
"SourceOperator::value_type must be the same as T in observable<T, SourceOperator>");
523 template<
class U,
class SO>
526 template<
class U,
class SO>
529 template<
class Subscriber>
530 auto detail_subscribe(Subscriber o)
const 536 static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value,
"the value types in the sequence must match or be convertible");
537 static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value,
"inner must have on_subscribe method that accepts this subscriber ");
541 if (!o.is_subscribed()) {
543 return o.get_subscription();
546 detail::safe_subscriber<source_operator_type, subscriber_type>
subscriber(source_operator, o);
549 if (rxsc::current_thread::is_schedule_required()) {
551 sc.create_worker(o.get_subscription()).schedule(
subscriber);
558 return o.get_subscription();
579 : source_operator(std::move(o))
586 : source_operator(o.source_operator)
591 : source_operator(std::move(o.source_operator))
597 source_operator.on_subscribe(o);
603 template<
class... AN>
606 static_assert(
sizeof...(AN) == 0,
"as_dynamic() was passed too many arguments.");
611 template<
class... AN>
614 static_assert(
sizeof...(AN) == 0,
"as_blocking() was passed too many arguments.");
624 template<
class OperatorFactory>
625 auto op(OperatorFactory&& of)
const 626 -> decltype(of(*(
const this_type*)
nullptr)) {
633 template<
class ResultType,
class Operator>
634 auto lift(Operator&& op)
const 637 rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
638 static_assert(detail::is_lift_function_for<T,
subscriber<ResultType>, Operator>::value,
"Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)");
646 template<
class ResultType,
class Operator>
647 auto lift_if(Operator&& op)
const 648 ->
typename std::enable_if<detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
650 return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
651 rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
658 template<
class ResultType,
class Operator>
659 auto lift_if(Operator&&)
const 660 ->
typename std::enable_if<!detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
661 decltype(rxs::from<ResultType>())>::type {
662 return rxs::from<ResultType>();
668 template<
class... ArgN>
671 return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...));
676 template<
class... AN>
677 auto all(AN&&... an) const
687 template<
class... AN>
698 template<
class... AN>
699 auto any(AN&&... an) const
709 template<
class... AN>
720 template<
class... AN>
731 template<
class... AN>
742 template<
class... AN>
753 template<
class... AN>
764 template<
class... AN>
775 template<
class... AN>
776 auto tap(AN&&... an) const
786 template<
class... AN>
797 template<
class... AN>
808 template<
class... AN>
819 template<
class... AN>
820 auto finally(AN&&... an)
const 830 template<
class... AN>
841 template<
class... AN>
852 template<
class... AN>
853 auto map(AN&&... an) const
863 template<
class... AN>
874 template<
class... AN>
885 template<
class... AN>
896 template<
class... AN>
907 template<
class... AN>
918 template<
class... AN>
929 template<
class... AN>
940 template<
class... AN>
951 template<
class... AN>
962 template<
class... AN>
973 template<
class... AN>
984 template<
class... AN>
995 template<
class... AN>
1006 template<
class... AN>
1017 template<
class... AN>
1028 template<
class... AN>
1039 template<
class... AN>
1050 template<
class... AN>
1061 template<
class... AN>
1072 template<
class... AN>
1083 template<
class... AN>
1094 template<
class... AN>
1106 template<
class... AN>
1117 template<
class... AN>
1128 template<
class... AN>
1139 template<
class... AN>
1150 template<
class... AN>
1161 template<
class... AN>
1172 template<
class... AN>
1183 template<
class... AN>
1194 template<
class... AN>
1205 template<
class... AN>
1216 template<
class... AN>
1227 template<
class... AN>
1238 template<
class... AN>
1245 static_assert(
sizeof...(AN) == 0,
"first() was passed too many arguments.");
1250 template<
class... AN>
1257 static_assert(
sizeof...(AN) == 0,
"last() was passed too many arguments.");
1262 template<
class... AN>
1269 static_assert(
sizeof...(AN) == 0,
"count() was passed too many arguments.");
1274 template<
class... AN>
1281 static_assert(
sizeof...(AN) == 0,
"sum() was passed too many arguments.");
1286 template<
class... AN>
1293 static_assert(
sizeof...(AN) == 0,
"average() was passed too many arguments.");
1298 template<
class... AN>
1305 static_assert(
sizeof...(AN) == 0,
"max() was passed too many arguments.");
1310 template<
class... AN>
1317 static_assert(
sizeof...(AN) == 0,
"min() was passed too many arguments.");
1322 template<
class... AN>
1333 template<
class... AN>
1344 template<
class... AN>
1355 template<
class... AN>
1366 template<
class... AN>
1377 template<
class... AN>
1388 template<
class... AN>
1399 template<
class... AN>
1410 template<
class... AN>
1421 template<
class... AN>
1432 template<
class... AN>
1443 template<
class... AN>
1454 template<
class... AN>
1464 template<
class T,
class SourceOperator>
1468 template<
class T,
class SourceOperator>
1470 return !(lhs == rhs);
1562 template<
class T,
class OnSubscribe>
1564 -> decltype(rxs::create<T>(std::move(os))) {
1565 return rxs::create<T>(std::move(os));
1577 template<
class T,
class Coordination>
1579 -> decltype(rxs::range<T>(
first,
last, step, std::move(cn))) {
1580 return rxs::range<T>(
first,
last, step, std::move(cn));
1584 template<
class T,
class Coordination>
1586 -> decltype(rxs::range<T>(
first,
last, std::move(cn))) {
1587 return rxs::range<T>(
first,
last, std::move(cn));
1591 template<
class T,
class Coordination>
1593 -> decltype(rxs::range<T>(
first, std::move(cn))) {
1594 return rxs::range<T>(
first, std::move(cn));
1601 -> decltype(rxs::never<T>()) {
1602 return rxs::never<T>();
1607 template<
class ObservableFactory>
1615 template<
class... AN>
1616 static auto interval(rxsc::scheduler::clock_type::duration period, AN**...)
1619 static_assert(
sizeof...(AN) == 0,
"interval(period) was passed too many arguments.");
1623 template<
class Coordination>
1624 static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
1630 template<
class... AN>
1631 static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN**...)
1634 static_assert(
sizeof...(AN) == 0,
"interval(initial, period) was passed too many arguments.");
1638 template<
class Coordination>
1639 static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn)
1640 -> decltype(
rxs::interval(initial, period, std::move(cn))) {
1646 template<
class... AN>
1647 static auto timer(rxsc::scheduler::clock_type::time_point at, AN**...)
1650 static_assert(
sizeof...(AN) == 0,
"timer(at) was passed too many arguments.");
1654 template<
class... AN>
1655 static auto timer(rxsc::scheduler::clock_type::duration after, AN**...)
1658 static_assert(
sizeof...(AN) == 0,
"timer(after) was passed too many arguments.");
1662 template<
class Coordination>
1663 static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn)
1664 -> decltype(
rxs::timer(when, std::move(cn))) {
1669 template<
class Coordination>
1670 static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn)
1671 -> decltype(
rxs::timer(when, std::move(cn))) {
1677 template<
class Collection>
1684 template<
class Collection,
class Coordination>
1685 static auto iterate(Collection c, Coordination cn)
1686 -> decltype(
rxs::iterate(std::move(c), std::move(cn))) {
1694 -> decltype( rxs::from<T>()) {
1695 return rxs::from<T>();
1699 template<
class T,
class Coordination>
1701 ->
typename std::enable_if<is_coordination<Coordination>::value,
1702 decltype( rxs::from<T>(std::move(cn)))>::type {
1703 return rxs::from<T>(std::move(cn));
1707 template<
class Value0,
class... ValueN>
1708 static auto from(Value0 v0, ValueN... vn)
1709 ->
typename std::enable_if<!is_coordination<Value0>::value,
1710 decltype(
rxs::from(v0, vn...))>::type {
1715 template<
class Coordination,
class Value0,
class... ValueN>
1716 static auto from(Coordination cn, Value0 v0, ValueN... vn)
1717 ->
typename std::enable_if<is_coordination<Coordination>::value,
1718 decltype(
rxs::from(std::move(cn), v0, vn...))>::type {
1719 return rxs::from(std::move(cn), v0, vn...);
1731 template<
class T,
class Coordination>
1732 static auto just(T v, Coordination cn)
1733 -> decltype(
rxs::just(std::move(v), std::move(cn))) {
1734 return rxs::just(std::move(v), std::move(cn));
1739 template<
class Observable,
class Value0,
class... ValueN>
1741 -> decltype(
rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) {
1742 return rxs::start_with(std::move(o), std::move(v0), std::move(vn)...);
1749 -> decltype(from<T>()) {
1754 template<
class T,
class Coordination>
1756 -> decltype(from<T>(std::move(cn))) {
1757 return from<T>(std::move(cn));
1762 template<
class T,
class Exception>
1764 -> decltype(rxs::error<T>(std::forward<Exception>(e))) {
1765 return rxs::error<T>(std::forward<Exception>(e));
1769 template<
class T,
class Exception,
class Coordination>
1770 static auto error(Exception&& e, Coordination cn)
1771 -> decltype(rxs::error<T>(std::forward<Exception>(e), std::move(cn))) {
1772 return rxs::error<T>(std::forward<Exception>(e), std::move(cn));
1777 template<
class ResourceFactory,
class ObservableFactory>
1778 static auto scope(ResourceFactory rf, ObservableFactory of)
1779 -> decltype(
rxs::scope(std::move(rf), std::move(of))) {
1780 return rxs::scope(std::move(rf), std::move(of));
1790 template<
class T,
class SourceOperator,
class OperatorFactory>
1792 -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1793 return source.op(std::forward<OperatorFactory>(of));
1800 template<
class T,
class SourceOperator,
class OperatorFactory>
1802 -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1803 return source.op(std::forward<OperatorFactory>(of));
Definition: rx-operators.hpp:126
auto subscribe(ArgN &&...an) const -> composite_subscription
Subscribe will cause the source observable to emit values to the provided subscriber.
Definition: rx-observable.hpp:669
Definition: rx-operators.hpp:360
auto take_until(AN &&...an) const
For each item from this observable until on_next occurs on the trigger observable or until the specif...
Definition: rx-observable.hpp:1400
auto pairwise(AN...an) const
Take values pairwise from this observable.
Definition: rx-observable.hpp:1455
auto map(AN &&...an) const
For each item from this observable use Selector to produce an item to emit from the new observable th...
Definition: rx-observable.hpp:853
Definition: rx-operators.hpp:248
auto switch_if_empty(AN &&...an) const
If the source Observable terminates without emitting any items, emits items from a backup Observable...
Definition: rx-observable.hpp:743
Definition: rx-operators.hpp:143
auto buffer(AN &&...an) const
Return an observable that emits connected, non-overlapping buffer, each containing at most count item...
Definition: rx-observable.hpp:974
Definition: rx-observable.hpp:157
Definition: rx-operators.hpp:374
Definition: rx-operators.hpp:445
auto timestamp(AN &&...an) const
Returns an observable that attaches a timestamp to each item emitted by the source observable indicat...
Definition: rx-observable.hpp:809
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
auto min(AN **...) const
For each item from this observable reduce it by taking the min value of the previous items...
Definition: rx-observable.hpp:1311
auto operator|(const rxcpp::observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-observable.hpp:1801
static auto create(OnSubscribe os) -> decltype(rxs::create< T >(std::move(os)))
Returns an observable that executes the specified function when a subscriber subscribes to it...
Definition: rx-observable.hpp:1563
Definition: rx-all.hpp:26
Definition: rx-predef.hpp:302
auto max(AN **...) const
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-observable.hpp:1299
static auto range(T first, T last, Coordination cn) -> decltype(rxs::range< T >(first, last, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1585
auto debounce(AN &&...an) const
Return an observable that emits an item if a particular timespan has passed without emitting another ...
Definition: rx-observable.hpp:875
observable(const observable< T, SO > &o)
implicit conversion between observables of the same value_type
Definition: rx-observable.hpp:585
Definition: rx-operators.hpp:323
auto publish_synchronized(AN &&...an) const
Turn a cold observable hot and allow connections to the source to be independent of subscriptions...
Definition: rx-observable.hpp:1173
source_operator_type source_operator
Definition: rx-observable.hpp:519
dynamic_observable()
Definition: rx-observable.hpp:69
Definition: rx-operators.hpp:431
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
Definition: rx-operators.hpp:157
auto flat_map(AN &&...an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1040
Definition: rx-operators.hpp:276
auto operator>>(const rxcpp::observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-observable.hpp:1791
rxu::decay_t< Observable > observable_type
Definition: rx-observable.hpp:246
auto element_at(AN &&...an) const
Pulls an item located at a specified index location in the sequence of items and emits that item as i...
Definition: rx-observable.hpp:919
T max() const
Definition: rx-observable.hpp:431
Definition: rx-operators.hpp:283
auto observe_on(AN &&...an) const
All values are queued and delivered using the scheduler from the supplied coordination.
Definition: rx-observable.hpp:1206
auto last(AN **...) const
For each item from this observable reduce it by sending only the last item.
Definition: rx-observable.hpp:1251
Definition: rx-operators.hpp:290
auto error(E e) -> decltype(detail::make_error< T >(typename std::conditional< std::is_same< std::exception_ptr, rxu::decay_t< E >>::value, detail::throw_ptr_tag, detail::throw_instance_tag >::type(), std::move(e), identity_immediate()))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-error.hpp:114
auto concat_transform(AN &&...an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1084
Definition: rx-operators.hpp:346
auto combine_latest(AN...an) const
For each item from all of the observables select a value to emit from the new observable that is retu...
Definition: rx-observable.hpp:1107
auto take_while(AN &&...an) const
For the first items fulfilling the predicate from this observable emit them from the new observable t...
Definition: rx-observable.hpp:1411
static auto iterate(Collection c, Coordination cn) -> decltype(rxs::iterate(std::move(c), std::move(cn)))
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-observable.hpp:1685
Definition: rx-operators.hpp:408
observable(const source_operator_type &o)
Definition: rx-observable.hpp:574
Definition: rx-operators.hpp:487
Definition: rx-operators.hpp:381
rxu::value_type_t< delayed_type< T, AN... >> delayed_type_t
Definition: rx-operators.hpp:60
auto skip_until(AN...an) const
Make new observable with items skipped until on_next occurs on the trigger observable or until the sp...
Definition: rx-observable.hpp:1367
Definition: rx-operators.hpp:459
auto skip_last(AN...an) const
Make new observable with skipped last count items from this observable.
Definition: rx-observable.hpp:1356
Definition: rx-operators.hpp:325
Definition: rx-operators.hpp:234
auto AN
Definition: rx-finally.hpp:105
auto max() -> operator_factory< max_tag >
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-reduce.hpp:496
auto iterate(Collection c) -> observable< rxu::value_type_t< detail::iterate_traits< Collection >>, detail::iterate< Collection, identity_one_worker >>
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-iterate.hpp:160
Definition: rx-operators.hpp:255
static auto iterate(Collection c) -> decltype(rxs::iterate(std::move(c), identity_current_thread()))
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-observable.hpp:1678
Definition: rx-operators.hpp:199
Definition: rx-predef.hpp:156
T value_type
Definition: rx-observable.hpp:562
Definition: rx-operators.hpp:508
auto distinct(AN &&...an) const
For each item from this observable, filter out repeated values and emit only items that have not alre...
Definition: rx-observable.hpp:897
Definition: rx-operators.hpp:339
rxu::decay_t< SourceOperator > source_operator_type
Definition: rx-observable.hpp:518
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
static auto interval(rxsc::scheduler::clock_type::duration period, AN **...) -> decltype(rxs::interval(period))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1616
auto window_with_time(AN &&...an) const
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-observable.hpp:941
auto skip(AN...an) const
Make new observable with skipped first count items from this observable.
Definition: rx-observable.hpp:1345
auto switch_on_error(AN &&...an) const
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition: rx-observable.hpp:842
a source of values whose methods block until all values have been emitted. subscribe or use one of th...
Definition: rx-observable.hpp:169
~blocking_observable()
Definition: rx-observable.hpp:248
static auto just(T v) -> decltype(rxs::just(std::move(v)))
Definition: rx-observable.hpp:1725
Definition: rx-operators.hpp:367
static auto defer(ObservableFactory of) -> decltype(rxs::defer(std::move(of)))
Returns an observable that calls the specified observable factory to create an observable for each ne...
Definition: rx-observable.hpp:1608
Definition: rx-operators.hpp:310
auto exists(AN &&...an) const
Returns an Observable that emits true if any item emitted by the source Observable satisfies a specif...
Definition: rx-observable.hpp:710
Definition: rx-sources.hpp:15
blocking_observable< T, this_type > as_blocking(AN **...) const
Definition: rx-observable.hpp:612
Definition: rx-operators.hpp:438
Definition: rx-operators.hpp:298
Definition: rx-operators.hpp:296
auto scan(AN...an) const
For each item from this observable use Accumulator to combine items into a value that will be emitted...
Definition: rx-observable.hpp:1323
auto tap(AN &&...an) const
inspect calls to on_next, on_error and on_completed.
Definition: rx-observable.hpp:776
auto start_with(AN &&...an) -> operator_factory< start_with_tag, AN... >
Start with the supplied values, then concatenate this observable.
Definition: rx-start_with.hpp:53
Definition: rx-operators.hpp:227
auto sum(AN **...) const
For each item from this observable reduce it by adding to the previous items.
Definition: rx-observable.hpp:1275
linq_driver< iter_cursor< typename util::container_traits< TContainer >::iterator > > from(TContainer &c)
Definition: linq.hpp:556
Definition: rx-util.hpp:404
auto last() -> operator_factory< last_tag >
For each item from this observable reduce it by sending only the last item.
Definition: rx-reduce.hpp:395
Definition: rx-observable.hpp:36
static auto timer(rxsc::scheduler::clock_type::time_point at, AN **...) -> decltype(rxs::timer(at))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1647
observable< T > make_observable_dynamic(Source &&s)
Definition: rx-observable.hpp:102
static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn)))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1663
~observable()
Definition: rx-observable.hpp:566
auto last(AN **...) -> delayed_type_t< T, AN... > const
Definition: rx-observable.hpp:343
Definition: rx-operators.hpp:241
Definition: rx-operators.hpp:117
auto repeat(AN...an) const
Repeat this observable for the given number of times or infinitely.
Definition: rx-observable.hpp:1422
Definition: rx-operators.hpp:213
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
Definition: rx-operators.hpp:417
Definition: rx-sources.hpp:23
Definition: rx-operators.hpp:38
std::enable_if< is_subscriber< Subscriber >::value, void >::type on_subscribe(Subscriber o) const
Definition: rx-observable.hpp:87
auto window_with_time_or_count(AN &&...an) const
Return an observable that emits connected, non-overlapping windows of items from the source observabl...
Definition: rx-observable.hpp:952
Definition: rx-operators.hpp:262
auto is_empty(AN &&...an) const
Returns an Observable that emits true if the source Observable is empty, otherwise false...
Definition: rx-observable.hpp:688
auto group_by(AN &&...an) const
Return an observable that emits grouped_observables, each of which corresponds to a unique key value ...
Definition: rx-observable.hpp:1129
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
bool operator!=(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:103
auto window_toggle(AN &&...an) const
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-observable.hpp:963
auto first() -> operator_factory< first_tag >
For each item from this observable reduce it by sending only the first item.
Definition: rx-reduce.hpp:378
Definition: rx-operators.hpp:129
Definition: rx-operators.hpp:424
auto sample_with_time(AN &&...an) const
Return an Observable that emits the most recent items emitted by the source Observable within periodi...
Definition: rx-observable.hpp:1334
Definition: rx-operators.hpp:353
auto first(AN **...) -> delayed_type_t< T, AN... > const
Definition: rx-observable.hpp:315
dynamic_observable(SOF &&sof, typename std::enable_if<!is_dynamic_observable< SOF >::value, void ** >::type=0)
Definition: rx-observable.hpp:74
Definition: rx-operators.hpp:466
Definition: rx-operators.hpp:136
auto transform(AN &&...an) const
For each item from this observable use Selector to produce an item to emit from the new observable th...
Definition: rx-observable.hpp:864
auto replay(AN &&...an) const
1) replay(optional Coordination, optional CompositeSubscription) Turn a cold observable hot...
Definition: rx-observable.hpp:1184
static auto start_with(Observable o, Value0 v0, ValueN...vn) -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...))
Definition: rx-observable.hpp:1740
Definition: rx-operators.hpp:206
Definition: rx-operators.hpp:297
void on_subscribe(subscriber< T > o) const
Definition: rx-observable.hpp:81
Definition: rx-operators.hpp:192
tag_dynamic_observable dynamic_observable_tag
Definition: rx-observable.hpp:67
Definition: rx-operators.hpp:103
static auto error(Exception &&e, Coordination cn) -> decltype(rxs::error< T >(std::forward< Exception >(e), std::move(cn)))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-observable.hpp:1770
static auto range(T first=0, T last=std::numeric_limits< T >::max(), std::ptrdiff_t step=1) -> decltype(rxs::range< T >(first, last, step, identity_current_thread()))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1571
static auto empty() -> decltype(from< T >())
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-observable.hpp:1748
auto ignore_elements(AN &&...an) const
Do not emit any items from the source Observable, but allow termination notification (either onError ...
Definition: rx-observable.hpp:1140
Definition: rx-operators.hpp:473
Definition: rx-operators.hpp:110
Definition: rx-operators.hpp:57
auto on_error_resume_next(AN &&...an) const
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition: rx-observable.hpp:831
auto distinct_until_changed(AN &&...an) const
For each item from this observable, filter out consecutively repeated values and emit only changes ...
Definition: rx-observable.hpp:908
void unsubscribe() const
Definition: rx-subscription.hpp:170
auto window(AN &&...an) const
Return an observable that emits connected, non-overlapping windows, each containing at most count ite...
Definition: rx-observable.hpp:930
bool operator==(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:99
Definition: rx-operators.hpp:301
Definition: rx-operators.hpp:220
auto average(AN **...) const
For each item from this observable reduce it by adding to the previous values and then dividing by th...
Definition: rx-observable.hpp:1287
Definition: rx-predef.hpp:270
Definition: rx-operators.hpp:185
auto buffer_with_time_or_count(AN &&...an) const
Return an observable that emits connected, non-overlapping buffers of items from the source observabl...
Definition: rx-observable.hpp:996
auto sequence_equal(AN...an) const
Determine whether two Observables emit the same sequence of items.
Definition: rx-observable.hpp:765
auto lift(Operator &&op) -> detail::lift_factory< ResultType, Operator >
Definition: rx-lift.hpp:101
static auto range(T first, Coordination cn) -> decltype(rxs::range< T >(first, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1592
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN **...) -> decltype(rxs::interval(initial, period))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1631
static auto error(Exception &&e) -> decltype(rxs::error< T >(std::forward< Exception >(e)))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-observable.hpp:1763
auto merge(AN...an) const
Subscribe to each of the given observables. For each item emitted from any of the given observables, deliver it from the new observable that is returned.
Definition: rx-observable.hpp:1018
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(initial, period, std::move(cn)))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1639
auto amb(AN...an) const
For each item from only the first of the given observables deliver from the new observable that is re...
Definition: rx-observable.hpp:1029
auto with_latest_from(AN...an) const
For each item from the first observable select the latest value from all the observables to emit from...
Definition: rx-observable.hpp:1095
observable_type source
Definition: rx-observable.hpp:247
T sum() const
Definition: rx-observable.hpp:389
static auto from(Coordination cn, Value0 v0, ValueN...vn) -> typename std::enable_if< is_coordination< Coordination >::value, decltype( rxs::from(std::move(cn), v0, vn...))>::type
Definition: rx-observable.hpp:1716
Definition: rx-operators.hpp:452
Definition: rx-operators.hpp:269
const scheduler & make_current_thread()
Definition: rx-currentthread.hpp:263
auto concat(AN...an) const
For each item from this observable, subscribe to one at a time, in the order received. For each item from all of the given observables, deliver it from the new observable that is returned.
Definition: rx-observable.hpp:1062
static auto from(Value0 v0, ValueN...vn) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype( rxs::from(v0, vn...))>::type
Definition: rx-observable.hpp:1708
auto defer(ObservableFactory of) -> observable< rxu::value_type_t< detail::defer_traits< ObservableFactory >>, detail::defer< ObservableFactory >>
Returns an observable that calls the specified observable factory to create an observable for each ne...
Definition: rx-defer.hpp:73
auto retry(AN...an) const
Retry this observable for the given number of times.
Definition: rx-observable.hpp:1433
auto switch_on_next(AN &&...an) const
Return observable that emits the items emitted by the observable most recently emitted by the source ...
Definition: rx-observable.hpp:1007
Definition: rx-operators.hpp:480
auto timer(TimePointOrDuration when) -> typename std::enable_if< detail::defer_timer< TimePointOrDuration, identity_one_worker >::value, typename detail::defer_timer< TimePointOrDuration, identity_one_worker >::observable_type >::type
Returns an observable that emits an integer at the specified time point.
Definition: rx-timer.hpp:114
Consumes values from an observable using State that may implement on_next, on_error and on_completed ...
Definition: rx-observer.hpp:179
Definition: rx-operators.hpp:150
double average() const
Definition: rx-observable.hpp:410
static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn)))
Returns an observable that emits an integer after the specified duration has elapsed.
Definition: rx-observable.hpp:1670
auto contains(AN &&...an) const
Returns an Observable that emits true if the source Observable emitted a specified item...
Definition: rx-observable.hpp:721
auto multicast(AN &&...an) const
Definition: rx-observable.hpp:1151
static auto from(Coordination cn) -> typename std::enable_if< is_coordination< Coordination >::value, decltype( rxs::from< T >(std::move(cn)))>::type
Definition: rx-observable.hpp:1700
Definition: rx-operators.hpp:332
auto scope(ResourceFactory rf, ObservableFactory of) -> observable< rxu::value_type_t< detail::scope_traits< ResourceFactory, ObservableFactory >>, detail::scope< ResourceFactory, ObservableFactory >>
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-scope.hpp:114
auto reduce(AN &&...an) const
For each item from this observable use Accumulator to combine items, when completed use ResultSelecto...
Definition: rx-observable.hpp:1217
auto take_last(AN &&...an) const
Emit only the final t items emitted by the source Observable.
Definition: rx-observable.hpp:1389
auto take(AN...an) const
For the first count items from this observable emit them from the new observable that is returned...
Definition: rx-observable.hpp:1378
auto start_with(AN...an) const
Start with the supplied values, then concatenate this observable.
Definition: rx-observable.hpp:1444
blocking_observable(observable_type s)
Definition: rx-observable.hpp:251
auto just(Value0 v0) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(iterate(*(std::array< Value0, 1 > *) nullptr, identity_immediate()))>::type
Definition: rx-iterate.hpp:267
auto all(AN &&...an) const
Returns an Observable that emits true if every item emitted by the source Observable satisfies a spec...
Definition: rx-observable.hpp:677
auto default_if_empty(AN &&...an) const
If the source Observable terminates without emitting any items, emits a default item and completes...
Definition: rx-observable.hpp:754
auto observable_member(Tag, AN &&...an) -> decltype(Overload::member(std::forward< AN >(an)...))
Definition: rx-operators.hpp:63
Definition: rx-operators.hpp:164
auto subscribe_with_rethrow(ArgN &&...an) const -> void
Definition: rx-observable.hpp:294
Definition: rx-operators.hpp:317
Definition: rx-operators.hpp:494
auto count(AN **...) const
For each item from this observable reduce it by incrementing a count.
Definition: rx-observable.hpp:1263
auto subscribe(ArgN &&...an) const -> void
Definition: rx-observable.hpp:269
static auto from() -> decltype( rxs::from< T >())
Definition: rx-observable.hpp:1693
Definition: rx-predef.hpp:128
auto time_interval(AN &&...an) const
Returns an observable that emits indications of the amount of time lapsed between consecutive emissio...
Definition: rx-observable.hpp:787
static auto scope(ResourceFactory rf, ObservableFactory of) -> decltype(rxs::scope(std::move(rf), std::move(of)))
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-observable.hpp:1778
Definition: rx-operators.hpp:410
auto buffer_with_time(AN &&...an) const
Return an observable that emits buffers every period time interval and collects items from this obser...
Definition: rx-observable.hpp:985
Binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
Definition: rx-operators.hpp:119
auto delay(AN &&...an) const
Return an observable that emits each item emitted by the source observable after the specified delay...
Definition: rx-observable.hpp:886
static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(period, std::move(cn)))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1624
Definition: rx-operators.hpp:300
static auto range(T first, T last, std::ptrdiff_t step, Coordination cn) -> decltype(rxs::range< T >(first, last, step, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1578
auto accumulate(AN &&...an) const
For each item from this observable use Accumulator to combine items, when completed use ResultSelecto...
Definition: rx-observable.hpp:1228
auto any(AN &&...an) const
Returns an Observable that emits true if any item emitted by the source Observable satisfies a specif...
Definition: rx-observable.hpp:699
Definition: rx-operators.hpp:178
static auto just(T v, Coordination cn) -> decltype(rxs::just(std::move(v), std::move(cn)))
Definition: rx-observable.hpp:1732
auto interval(Duration period) -> typename std::enable_if< detail::defer_interval< Duration, identity_one_worker >::value, typename detail::defer_interval< Duration, identity_one_worker >::observable_type >::type
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-interval.hpp:113
observable(observable< T, SO > &&o)
Implicit conversion between observables of the same value_type.
Definition: rx-observable.hpp:590
Definition: rx-scheduler.hpp:426
Definition: rx-predef.hpp:115
auto subscribe_on(AN &&...an) const
Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordi...
Definition: rx-observable.hpp:1195
auto timeout(AN &&...an) const
Return an observable that terminates with timeout_error if a particular timespan has passed without e...
Definition: rx-observable.hpp:798
Definition: rx-sources.hpp:17
auto zip(AN &&...an) const
Take one item at a time from each of the given observables and select a value to emit from the new observable that ...
Definition: rx-observable.hpp:1118
auto concat_map(AN &&...an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1073
auto merge_transform(AN &&...an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1051
observable()
Definition: rx-observable.hpp:570
Definition: rx-operators.hpp:299
static auto never() -> decltype(rxs::never< T >())
Definition: rx-observable.hpp:1600
auto first(AN **...) const
For each item from this observable reduce it by sending only the first item.
Definition: rx-observable.hpp:1239
T min() const
Definition: rx-observable.hpp:452
Definition: rx-predef.hpp:126
Definition: rx-operators.hpp:127
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
auto publish(AN &&...an) const
Turn a cold observable hot and allow connections to the source to be independent of subscriptions...
Definition: rx-observable.hpp:1162
auto filter(AN &&...an) const
For each item from this observable use Predicate to select which items to emit from the new observabl...
Definition: rx-observable.hpp:732
static auto empty(Coordination cn) -> decltype(from< T >(std::move(cn)))
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-observable.hpp:1755
Definition: rx-operators.hpp:402
auto subscribe(ArgN &&...an) -> detail::subscribe_factory< decltype(make_subscriber< T >(std::forward< ArgN >(an)...))>
Subscribe will cause the source observable to emit values to the provided subscriber.
Definition: rx-subscribe.hpp:87
observable< T > as_dynamic(AN **...) const
Definition: rx-observable.hpp:604
int count() const
Definition: rx-observable.hpp:365
Definition: rx-operators.hpp:395
observable(source_operator_type &&o)
Definition: rx-observable.hpp:578
Definition: rx-operators.hpp:388
static auto timer(rxsc::scheduler::clock_type::duration after, AN **...) -> decltype(rxs::timer(after))
Returns an observable that emits an integer after the specified duration has elapsed.
Definition: rx-observable.hpp:1655
Definition: rx-operators.hpp:501