RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-filter.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
20 #if !defined(RXCPP_OPERATORS_RX_FILTER_HPP)
21 #define RXCPP_OPERATORS_RX_FILTER_HPP
22 
23 #include "../rx-includes.hpp"
24 
25 namespace rxcpp {
26 
27 namespace operators {
28 
29 namespace detail {
30 
/// Empty tag type parameterised on the argument types of a filter() call.
///
/// It is used as the value type of the observable named by filter_invalid,
/// i.e. the result type of the catch-all overload that rejects calls which
/// do not match `filter(Predicate)`. It holds no data.
template<class... AN>
struct filter_invalid_arguments {};
33 
34 template<class... AN>
35 struct filter_invalid : public rxo::operator_base<filter_invalid_arguments<AN...>> {
36  using type = observable<filter_invalid_arguments<AN...>, filter_invalid<AN...>>;
37 };
38 template<class... AN>
39 using filter_invalid_t = typename filter_invalid<AN...>::type;
40 
41 template<class T, class Predicate>
42 struct filter
43 {
44  typedef rxu::decay_t<T> source_value_type;
45  typedef rxu::decay_t<Predicate> test_type;
46  test_type test;
47 
48  filter(test_type t)
49  : test(std::move(t))
50  {
51  }
52 
53  template<class Subscriber>
54  struct filter_observer
55  {
56  typedef filter_observer<Subscriber> this_type;
57  typedef source_value_type value_type;
58  typedef rxu::decay_t<Subscriber> dest_type;
59  typedef observer<value_type, this_type> observer_type;
60  dest_type dest;
61  mutable test_type test;
62 
63  filter_observer(dest_type d, test_type t)
64  : dest(std::move(d))
65  , test(std::move(t))
66  {
67  }
68 
69  template <class Value>
70  void on_next(Value&& v) const {
71  auto filtered = on_exception([&](){
72  return !this->test(rxu::as_const(v));
73  },
74  dest);
75  if (filtered.empty()) {
76  return;
77  }
78  if (!filtered.get()) {
79  dest.on_next(std::forward<Value>(v));
80  }
81  }
82  void on_error(std::exception_ptr e) const {
83  dest.on_error(e);
84  }
85  void on_completed() const {
86  dest.on_completed();
87  }
88 
89  static subscriber<value_type, observer_type> make(dest_type d, test_type t) {
90  return make_subscriber<value_type>(d, this_type(d, std::move(t)));
91  }
92  };
93 
94  template<class Subscriber>
95  auto operator()(Subscriber dest) const
96  -> decltype(filter_observer<Subscriber>::make(std::move(dest), test)) {
97  return filter_observer<Subscriber>::make(std::move(dest), test);
98  }
99 };
100 
101 }
102 
105 template<class... AN>
106 auto filter(AN&&... an)
107  -> operator_factory<filter_tag, AN...> {
108  return operator_factory<filter_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
109 }
110 
111 }
112 
113 template<>
115 {
116  template<class Observable, class Predicate,
117  class SourceValue = rxu::value_type_t<Observable>,
118  class Filter = rxo::detail::filter<SourceValue, rxu::decay_t<Predicate>>>
119  static auto member(Observable&& o, Predicate&& p)
120  -> decltype(o.template lift<SourceValue>(Filter(std::forward<Predicate>(p)))) {
121  return o.template lift<SourceValue>(Filter(std::forward<Predicate>(p)));
122  }
123 
124  template<class... AN>
125  static operators::detail::filter_invalid_t<AN...> member(const AN&...) {
126  std::terminate();
127  return {};
128  static_assert(sizeof...(AN) == 10000, "filter takes (Predicate)");
129  }
130 };
131 
132 }
133 
134 #endif
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto filter(AN &&...an) -> operator_factory< filter_tag, AN... >
For each item from this observable use Predicate to select which items to emit from the new observable that is returned.
Definition: rx-filter.hpp:106
static auto member(Observable &&o, Predicate &&p) -> decltype(o.template lift< SourceValue >(Filter(std::forward< Predicate >(p))))
Definition: rx-filter.hpp:119
auto AN
Definition: rx-finally.hpp:105
Definition: rx-operators.hpp:47
Definition: rx-operators.hpp:213
T const & as_const(T &t)
Definition: rx-util.hpp:57
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
static operators::detail::filter_invalid_t< AN... > member(const AN &...)
Definition: rx-filter.hpp:125