RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-connect_forever.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
13 #if !defined(RXCPP_OPERATORS_RX_CONNECT_FOREVER_HPP)
14 #define RXCPP_OPERATORS_RX_CONNECT_FOREVER_HPP
15 
16 #include "../rx-includes.hpp"
17 
18 namespace rxcpp {
19 
20 namespace operators {
21 
22 namespace detail {
23 
24 template<class... AN>
25 struct connect_forever_invalid_arguments {};
26 
27 template<class... AN>
28 struct connect_forever_invalid : public rxo::operator_base<connect_forever_invalid_arguments<AN...>> {
29  using type = observable<connect_forever_invalid_arguments<AN...>, connect_forever_invalid<AN...>>;
30 };
31 template<class... AN>
32 using connect_forever_invalid_t = typename connect_forever_invalid<AN...>::type;
33 
34 template<class T, class ConnectableObservable>
35 struct connect_forever : public operator_base<T>
36 {
37  typedef rxu::decay_t<ConnectableObservable> source_type;
38 
39  source_type source;
40 
41  explicit connect_forever(source_type o)
42  : source(std::move(o))
43  {
44  source.connect();
45  }
46 
47  template<class Subscriber>
48  void on_subscribe(Subscriber&& o) const {
49  source.subscribe(std::forward<Subscriber>(o));
50  }
51 };
52 
53 }
54 
57 template<class... AN>
58 auto connect_forever(AN&&... an)
60  return operator_factory<connect_forever_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
61 }
62 
63 }
64 
65 template<>
67 {
68  template<class ConnectableObservable,
69  class Enabled = rxu::enable_if_all_true_type_t<
71  class SourceValue = rxu::value_type_t<ConnectableObservable>,
72  class ConnectForever = rxo::detail::connect_forever<SourceValue, rxu::decay_t<ConnectableObservable>>,
75  >
76  static Result member(ConnectableObservable&& o) {
77  return Result(ConnectForever(std::forward<ConnectableObservable>(o)));
78  }
79 
80  template<class... AN>
81  static operators::detail::connect_forever_invalid_t<AN...> member(AN...) {
82  std::terminate();
83  return {};
84  static_assert(sizeof...(AN) == 10000, "connect_forever takes no arguments");
85  }
86 };
87 
88 }
89 
90 #endif
Definition: rx-operators.hpp:171
Definition: rx-all.hpp:26
auto connect_forever(AN &&...an) -> operator_factory< connect_forever_tag, AN... >
takes a connectable_observable source and calls connect during the construction of the expression...
Definition: rx-connect_forever.hpp:58
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
Definition: rx-predef.hpp:222
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
static operators::detail::connect_forever_invalid_t< AN... > member(AN...)
Definition: rx-connect_forever.hpp:81
static Result member(ConnectableObservable &&o)
Definition: rx-connect_forever.hpp:76