RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-tap.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
26 #if !defined(RXCPP_OPERATORS_RX_TAP_HPP)
27 #define RXCPP_OPERATORS_RX_TAP_HPP
28 
29 #include "../rx-includes.hpp"
30 
31 namespace rxcpp {
32 
33 namespace operators {
34 
35 namespace detail {
36 
/// Empty tag type that records, as template arguments, the argument types of
/// a tap() call. It holds no data; it is used as the value type of tap_invalid.
template<class... AN>
struct tap_invalid_arguments {};
39 
40 template<class... AN>
41 struct tap_invalid : public rxo::operator_base<tap_invalid_arguments<AN...>> {
42  using type = observable<tap_invalid_arguments<AN...>, tap_invalid<AN...>>;
43 };
44 template<class... AN>
45 using tap_invalid_t = typename tap_invalid<AN...>::type;
46 
47 template<class T, class MakeObserverArgN>
48 struct tap_observer_factory;
49 
50 template<class T, class... ArgN>
51 struct tap_observer_factory<T, std::tuple<ArgN...>>
52 {
53  using source_value_type = rxu::decay_t<T>;
54  using out_type = decltype(make_observer<source_value_type, rxcpp::detail::OnErrorIgnore>(*((ArgN*)nullptr)...));
55  auto operator()(ArgN&&... an) -> out_type const {
56  return make_observer<source_value_type, rxcpp::detail::OnErrorIgnore>(std::forward<ArgN>(an)...);
57  }
58 };
59 
60 template<class T, class MakeObserverArgN, class Factory = tap_observer_factory<T, MakeObserverArgN>>
61 struct tap
62 {
63  using source_value_type = rxu::decay_t<T>;
64  using args_type = rxu::decay_t<MakeObserverArgN>;
65  using factory_type = Factory;
66  using out_type = typename factory_type::out_type;
67  out_type out;
68 
69  tap(args_type a)
70  : out(rxu::apply(std::move(a), factory_type()))
71  {
72  }
73 
74  template<class Subscriber>
75  struct tap_observer
76  {
77  using this_type = tap_observer<Subscriber>;
78  using value_type = source_value_type;
79  using dest_type = rxu::decay_t<Subscriber>;
80  using factory_type = Factory;
81  using out_type = typename factory_type::out_type;
82  using observer_type = observer<value_type, this_type>;
83  dest_type dest;
84  out_type out;
85 
86  tap_observer(dest_type d, out_type o)
87  : dest(std::move(d))
88  , out(std::move(o))
89  {
90  }
91  void on_next(source_value_type v) const {
92  out.on_next(v);
93  dest.on_next(v);
94  }
95  void on_error(std::exception_ptr e) const {
96  out.on_error(e);
97  dest.on_error(e);
98  }
99  void on_completed() const {
100  out.on_completed();
101  dest.on_completed();
102  }
103 
104  static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, out_type o) {
105  return make_subscriber<value_type>(d, this_type(d, std::move(o)));
106  }
107  };
108 
109  template<class Subscriber>
110  auto operator()(Subscriber dest) const
111  -> decltype(tap_observer<Subscriber>::make(std::move(dest), out)) {
112  return tap_observer<Subscriber>::make(std::move(dest), out);
113  }
114 };
115 
116 }
117 
120 template<class... AN>
121 auto tap(AN&&... an)
122  -> operator_factory<tap_tag, AN...> {
123  return operator_factory<tap_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
124 }
125 
126 }
127 
128 template<>
130 {
131  template<class Observable, class... MakeObserverArgN,
132  class Enabled = rxu::enable_if_all_true_type_t<
134  class SourceValue = rxu::value_type_t<Observable>,
135  class Tap = rxo::detail::tap<SourceValue, std::tuple<rxu::decay_t<MakeObserverArgN>...>>>
136  static auto member(Observable&& o, MakeObserverArgN&&... an)
137  -> decltype(o.template lift<SourceValue>(Tap(std::make_tuple(std::forward<MakeObserverArgN>(an)...)))) {
138  return o.template lift<SourceValue>(Tap(std::make_tuple(std::forward<MakeObserverArgN>(an)...)));
139  }
140 
141  template<class... AN>
142  static operators::detail::tap_invalid_t<AN...> member(const AN&...) {
143  std::terminate();
144  return {};
145  static_assert(sizeof...(AN) == 10000, "tap takes (MakeObserverArgN...)");
146  }
147 };
148 
149 }
150 
151 #endif
Definition: rx-operators.hpp:445
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
static operators::detail::tap_invalid_t< AN... > member(const AN &...)
Definition: rx-tap.hpp:142
static auto member(Observable &&o, MakeObserverArgN &&...an) -> decltype(o.template lift< SourceValue >(Tap(std::make_tuple(std::forward< MakeObserverArgN >(an)...))))
Definition: rx-tap.hpp:136
auto tap(AN &&...an) -> operator_factory< tap_tag, AN... >
inspect calls to on_next, on_error and on_completed.
Definition: rx-tap.hpp:121
Definition: rx-predef.hpp:177