RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-all.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
21 #if !defined(RXCPP_OPERATORS_RX_ALL_HPP)
22 #define RXCPP_OPERATORS_RX_ALL_HPP
23 
24 #include "../rx-includes.hpp"
25 
26 namespace rxcpp {
27 
28 namespace operators {
29 
30 namespace detail {
31 
/// Empty tag type parameterized on the argument types of a rejected call.
/// all_invalid passes it as the first template argument of the observable
/// type it names.
template<class... ArgN>
struct all_invalid_arguments {};
34 
35 template<class... AN>
36 struct all_invalid : public rxo::operator_base<all_invalid_arguments<AN...>> {
37  using type = observable<all_invalid_arguments<AN...>, all_invalid<AN...>>;
38 };
39 template<class... AN>
40 using all_invalid_t = typename all_invalid<AN...>::type;
41 
42 template<class T, class Predicate>
43 struct all
44 {
45  typedef rxu::decay_t<T> source_value_type;
46  typedef rxu::decay_t<Predicate> test_type;
47  test_type test;
48 
49  typedef bool value_type;
50 
51  all(test_type t)
52  : test(std::move(t))
53  {
54  }
55 
56  template<class Subscriber>
57  struct all_observer
58  {
59  typedef all_observer<Subscriber> this_type;
60  typedef source_value_type value_type;
61  typedef rxu::decay_t<Subscriber> dest_type;
62  typedef observer<value_type, this_type> observer_type;
63  dest_type dest;
64  test_type test;
65  mutable bool done;
66 
67  all_observer(dest_type d, test_type t)
68  : dest(std::move(d))
69  , test(std::move(t)),
70  done(false)
71  {
72  }
73  void on_next(source_value_type v) const {
74  auto filtered = on_exception([&]() {
75  return !this->test(v); },
76  dest);
77  if (filtered.empty()) {
78  return;
79  }
80  if (filtered.get() && !done) {
81  done = true;
82  dest.on_next(false);
83  dest.on_completed();
84  }
85  }
86  void on_error(std::exception_ptr e) const {
87  dest.on_error(e);
88  }
89  void on_completed() const {
90  if(!done) {
91  done = true;
92  dest.on_next(true);
93  dest.on_completed();
94  }
95  }
96 
97  static subscriber<value_type, observer_type> make(dest_type d, test_type t) {
98  return make_subscriber<value_type>(d, this_type(d, std::move(t)));
99  }
100  };
101 
102  template<class Subscriber>
103  auto operator()(Subscriber dest) const
104  -> decltype(all_observer<Subscriber>::make(std::move(dest), test)) {
105  return all_observer<Subscriber>::make(std::move(dest), test);
106  }
107 };
108 
109 }
110 
113 template<class... AN>
114 auto all(AN&&... an)
115  -> operator_factory<all_tag, AN...> {
116  return operator_factory<all_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
117 }
118 
127 template<class... AN>
128 auto is_empty(AN&&... an)
130  return operator_factory<is_empty_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
131 }
132 
133 }
134 
135 template<>
137 {
138  template<class Observable, class Predicate,
139  class SourceValue = rxu::value_type_t<Observable>,
140  class Enabled = rxu::enable_if_all_true_type_t<
142  class All = rxo::detail::all<SourceValue, rxu::decay_t<Predicate>>,
143  class Value = rxu::value_type_t<All>>
144  static auto member(Observable&& o, Predicate&& p)
145  -> decltype(o.template lift<Value>(All(std::forward<Predicate>(p)))) {
146  return o.template lift<Value>(All(std::forward<Predicate>(p)));
147  }
148 
149  template<class... AN>
150  static operators::detail::all_invalid_t<AN...> member(const AN&...) {
151  std::terminate();
152  return {};
153  static_assert(sizeof...(AN) == 10000, "all takes (Predicate)");
154  }
155 };
156 
157 template<>
159 {
160  template<class Observable,
161  class SourceValue = rxu::value_type_t<Observable>,
162  class Enabled = rxu::enable_if_all_true_type_t<
163  is_observable<Observable>>,
164  class Predicate = std::function<bool(SourceValue)>,
165  class IsEmpty = rxo::detail::all<SourceValue, rxu::decay_t<Predicate>>,
166  class Value = rxu::value_type_t<IsEmpty>>
167  static auto member(Observable&& o)
168  -> decltype(o.template lift<Value>(IsEmpty(nullptr))) {
169  return o.template lift<Value>(IsEmpty([](SourceValue) { return false; }));
170  }
171 
172  template<class... AN>
173  static operators::detail::all_invalid_t<AN...> member(AN...) {
174  std::terminate();
175  return {};
176  static_assert(sizeof...(AN) == 10000, "is_empty takes no arguments");
177  }
178 };
179 
180 }
181 
182 #endif
Definition: rx-all.hpp:26
static operators::detail::all_invalid_t< AN... > member(const AN &...)
Definition: rx-all.hpp:150
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
auto is_empty(AN &&...an) -> operator_factory< is_empty_tag, AN... >
Returns an Observable that emits true if the source Observable is empty, otherwise false.
Definition: rx-all.hpp:128
Definition: rx-operators.hpp:117
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
static auto member(Observable &&o, Predicate &&p) -> decltype(o.template lift< Value >(All(std::forward< Predicate >(p))))
Definition: rx-all.hpp:144
static auto member(Observable &&o) -> decltype(o.template lift< Value >(IsEmpty(nullptr)))
Definition: rx-all.hpp:167
static operators::detail::all_invalid_t< AN... > member(AN...)
Definition: rx-all.hpp:173
Definition: rx-operators.hpp:110
consumes values from an observable using State that may implement on_next, on_error and on_completed ...
Definition: rx-observer.hpp:179
Definition: rx-operators.hpp:16
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
Definition: rx-predef.hpp:177
auto all(AN &&...an) -> operator_factory< all_tag, AN... >
Returns an Observable that emits true if every item emitted by the source Observable satisfies a specified condition, otherwise false.
Definition: rx-all.hpp:114