RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
Classes | Namespaces | Macros | Functions
rx-replay.hpp File Reference

1) replay(optional Coordination, optional CompositeSubscription) Turn a cold observable hot, send all earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions. More...

#include "../rx-includes.hpp"
#include "./rx-multicast.hpp"
Include dependency graph for rx-replay.hpp:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Classes

struct  rxcpp::member_overload< replay_tag >
 

Namespaces

 rxcpp
 
 rxcpp::operators
 

Macros

#define RXCPP_OPERATORS_RX_REPLAY_HPP
 

Functions

template<class... AN>
auto rxcpp::operators::replay (AN &&...an) -> operator_factory< replay_tag, AN... >
 1) replay(optional Coordination, optional CompositeSubscription) Turn a cold observable hot, send all earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions. More...
 

Detailed Description

1) replay(optional Coordination, optional CompositeSubscription) Turn a cold observable hot, send all earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions.

2) replay(Count, optional Coordination, optional CompositeSubscription) Turn a cold observable hot, send at most count of earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions.

3) replay(Duration, optional Coordination, optional CompositeSubscription) Turn a cold observable hot, send values emitted within a specified time window to any new subscriber, and allow connections to the source to be independent of subscriptions.

4) replay(Count, Duration, optional Coordination, optional CompositeSubscription) Turn a cold observable hot, send at most count of values emitted within a specified time window to any new subscriber, and allow connections to the source to be independent of subscriptions.

Template Parameters
Durationthe type of the time interval (optional).
Countthe type of the maximum number of the most recent items sent to new observers (optional).
Coordinationthe type of the scheduler (optional).
Parameters
countthe maximum number of the most recent items sent to new observers (optional).
dthe duration of the window in which the replayed items must be emitted
cna scheduler all values are queued and delivered on (optional).
csthe subscription to control lifetime (optional).
Returns
rxcpp::connectable_observable that shares a single subscription to the underlying observable that will replay all of its items and notifications to any future observer.
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
take(5).
replay();
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(125)).subscribe([&](long){
values.as_blocking().subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
});
[1] OnNext: 1
[1] OnNext: 2
[1] OnNext: 3
[2] OnNext: 1
[2] OnNext: 2
[2] OnNext: 3
[1] OnNext: 4
[2] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[1] OnCompleted
[2] OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto coordination = rxcpp::serialize_new_thread();
auto worker = coordination.create_coordinator().get_worker();
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
take(5).
replay(coordination);
// Subscribe from the beginning
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());});
});
// Wait before subscribing
worker.schedule(coordination.now() + std::chrono::milliseconds(125), [&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());});
});
// Start emitting
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.connect();
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481303181056][1] OnNext: 1
[thread 47481303181056][1] OnNext: 2
[thread 47481303181056][1] OnNext: 3
[thread 47481303181056][2] OnNext: 1
[thread 47481303181056][2] OnNext: 2
[thread 47481303181056][2] OnNext: 3
[thread 47481303181056][1] OnNext: 4
[thread 47481303181056][2] OnNext: 4
[thread 47481303181056][1] OnNext: 5
[thread 47481303181056][2] OnNext: 5
[thread 47481303181056][1] OnCompleted
[thread 47481303181056][2] OnCompleted
[thread 47481267428736] Finish task
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
take(5).
replay(2);
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(125)).subscribe([&](long){
values.as_blocking().subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
});
[1] OnNext: 1
[1] OnNext: 2
[1] OnNext: 3
[2] OnNext: 2
[2] OnNext: 3
[1] OnNext: 4
[2] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[1] OnCompleted
[2] OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto coordination = rxcpp::serialize_new_thread();
auto worker = coordination.create_coordinator().get_worker();
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
take(5).
replay(2, coordination);
// Subscribe from the beginning
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());});
});
// Wait before subscribing
worker.schedule(coordination.now() + std::chrono::milliseconds(125), [&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());});
});
// Start emitting
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.connect();
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481305282304][1] OnNext: 1
[thread 47481305282304][1] OnNext: 2
[thread 47481305282304][1] OnNext: 3
[thread 47481305282304][2] OnNext: 2
[thread 47481305282304][2] OnNext: 3
[thread 47481305282304][1] OnNext: 4
[thread 47481305282304][2] OnNext: 4
[thread 47481305282304][1] OnNext: 5
[thread 47481305282304][2] OnNext: 5
[thread 47481305282304][1] OnCompleted
[thread 47481305282304][2] OnCompleted
[thread 47481267428736] Finish task
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
take(5).
replay(std::chrono::milliseconds(125));
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(175)).subscribe([&](long){
values.as_blocking().subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
});
[1] OnNext: 1
[1] OnNext: 2
[1] OnNext: 3
[1] OnNext: 4
[2] OnNext: 2
[2] OnNext: 3
[2] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[1] OnCompleted
[2] OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto coordination = rxcpp::serialize_new_thread();
auto worker = coordination.create_coordinator().get_worker();
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
take(5).
replay(std::chrono::milliseconds(125), coordination);
// Subscribe from the beginning
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());});
});
// Wait before subscribing
worker.schedule(coordination.now() + std::chrono::milliseconds(175), [&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());});
});
// Start emitting
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.connect();
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481307383552][1] OnNext: 1
[thread 47481307383552][1] OnNext: 2
[thread 47481307383552][1] OnNext: 3
[thread 47481307383552][1] OnNext: 4
[thread 47481307383552][2] OnNext: 2
[thread 47481307383552][2] OnNext: 3
[thread 47481307383552][2] OnNext: 4
[thread 47481307383552][1] OnNext: 5
[thread 47481307383552][2] OnNext: 5
[thread 47481307383552][1] OnCompleted
[thread 47481307383552][2] OnCompleted
[thread 47481267428736] Finish task
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
take(5).
replay(2, std::chrono::milliseconds(125));
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(175)).subscribe([&](long){
values.as_blocking().subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
});
[1] OnNext: 1
[1] OnNext: 2
[1] OnNext: 3
[1] OnNext: 4
[2] OnNext: 3
[2] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[1] OnCompleted
[2] OnCompleted
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto coordination = rxcpp::serialize_new_thread();
auto worker = coordination.create_coordinator().get_worker();
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
take(5).
replay(2, std::chrono::milliseconds(125), coordination);
// Subscribe from the beginning
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());});
});
// Wait before subscribing
worker.schedule(coordination.now() + std::chrono::milliseconds(175), [&](const rxcpp::schedulers::schedulable&){
values.subscribe(
[](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);},
[](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());});
});
// Start emitting
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.connect();
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 47481267428736] Start task
[thread 47481309484800][1] OnNext: 1
[thread 47481309484800][1] OnNext: 2
[thread 47481309484800][1] OnNext: 3
[thread 47481309484800][1] OnNext: 4
[thread 47481309484800][2] OnNext: 3
[thread 47481309484800][2] OnNext: 4
[thread 47481309484800][1] OnNext: 5
[thread 47481309484800][2] OnNext: 5
[thread 47481309484800][1] OnCompleted
[thread 47481309484800][2] OnCompleted
[thread 47481267428736] Finish task

Macro Definition Documentation

#define RXCPP_OPERATORS_RX_REPLAY_HPP