RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-distinct.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
18 #if !defined(RXCPP_OPERATORS_RX_DISTINCT_HPP)
19 #define RXCPP_OPERATORS_RX_DISTINCT_HPP
20 
21 #include "../rx-includes.hpp"
22 
23 namespace rxcpp {
24 
25 namespace operators {
26 
27 namespace detail {
28 
/// Empty tag type that records the argument pack `AN...` of a `distinct(...)`
/// call. It has no members; `distinct_invalid` uses it as a template argument.
template<class... AN>
struct distinct_invalid_arguments {};
31 
32 template<class... AN>
33 struct distinct_invalid : public rxo::operator_base<distinct_invalid_arguments<AN...>> {
34  using type = observable<distinct_invalid_arguments<AN...>, distinct_invalid<AN...>>;
35 };
36 template<class... AN>
37 using distinct_invalid_t = typename distinct_invalid<AN...>::type;
38 
39 template<class T>
40 struct distinct
41 {
42  typedef rxu::decay_t<T> source_value_type;
43 
44  template<class Subscriber>
45  struct distinct_observer
46  {
47  typedef distinct_observer<Subscriber> this_type;
48  typedef source_value_type value_type;
49  typedef rxu::decay_t<Subscriber> dest_type;
50  typedef observer<value_type, this_type> observer_type;
51  dest_type dest;
52  mutable std::unordered_set<source_value_type, rxcpp::filtered_hash<source_value_type>> remembered;
53 
54  distinct_observer(dest_type d)
55  : dest(d)
56  {
57  }
58  void on_next(source_value_type v) const {
59  if (remembered.empty() || remembered.count(v) == 0) {
60  remembered.insert(v);
61  dest.on_next(v);
62  }
63  }
64  void on_error(std::exception_ptr e) const {
65  dest.on_error(e);
66  }
67  void on_completed() const {
68  dest.on_completed();
69  }
70 
71  static subscriber<value_type, observer<value_type, this_type>> make(dest_type d) {
72  return make_subscriber<value_type>(d, this_type(d));
73  }
74  };
75 
76  template<class Subscriber>
77  auto operator()(Subscriber dest) const
78  -> decltype(distinct_observer<Subscriber>::make(std::move(dest))) {
79  return distinct_observer<Subscriber>::make(std::move(dest));
80  }
81 };
82 
83 }
84 
87 template<class... AN>
88 auto distinct(AN&&... an)
90  return operator_factory<distinct_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
91 }
92 
93 }
94 
// NOTE(review): the listing numbers fused into these lines jump from 95 to 97
// and from 100 to 103. The line that names the specialized template and the
// conditions passed to rxu::enable_if_all_true_type_t are therefore missing
// from this copy; restore them from the original header before compiling.
95 template<>
97 {
 // Accepted overload: lifts a default-constructed Distinct into the source
 // observable `o`, producing an observable of SourceValue.
98  template<class Observable,
99  class SourceValue = rxu::value_type_t<Observable>,
100  class Enabled = rxu::enable_if_all_true_type_t<
103  class Distinct = rxo::detail::distinct<SourceValue>>
104  static auto member(Observable&& o)
105  -> decltype(o.template lift<SourceValue>(Distinct())) {
106  return o.template lift<SourceValue>(Distinct());
107  }
108 
 // Catch-all overload for any other argument list. The static_assert depends
 // on sizeof...(AN), so it is evaluated only when this overload is
 // instantiated, and it then reports "distinct takes no arguments".
 // NOTE(review): std::terminate() and `return {}` presumably exist only to
 // give the body a well-formed shape for the declared return type; confirm.
109  template<class... AN>
110  static operators::detail::distinct_invalid_t<AN...> member(AN...) {
111  std::terminate();
112  return {};
113  static_assert(sizeof...(AN) == 10000, "distinct takes no arguments");
114  }
115 };
116 
117 }
118 
119 #endif
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
Definition: rx-util.hpp:859
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
auto distinct(AN &&...an) -> operator_factory< distinct_tag, AN... >
For each item from this observable, filter out repeated values and emit only items that have not already been emitted.
Definition: rx-distinct.hpp:88
Definition: rx-operators.hpp:192
static auto member(Observable &&o) -> decltype(o.template lift< SourceValue >(Distinct()))
Definition: rx-distinct.hpp:104
static operators::detail::distinct_invalid_t< AN... > member(AN...)
Definition: rx-distinct.hpp:110
Definition: rx-predef.hpp:177