RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-amb.hpp File Reference

Delivers, from the returned observable and on the specified scheduler, the items of only the first of the given observables to emit an item or terminate.

#include "../rx-includes.hpp"

Classes

struct  rxcpp::member_overload< amb_tag >
 

Namespaces

 rxcpp
 
 rxcpp::operators
 

Macros

#define RXCPP_OPERATORS_RX_AMB_HPP
 

Functions

template<class... AN>
auto rxcpp::operators::amb (AN &&...an) -> operator_factory< amb_tag, AN... >
 Delivers, from the returned observable and on the specified scheduler, the items of only the first of the given observables to emit an item or terminate.
 

Detailed Description

Delivers, from the returned observable and on the specified scheduler, the items of only the first of the given observables to emit an item or terminate.

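There are 2 variants of the operator:

    The source observable emits nested observables, and one of those nested observables is selected.
    The source observable and the arguments v0...vn together provide the observables to select from.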

Template Parameters
    Coordination    the type of the scheduler (optional).
    Value0          ... (optional).
    ValueN          types of source observables (optional).
Parameters
    cn    the scheduler to synchronize sources from different contexts (optional).
    v0    ... (optional).
    vn    source observables (optional).
Returns
Observable that emits the same sequence as whichever of the source observables first emitted an item or sent a termination notification.

If scheduler is omitted, identity_current_thread is used.
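
The selection rule described under Returns can be illustrated without RxCpp. The following is a minimal sketch using only the standard library; ToySource, first_signal, amb_model and amb_model_timers are hypothetical names invented for this illustration and are not part of the RxCpp API. It assumes each source is a pre-recorded list of timed items plus a completion time, and it ignores schedulers, errors and unsubscription entirely.

```cpp
#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical stand-in for an observable: not an RxCpp type.
struct ToySource {
    std::vector<std::pair<int, int>> items;  // (time in ms, value), sorted by time
    int completed_at;                        // time in ms of the completion notification
};

// Time of the first notification of any kind (item or completion).
int first_signal(const ToySource& s) {
    return s.items.empty() ? s.completed_at
                           : std::min(s.items.front().first, s.completed_at);
}

// Returns the values of the source that signals first.
// Ties go to the source listed first; all other sources are ignored.
std::vector<int> amb_model(const std::vector<ToySource>& sources) {
    std::vector<int> out;
    if (sources.empty()) return out;
    std::size_t winner = 0;
    for (std::size_t i = 1; i < sources.size(); ++i) {
        if (first_signal(sources[i]) < first_signal(sources[winner])) winner = i;
    }
    for (const auto& item : sources[winner].items) out.push_back(item.second);
    return out;
}

// Models the three timers used in the samples: each emits one value and completes.
std::vector<int> amb_model_timers() {
    ToySource o1{{std::make_pair(15, 1)}, 15};
    ToySource o2{{std::make_pair(10, 2)}, 10};
    ToySource o3{{std::make_pair(5, 3)}, 5};
    return amb_model({o1, o2, o3});
}
```

In this model amb_model_timers() returns a vector holding only 3, because the 5 ms source signals first. A source that completes before any other source emits also wins, in which case the result is empty.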

Sample Code
// Nested observables with a coordination: base emits o1, o2 and o3, and amb selects among them.
// get_pid() is a helper of the sample program (not shown) that returns the current thread id as a string.
printf("[thread %s] Start task\n", get_pid().c_str());
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {
    printf("[thread %s] Timer1 fired\n", get_pid().c_str());
    return 1;
});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
    printf("[thread %s] Timer2 fired\n", get_pid().c_str());
    return 2;
});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {
    printf("[thread %s] Timer3 fired\n", get_pid().c_str());
    return 3;
});
auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3);
auto values = base.amb(rxcpp::observe_on_new_thread());
values.
    as_blocking().
    subscribe(
        [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
        [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());

[thread 47481267428736] Start task
[thread 47481294776064] Timer3 fired
[thread 47481294776064] OnNext: 3
[thread 47481294776064] OnCompleted
[thread 47481267428736] Finish task

// Nested observables without a coordination: identity_current_thread is used.
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3);
auto values = base.amb();
values.
    subscribe(
        [](int v){printf("OnNext: %d\n", v);},
        [](){printf("OnCompleted\n");});

OnNext: 3
OnCompleted

// Source plus arguments: o1, o2 and o3 are the observables to select from.
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
auto values = o1.amb(o2, o3);
values.
    subscribe(
        [](int v){printf("OnNext: %d\n", v);},
        [](){printf("OnCompleted\n");});

OnNext: 3
OnCompleted

// Source plus arguments with a coordination as the first argument.
printf("[thread %s] Start task\n", get_pid().c_str());
auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {
    printf("[thread %s] Timer1 fired\n", get_pid().c_str());
    return 1;
});
auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
    printf("[thread %s] Timer2 fired\n", get_pid().c_str());
    return 2;
});
auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {
    printf("[thread %s] Timer3 fired\n", get_pid().c_str());
    return 3;
});
auto values = o1.amb(rxcpp::observe_on_new_thread(), o2, o3);
values.
    as_blocking().
    subscribe(
        [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
        [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());

[thread 47481267428736] Start task
[thread 47481294776064] Timer3 fired
[thread 47481294776064] OnNext: 3
[thread 47481294776064] OnCompleted
[thread 47481267428736] Finish task

Macro Definition Documentation

#define RXCPP_OPERATORS_RX_AMB_HPP