RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
Public Types | Public Member Functions | Public Attributes | List of all members
rxcpp::blocking_observable< T, Observable > Class Template Reference

a source of values whose methods block until all values have been emitted. subscribe or use one of the operator methods that reduce the values emitted to a single value. More...

#include <rx-observable.hpp>

Public Types

typedef rxu::decay_t< Observable > observable_type
 

Public Member Functions

 ~blocking_observable ()
 
 blocking_observable (observable_type s)
 
template<class... ArgN>
auto subscribe (ArgN &&...an) const -> void
 
template<class... ArgN>
auto subscribe_with_rethrow (ArgN &&...an) const -> void
 
template<class... AN>
auto first (AN **...) -> delayed_type_t< T, AN... > const
 
template<class... AN>
auto last (AN **...) -> delayed_type_t< T, AN... > const
 
int count () const
 
sum () const
 
double average () const
 
max () const
 
min () const
 

Public Attributes

observable_type source
 

Detailed Description

template<class T, class Observable>
class rxcpp::blocking_observable< T, Observable >

a source of values whose methods block until all values have been emitted. subscribe or use one of the operator methods that reduce the values emitted to a single value.

Member Typedef Documentation

template<class T, class Observable>
typedef rxu::decay_t<Observable> rxcpp::blocking_observable< T, Observable >::observable_type

Constructor & Destructor Documentation

template<class T, class Observable>
rxcpp::blocking_observable< T, Observable >::~blocking_observable ( )
inline
template<class T, class Observable>
rxcpp::blocking_observable< T, Observable >::blocking_observable ( observable_type  s)
inline

Member Function Documentation

template<class T, class Observable>
double rxcpp::blocking_observable< T, Observable >::average ( ) const
inline

Return the average value of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.

Returns
The average value of all items emitted by this blocking_observable.
Sample Code
When the source observable emits at least one item:
auto values = rxcpp::observable<>::range(1, 4).as_blocking();
auto average = values.average();
printf("average = %lf\n", average);
average = 2.500000
When the source observable is empty:
auto values = rxcpp::observable<>::empty<int>().as_blocking();
try {
auto average = values.average();
printf("average = %lf\n", average);
} catch (const rxcpp::empty_error& ex) {
printf("Exception: %s\n", ex.what());
}
Exception: average() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 4).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
try {
auto average = values.average();
printf("average = %lf\n", average);
} catch (const std::exception& ex) {
printf("Exception: %s\n", ex.what());
}
Exception: Error from source
template<class T, class Observable>
int rxcpp::blocking_observable< T, Observable >::count ( ) const
inline

Return the total number of items emitted by this blocking_observable.

Returns
The total number of items emitted by this blocking_observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).as_blocking();
auto count = values.count();
printf("count = %d\n", count);
count = 3
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
try {
auto count = values.count();
printf("count = %d\n", count);
} catch (const std::exception& ex) {
printf("Exception: %s\n", ex.what());
}
Exception: Error from source
template<class T, class Observable>
template<class... AN>
auto rxcpp::blocking_observable< T, Observable >::first ( AN **  ...) -> delayed_type_t<T, AN...> const
inline

Return the first item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.

Returns
The first item emitted by this blocking_observable.
Note
If the source observable calls on_error, the raised exception is rethrown by this method.
Sample Code
When the source observable emits at least one item:
auto values = rxcpp::observable<>::range(1, 3).as_blocking();
auto first = values.first();
printf("first = %d\n", first);
first = 1
When the source observable is empty:
auto values = rxcpp::observable<>::empty<int>().as_blocking();
try {
auto first = values.first();
printf("first = %d\n", first);
} catch (const rxcpp::empty_error& ex) {
printf("Exception: %s\n", ex.what());
}
Exception: first() requires a stream with at least one value
template<class T, class Observable>
template<class... AN>
auto rxcpp::blocking_observable< T, Observable >::last ( AN **  ...) -> delayed_type_t<T, AN...> const
inline

Return the last item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.

Returns
The last item emitted by this blocking_observable.
Note
If the source observable calls on_error, the raised exception is rethrown by this method.
Sample Code
When the source observable emits at least one item:
auto values = rxcpp::observable<>::range(1, 3).as_blocking();
auto last = values.last();
printf("last = %d\n", last);
last = 3
When the source observable is empty:
auto values = rxcpp::observable<>::empty<int>().as_blocking();
try {
auto last = values.last();
printf("last = %d\n", last);
} catch (const rxcpp::empty_error& ex) {
printf("Exception: %s\n", ex.what());
}
Exception: last() requires a stream with at least one value
template<class T, class Observable>
T rxcpp::blocking_observable< T, Observable >::max ( ) const
inline

Return the max of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.

Returns
The max of all items emitted by this blocking_observable.
Sample Code
When the source observable emits at least one item:
auto values = rxcpp::observable<>::range(1, 4).as_blocking();
auto max = values.max();
printf("max = %d\n", max);
max = 4
When the source observable is empty:
auto values = rxcpp::observable<>::empty<int>().as_blocking();
try {
auto max = values.max();
printf("max = %d\n", max);
} catch (const rxcpp::empty_error& ex) {
printf("Exception: %s\n", ex.what());
}
Exception: max() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 4).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
try {
auto max = values.max();
printf("max = %d\n", max);
} catch (const std::exception& ex) {
printf("Exception: %s\n", ex.what());
}
Exception: Error from source
template<class T, class Observable>
T rxcpp::blocking_observable< T, Observable >::min ( ) const
inline

Return the min of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.

Returns
The min of all items emitted by this blocking_observable.
Sample Code
When the source observable emits at least one item:
auto values = rxcpp::observable<>::range(1, 4).as_blocking();
auto min = values.min();
printf("min = %d\n", min);
min = 1
When the source observable is empty:
auto values = rxcpp::observable<>::empty<int>().as_blocking();
try {
auto min = values.min();
printf("min = %d\n", min);
} catch (const rxcpp::empty_error& ex) {
printf("Exception: %s\n", ex.what());
}
Exception: min() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 4).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
try {
auto min = values.min();
printf("min = %d\n", min);
} catch (const std::exception& ex) {
printf("Exception: %s\n", ex.what());
}
Exception: Error from source
template<class T, class Observable>
template<class... ArgN>
auto rxcpp::blocking_observable< T, Observable >::subscribe ( ArgN &&...  an) const -> void
inline

subscribe will cause this observable to emit values to the provided subscriber.

Returns
void
Parameters
an...- the arguments are passed to make_subscriber().

callers must provide enough arguments to make a subscriber. overrides are supported. thus subscribe(thesubscriber, composite_subscription()) will take thesubscriber.get_observer() and the provided subscription and subscribe to the new subscriber. the on_next, on_error, on_completed methods can be supplied instead of an observer if a subscription or subscriber is not provided then a new subscription will be created.

template<class T, class Observable>
template<class... ArgN>
auto rxcpp::blocking_observable< T, Observable >::subscribe_with_rethrow ( ArgN &&...  an) const -> void
inline

subscribe_with_rethrow will cause this observable to emit values to the provided subscriber.

Note
If the source observable calls on_error, the raised exception is rethrown by this method.
If the source observable calls on_error, the on_error method on the subscriber will not be called.
Returns
void
Parameters
an...- the arguments are passed to make_subscriber().

callers must provide enough arguments to make a subscriber. overrides are supported. thus subscribe(thesubscriber, composite_subscription()) will take thesubscriber.get_observer() and the provided subscription and subscribe to the new subscriber. the on_next, on_error, on_completed methods can be supplied instead of an observer if a subscription or subscriber is not provided then a new subscription will be created.

template<class T, class Observable>
T rxcpp::blocking_observable< T, Observable >::sum ( ) const
inline

Return the sum of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.

Returns
The sum of all items emitted by this blocking_observable.
Sample Code
When the source observable emits at least one item:
auto values = rxcpp::observable<>::range(1, 3).as_blocking();
auto sum = values.sum();
printf("sum = %d\n", sum);
sum = 6
When the source observable is empty:
auto values = rxcpp::observable<>::empty<int>().as_blocking();
try {
auto sum = values.sum();
printf("sum = %d\n", sum);
} catch (const rxcpp::empty_error& ex) {
printf("Exception: %s\n", ex.what());
}
Exception: sum() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
try {
auto sum = values.sum();
printf("sum = %d\n", sum);
} catch (const std::exception& ex) {
printf("Exception: %s\n", ex.what());
}
Exception: Error from source

Member Data Documentation

template<class T, class Observable>
observable_type rxcpp::blocking_observable< T, Observable >::source

The documentation for this class was generated from the following file: