RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-create.hpp File Reference

Returns an observable that executes the specified function when a subscriber subscribes to it.

#include "../rx-includes.hpp"

Namespaces

 rxcpp
 
 rxcpp::sources
 

Macros

#define RXCPP_SOURCES_RX_CREATE_HPP
 

Functions

template<class T , class OnSubscribe >
auto rxcpp::sources::create (OnSubscribe os) -> observable< T, detail::create< T, OnSubscribe >>
 Returns an observable that executes the specified function when a subscriber subscribes to it.
 

Detailed Description

Returns an observable that executes the specified function when a subscriber subscribes to it.

Template Parameters
    T            the type of the items that this observable emits
    OnSubscribe  the type of OnSubscribe handler function
Parameters
    os           OnSubscribe event handler
Returns
Observable that executes the specified function when a Subscriber subscribes to it.
Sample Code
    auto ints = rxcpp::observable<>::create<int>(
        [](rxcpp::subscriber<int> s){
            // Emit two items, then signal completion.
            s.on_next(1);
            s.on_next(2);
            s.on_completed();
        });

    ints.
        subscribe(
            [](int v){printf("OnNext: %d\n", v);},
            [](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnCompleted
Warning
It is good practice to check the observer's is_subscribed state from within the function you pass to create so that your observable can stop emitting items or doing expensive calculations when there is no longer an interested observer.
Bad Code
    auto subscription = rxcpp::composite_subscription();
    auto subscriber = rxcpp::make_subscriber<int>(
        subscription,
        [&](int v){
            printf("OnNext: %d\n", v);
            if (v == 2)
                subscription.unsubscribe();
        },
        [](){
            printf("OnCompleted\n");
        });

    rxcpp::observable<>::create<int>(
        [](rxcpp::subscriber<int> s){
            // No is_subscribed check: the loop keeps running after unsubscribe.
            for (int i = 0; i < 5; ++i) {
                s.on_next(i);
                printf("Just sent: OnNext(%d)\n", i);
            }
            s.on_completed();
            printf("Just sent: OnCompleted()\n");
        }).subscribe(subscriber);
OnNext: 0
Just sent: OnNext(0)
OnNext: 1
Just sent: OnNext(1)
OnNext: 2
Just sent: OnNext(2)
Just sent: OnNext(3)
Just sent: OnNext(4)
Just sent: OnCompleted()
Good Code
    auto subscription = rxcpp::composite_subscription();
    auto subscriber = rxcpp::make_subscriber<int>(
        subscription,
        [&](int v){
            printf("OnNext: %d\n", v);
            if (v == 2)
                subscription.unsubscribe();
        },
        [](){
            printf("OnCompleted\n");
        });

    rxcpp::observable<>::create<int>(
        [](rxcpp::subscriber<int> s){
            for (int i = 0; i < 5; ++i) {
                if (!s.is_subscribed()) // Stop emitting if nobody is listening
                    break;
                s.on_next(i);
                printf("Just sent: OnNext(%d)\n", i);
            }
            s.on_completed();
            printf("Just sent: OnCompleted()\n");
        }).subscribe(subscriber);
OnNext: 0
Just sent: OnNext(0)
OnNext: 1
Just sent: OnNext(1)
OnNext: 2
Just sent: OnNext(2)
Just sent: OnCompleted()
Warning
It is good practice to use operators like observable::take to control lifetime rather than use the subscription explicitly.
Good Code
    auto ints = rxcpp::observable<>::create<int>(
        [](rxcpp::subscriber<int> s){
            for (int i = 0; i < 5; ++i) {
                if (!s.is_subscribed()) // Stop emitting if nobody is listening
                    break;
                s.on_next(i);
                printf("Just sent: OnNext(%d)\n", i);
            }
            s.on_completed();
            printf("Just sent: OnCompleted()\n");
        });

    ints.
        take(2).
        subscribe(
            [](int v){
                printf("OnNext: %d\n", v);
            },
            [](std::exception_ptr ep){
                try {std::rethrow_exception(ep);}
                catch (const std::exception& ex) {
                    printf("OnError: %s\n", ex.what());
                }
            },
            [](){
                printf("OnCompleted\n");
            });
OnNext: 0
Just sent: OnNext(0)
OnNext: 1
OnCompleted
Just sent: OnNext(1)
Just sent: OnCompleted()

Macro Definition Documentation

#define RXCPP_SOURCES_RX_CREATE_HPP