RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-multicast.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
14 #if !defined(RXCPP_OPERATORS_RX_MULTICAST_HPP)
15 #define RXCPP_OPERATORS_RX_MULTICAST_HPP
16 
17 #include "../rx-includes.hpp"
18 
19 namespace rxcpp {
20 
21 namespace operators {
22 
23 namespace detail {
24 
/// Empty tag type parameterised on the argument types AN.
/// It is used as the value type of multicast_invalid, the operator type
/// returned by the fallback overload for unsupported multicast arguments.
template<class... AN>
struct multicast_invalid_arguments {};
27 
28 template<class... AN>
29 struct multicast_invalid : public rxo::operator_base<multicast_invalid_arguments<AN...>> {
30  using type = observable<multicast_invalid_arguments<AN...>, multicast_invalid<AN...>>;
31 };
32 template<class... AN>
33 using multicast_invalid_t = typename multicast_invalid<AN...>::type;
34 
35 template<class T, class Observable, class Subject>
36 struct multicast : public operator_base<T>
37 {
38  typedef rxu::decay_t<Observable> source_type;
39  typedef rxu::decay_t<Subject> subject_type;
40 
41  struct multicast_state : public std::enable_shared_from_this<multicast_state>
42  {
43  multicast_state(source_type o, subject_type sub)
44  : source(std::move(o))
45  , subject_value(std::move(sub))
46  {
47  }
48  source_type source;
49  subject_type subject_value;
50  rxu::detail::maybe<typename composite_subscription::weak_subscription> connection;
51  };
52 
53  std::shared_ptr<multicast_state> state;
54 
55  multicast(source_type o, subject_type sub)
56  : state(std::make_shared<multicast_state>(std::move(o), std::move(sub)))
57  {
58  }
59  template<class Subscriber>
60  void on_subscribe(Subscriber&& o) const {
61  state->subject_value.get_observable().subscribe(std::forward<Subscriber>(o));
62  }
63  void on_connect(composite_subscription cs) const {
64  if (state->connection.empty()) {
65  auto destination = state->subject_value.get_subscriber();
66 
67  // the lifetime of each connect is nested in the subject lifetime
68  state->connection.reset(destination.add(cs));
69 
70  auto localState = state;
71 
72  // when the connection is finished it should shutdown the connection
73  cs.add(
74  [destination, localState](){
75  if (!localState->connection.empty()) {
76  destination.remove(localState->connection.get());
77  localState->connection.reset();
78  }
79  });
80 
81  // use cs not destination for lifetime of subscribe.
82  state->source.subscribe(cs, destination);
83  }
84  }
85 };
86 
87 }
88 
91 template<class... AN>
92 auto multicast(AN&&... an)
94  return operator_factory<multicast_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
95 }
96 
97 }
98 
99 template<>
101 {
102  template<class Observable, class Subject,
103  class Enabled = rxu::enable_if_all_true_type_t<
105  class SourceValue = rxu::value_type_t<Observable>,
106  class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Subject>>,
107  class Value = rxu::value_type_t<Multicast>,
109  static Result member(Observable&& o, Subject&& sub) {
110  return Result(Multicast(std::forward<Observable>(o), std::forward<Subject>(sub)));
111  }
112 
113  template<class... AN>
114  static operators::detail::multicast_invalid_t<AN...> member(AN...) {
115  std::terminate();
116  return {};
117  static_assert(sizeof...(AN) == 10000, "multicast takes (Subject)");
118  }
119 };
120 
121 }
122 
123 #endif
a source of values that is shared across all subscribers and does not start until connectable_observable::connect() is called.
Definition: rx-connectable_observable.hpp:105
Definition: rx-all.hpp:26
static Result member(Observable &&o, Subject &&sub)
Definition: rx-multicast.hpp:109
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static operators::detail::multicast_invalid_t< AN... > member(AN...)
Definition: rx-multicast.hpp:114
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
Definition: rx-operators.hpp:262
auto multicast(AN &&...an) -> operator_factory< multicast_tag, AN... >
allows connections to the source to be independent of subscriptions.
Definition: rx-multicast.hpp:92
Definition: rx-predef.hpp:177