RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-element_at.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
18 #if !defined(RXCPP_OPERATORS_RX_ELEMENT_AT_HPP)
19 #define RXCPP_OPERATORS_RX_ELEMENT_AT_HPP
20 
21 #include "../rx-includes.hpp"
22 
23 namespace rxcpp {
24 
25 namespace operators {
26 
27 namespace detail {
28 
/// Empty tag type parameterised on the argument types of a rejected
/// element_at call; it carries no data and only serves as a distinct type.
template<typename... Args>
struct element_at_invalid_arguments
{
};
31 
32 template<class... AN>
33 struct element_at_invalid : public rxo::operator_base<element_at_invalid_arguments<AN...>> {
34  using type = observable<element_at_invalid_arguments<AN...>, element_at_invalid<AN...>>;
35 };
36 template<class... AN>
37 using element_at_invalid_t = typename element_at_invalid<AN...>::type;
38 
39 template<class T>
40 struct element_at {
41  typedef rxu::decay_t<T> source_value_type;
42 
43  struct element_at_values {
44  element_at_values(int i)
45  : index(i)
46  {
47  }
48  int index;
49  };
50 
51  element_at_values initial;
52 
53  element_at(int i)
54  : initial(i)
55  {
56  }
57 
58  template<class Subscriber>
59  struct element_at_observer : public element_at_values
60  {
61  typedef element_at_observer<Subscriber> this_type;
62  typedef source_value_type value_type;
63  typedef rxu::decay_t<Subscriber> dest_type;
64  typedef observer<value_type, this_type> observer_type;
65  dest_type dest;
66  mutable int current;
67 
68  element_at_observer(dest_type d, element_at_values v)
69  : element_at_values(v),
70  dest(d),
71  current(0)
72  {
73  }
74  void on_next(source_value_type v) const {
75  if (current++ == this->index) {
76  dest.on_next(v);
77  dest.on_completed();
78  }
79  }
80  void on_error(std::exception_ptr e) const {
81  dest.on_error(e);
82  }
83  void on_completed() const {
84  if(current <= this->index) {
85  dest.on_error(std::make_exception_ptr(std::range_error("index is out of bounds")));
86  }
87  }
88 
89  static subscriber<value_type, observer_type> make(dest_type d, element_at_values v) {
90  return make_subscriber<value_type>(d, this_type(d, v));
91  }
92  };
93 
94  template<class Subscriber>
95  auto operator()(Subscriber dest) const
96  -> decltype(element_at_observer<Subscriber>::make(std::move(dest), initial)) {
97  return element_at_observer<Subscriber>::make(std::move(dest), initial);
98  }
99 };
100 
101 }
102 
105 template<class... AN>
106 auto element_at(AN&&... an)
108  return operator_factory<element_at_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
109 }
110 
111 }
112 
113 template<>
115 {
116  template<class Observable,
117  class Enabled = rxu::enable_if_all_true_type_t<
119  >,
120  class SourceValue = rxu::value_type_t<Observable>,
121  class element_at = rxo::detail::element_at<SourceValue>>
122  static auto member(Observable&& o, int index)
123  -> decltype(o.template lift<SourceValue>(element_at(index))) {
124  return o.template lift<SourceValue>(element_at(index));
125  }
126 
127  template<class... AN>
128  static operators::detail::element_at_invalid_t<AN...> member(const AN...) {
129  std::terminate();
130  return {};
131  static_assert(sizeof...(AN) == 10000, "element_at takes (required int)");
132  }
133 };
134 
135 }
136 
137 #endif
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
static auto member(Observable &&o, int index) -> decltype(o.template lift< SourceValue >(element_at(index)))
Definition: rx-element_at.hpp:122
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
auto element_at(AN &&...an) -> operator_factory< element_at_tag, AN... >
Pulls an item located at a specified index location in the sequence of items and emits that item as its own sole emission.
Definition: rx-element_at.hpp:106
Definition: rx-operators.hpp:206
static operators::detail::element_at_invalid_t< AN... > member(const AN...)
Definition: rx-element_at.hpp:128
Definition: rx-predef.hpp:177