14 #if !defined(RXCPP_OPERATORS_RX_MULTICAST_HPP) 15 #define RXCPP_OPERATORS_RX_MULTICAST_HPP 17 #include "../rx-includes.hpp" 26 struct multicast_invalid_arguments {};
29 struct multicast_invalid :
public rxo::operator_base<multicast_invalid_arguments<AN...>> {
30 using type = observable<multicast_invalid_arguments<
AN...>, multicast_invalid<
AN...>>;
33 using multicast_invalid_t =
typename multicast_invalid<
AN...>::type;
35 template<
class T,
class Observable,
class Subject>
36 struct multicast :
public operator_base<T>
38 typedef rxu::decay_t<Observable> source_type;
39 typedef rxu::decay_t<Subject> subject_type;
41 struct multicast_state :
public std::enable_shared_from_this<multicast_state>
43 multicast_state(source_type o, subject_type sub)
44 : source(std::move(o))
45 , subject_value(std::move(sub))
49 subject_type subject_value;
50 rxu::detail::maybe<typename composite_subscription::weak_subscription> connection;
53 std::shared_ptr<multicast_state> state;
55 multicast(source_type o, subject_type sub)
56 : state(std::make_shared<multicast_state>(std::move(o), std::move(sub)))
59 template<
class Subscriber>
60 void on_subscribe(Subscriber&& o)
const {
61 state->subject_value.get_observable().subscribe(std::forward<Subscriber>(o));
63 void on_connect(composite_subscription cs)
const {
64 if (state->connection.empty()) {
65 auto destination = state->subject_value.get_subscriber();
68 state->connection.reset(destination.add(cs));
70 auto localState = state;
74 [destination, localState](){
75 if (!localState->connection.empty()) {
76 destination.remove(localState->connection.get());
77 localState->connection.reset();
82 state->source.subscribe(cs, destination);
94 return operator_factory<multicast_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
102 template<
class Observable,
class Subject,
106 class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>,
rxu::decay_t<Subject>>,
109 static Result
member(Observable&& o, Subject&& sub) {
110 return Result(Multicast(std::forward<Observable>(o), std::forward<Subject>(sub)));
113 template<
class...
AN>
114 static operators::detail::multicast_invalid_t<
AN...>
member(
AN...) {
117 static_assert(
sizeof...(
AN) == 10000,
"multicast takes (Subject)");
a source of values that is shared across all subscribers and does not start until connectable_observable::connect() is called.
Definition: rx-connectable_observable.hpp:105
Definition: rx-all.hpp:26
static Result member(Observable &&o, Subject &&sub)
Definition: rx-multicast.hpp:109
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static operators::detail::multicast_invalid_t< AN... > member(AN...)
Definition: rx-multicast.hpp:114
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
Definition: rx-operators.hpp:262
auto multicast(AN &&...an) -> operator_factory< multicast_tag, AN... >
allows connections to the source to be independent of subscriptions.
Definition: rx-multicast.hpp:92
Definition: rx-predef.hpp:177