RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-merge.hpp File Reference

Subscribes to each given observable. Every item emitted by any of them is delivered through the single new observable that is returned.

#include "../rx-includes.hpp"

Classes

struct  rxcpp::member_overload< merge_tag >
 

Namespaces

 rxcpp
 
 rxcpp::operators
 

Macros

#define RXCPP_OPERATORS_RX_MERGE_HPP
 

Functions

template<class... AN>
auto rxcpp::operators::merge (AN &&...an) -> operator_factory< merge_tag, AN... >
 Subscribes to each given observable. Every item emitted by any of them is delivered through the single new observable that is returned.
 

Detailed Description

Subscribes to each given observable. Every item emitted by any of them is delivered through the single new observable that is returned.

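There are 2 variants of the operator:
- The source observable emits nested observables, and those nested observables are merged (base.merge() in the samples).
- The source observable and the arguments v0...vn provide the observables to merge (o1.merge(o2, o3) in the samples).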

Template Parameters
  Coordination    the type of the scheduler (optional).
  Value0          ... (optional).
  ValueN          types of source observables (optional).
Parameters
  cn    the scheduler to synchronize sources from different contexts (optional).
  v0    ... (optional).
  vn    source observables (optional).
Returns
  Observable that emits items that are the result of flattening the observables emitted by the source observable.

If the scheduler cn is omitted, identity_current_thread is used.
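The role of the coordination parameter can be pictured with a small model. The sketch below is not RxCpp code and makes no claim about how the library implements merge; `merged_sink` and `run_merged_sink_demo` are hypothetical names. It assumes only what the description and the sample output suggest: items from sources running in different contexts are funneled into one output, and completion is reported once, after every source has finished.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper, not part of RxCpp: funnels items from several
// producers into one consumer and signals completion exactly once.
class merged_sink {
public:
    merged_sink(std::size_t sources,
                std::function<void(int)> on_next,
                std::function<void()> on_completed)
        : pending_(sources),
          on_next_(std::move(on_next)),
          on_completed_(std::move(on_completed)) {}

    // May be called by any source from any thread; the lock serializes delivery.
    void next(int v) {
        std::lock_guard<std::mutex> guard(lock_);
        on_next_(v);
    }

    // Called once per source; only the last call completes the merged output.
    void source_completed() {
        std::lock_guard<std::mutex> guard(lock_);
        assert(pending_ > 0);  // more completions than sources is a usage error
        if (--pending_ == 0) {
            on_completed_();
        }
    }

private:
    std::mutex lock_;
    std::size_t pending_;
    std::function<void(int)> on_next_;
    std::function<void()> on_completed_;
};

// Runs three producer threads against one sink and returns what the
// consumer saw; -1 marks the completion signal.
std::vector<int> run_merged_sink_demo() {
    std::vector<int> seen;
    merged_sink sink(3,
        [&seen](int v) { seen.push_back(v); },
        [&seen]() { seen.push_back(-1); });

    std::vector<std::thread> producers;
    for (int id = 1; id <= 3; ++id) {
        producers.emplace_back([&sink, id]() {
            for (int i = 0; i < 100; ++i) {
                sink.next(id);
            }
            sink.source_completed();
        });
    }
    for (auto& t : producers) {
        t.join();
    }
    return seen;
}
```

Calling `run_merged_sink_demo()` yields 300 values in an interleaving that varies from run to run, followed by a single completion marker. A mutex is the simplest stand-in for synchronization here; a real coordination can also move delivery onto another thread, as `observe_on_new_thread()` does in the samples.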

Sample Code
    // get_pid() is a helper defined outside this snippet; judging by the
    // output, it returns the current thread id as a string.
    printf("[thread %s] Start task\n", get_pid().c_str());
    auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
        printf("[thread %s] Timer1 fired\n", get_pid().c_str());
        return 1;
    });
    auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).map([](int) {
        printf("[thread %s] Timer2 fired\n", get_pid().c_str());
        return 2;
    });
    auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) {
        printf("[thread %s] Timer3 fired\n", get_pid().c_str());
        return 3;
    });
    // Nested variant: an observable of observables, merged on a new thread.
    auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3);
    auto values = base.merge(rxcpp::observe_on_new_thread());
    // Blocking subscription: the calling thread waits for OnCompleted,
    // which is why "Finish task" is printed last.
    values.
        as_blocking().
        subscribe(
            [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
            [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
    printf("[thread %s] Finish task\n", get_pid().c_str());

[thread 47481267428736] Start task
[thread 47481303181056] Timer1 fired
[thread 47481303181056] OnNext: 1
[thread 47481303181056] Timer2 fired
[thread 47481303181056] OnNext: 2
[thread 47481303181056] Timer3 fired
[thread 47481303181056] OnNext: 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task
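The threaded samples call get_pid(), which is not defined on this page. A plausible definition is sketched below as an assumption, not as the helper the examples actually ship with: it formats the calling thread's id as a string so that it can be passed to printf through c_str().

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <thread>

// Assumed shape of the get_pid() helper used by the samples: returns the
// calling thread's id as text. Despite the name, this is a thread id,
// not a process id.
std::string get_pid() {
    std::ostringstream s;
    s << std::this_thread::get_id();
    assert(s.good());  // writing a thread id to a string stream is not expected to fail
    return s.str();
}
```

With such a helper, two log lines that carry the same id were printed from the same thread, which is how the output above shows the timers and OnNext calls running away from the thread that printed "Start task".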
Sample Code
    auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
    auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;});
    auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
    // Nested variant without a coordination: identity_current_thread is used.
    auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3);
    auto values = base.merge();
    values.
        subscribe(
            [](int v){printf("OnNext: %d\n", v);},
            [](){printf("OnCompleted\n");});

OnNext: 3
OnNext: 2
OnNext: 1
OnCompleted
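The order 3, 2, 1 follows the timer delays (5 ms, 10 ms, 15 ms) rather than the order in which the sources are listed. The sketch below models that with plain data. It is not RxCpp code; `timed_item`, `merge_by_due_time` and `sample_order` are hypothetical names introduced for illustration, and each source is assumed to be described by (delay, value) pairs. How items due at the same instant are ordered is a choice made by the sketch, not a documented guarantee of the operator.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical model of one emission: when it is due and what it carries.
struct timed_item {
    int due_ms;
    int value;
};

inline bool due_earlier(const timed_item& a, const timed_item& b) {
    return a.due_ms < b.due_ms;
}

// Flattens several time-ordered sources into the values a single observer
// would see, ordered by when each item is due.
std::vector<int> merge_by_due_time(const std::vector<std::vector<timed_item>>& sources) {
    std::vector<timed_item> all;
    for (const auto& source : sources) {
        // Each source is expected to be ordered by time already.
        assert(std::is_sorted(source.begin(), source.end(), due_earlier));
        all.insert(all.end(), source.begin(), source.end());
    }
    // Stable sort: items due at the same instant keep their source order.
    std::stable_sort(all.begin(), all.end(), due_earlier);

    std::vector<int> values;
    values.reserve(all.size());
    for (const auto& item : all) {
        values.push_back(item.value);
    }
    return values;
}

// The three timers from the sample: 15 ms -> 1, 10 ms -> 2, 5 ms -> 3.
std::vector<int> sample_order() {
    std::vector<std::vector<timed_item>> sources = {
        { timed_item{15, 1} },
        { timed_item{10, 2} },
        { timed_item{5, 3} },
    };
    return merge_by_due_time(sources);
}
```

`sample_order()` returns the values 3, 2, 1, matching the OnNext lines above.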
Sample Code
    auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
    auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;});
    auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
    // Explicit variant: o1 is merged with the arguments o2 and o3.
    auto values = o1.merge(o2, o3);
    values.
        subscribe(
            [](int v){printf("OnNext: %d\n", v);},
            [](){printf("OnCompleted\n");});

OnNext: 3
OnNext: 2
OnNext: 1
OnCompleted
Sample Code
    // get_pid() is a helper defined outside this snippet; judging by the
    // output, it returns the current thread id as a string.
    printf("[thread %s] Start task\n", get_pid().c_str());
    auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
        printf("[thread %s] Timer1 fired\n", get_pid().c_str());
        return 1;
    });
    auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).map([](int) {
        printf("[thread %s] Timer2 fired\n", get_pid().c_str());
        return 2;
    });
    auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) {
        printf("[thread %s] Timer3 fired\n", get_pid().c_str());
        return 3;
    });
    // Explicit variant with a coordination passed as the first argument.
    auto values = o1.merge(rxcpp::observe_on_new_thread(), o2, o3);
    // Blocking subscription: the calling thread waits for OnCompleted,
    // which is why "Finish task" is printed last.
    values.
        as_blocking().
        subscribe(
            [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
            [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
    printf("[thread %s] Finish task\n", get_pid().c_str());

[thread 47481267428736] Start task
[thread 47481303181056] Timer1 fired
[thread 47481303181056] OnNext: 1
[thread 47481303181056] Timer2 fired
[thread 47481303181056] OnNext: 2
[thread 47481303181056] Timer3 fired
[thread 47481303181056] OnNext: 3
[thread 47481303181056] OnCompleted
[thread 47481267428736] Finish task

Macro Definition Documentation

#define RXCPP_OPERATORS_RX_MERGE_HPP