RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-sequence_equal.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
24 #if !defined(RXCPP_OPERATORS_RX_SEQUENCE_EQUAL_HPP)
25 #define RXCPP_OPERATORS_RX_SEQUENCE_EQUAL_HPP
26 
27 #include "../rx-includes.hpp"
28 
29 namespace rxcpp {
30 
31 namespace operators {
32 
33 namespace detail {
34 
// Empty tag type: it has no members and only carries the argument pack AN in
// its type. It is used as the value type of sequence_equal_invalid.
35 template<class... AN>
36 struct sequence_equal_invalid_arguments {};
37 
// Placeholder operator for an unsupported argument pack AN. Its nested `type`
// names an observable whose value type is sequence_equal_invalid_arguments<AN...>
// and whose source operator is this struct. The catch-all member() overload
// later in this file uses it (via sequence_equal_invalid_t) as its return type.
38 template<class... AN>
39 struct sequence_equal_invalid : public rxo::operator_base<sequence_equal_invalid_arguments<AN...>> {
40  using type = observable<sequence_equal_invalid_arguments<AN...>, sequence_equal_invalid<AN...>>;
41 };
// Shorthand for the nested `type` of sequence_equal_invalid<AN...>.
42 template<class... AN>
43 using sequence_equal_invalid_t = typename sequence_equal_invalid<AN...>::type;
44 
// Operator that compares two observable sequences item by item and emits a
// single bool to its subscriber (it derives from operator_base<bool>).
//
// Template parameters:
//   T                - value type of the first sequence (decayed to source_value_type).
//   Observable       - type of the first sequence (`source`).
//   OtherObservable  - type of the second sequence (`other`); its nested
//                      value_type is the second item type.
//   BinaryPredicate  - callable invoked as pred(x, y) with one item from each
//                      sequence; its result is tested with `!`.
//   Coordination     - must provide create_coordinator() and a nested
//                      coordinator_type.
45 template<class T, class Observable, class OtherObservable, class BinaryPredicate, class Coordination>
46 struct sequence_equal : public operator_base<bool>
47 {
48  typedef rxu::decay_t<Observable> source_type;
49  typedef rxu::decay_t<T> source_value_type;
50  typedef rxu::decay_t<OtherObservable> other_source_type;
51  typedef typename other_source_type::value_type other_source_value_type;
52  typedef rxu::decay_t<BinaryPredicate> predicate_type;
53  typedef rxu::decay_t<Coordination> coordination_type;
54  typedef typename coordination_type::coordinator_type coordinator_type;
55 
// The operator's construction-time arguments, stored by value. A copy of this
// bundle becomes the base of the per-subscription state in on_subscribe().
56  struct values {
57  values(source_type s, other_source_type t, predicate_type pred, coordination_type sf)
58  : source(std::move(s))
59  , other(std::move(t))
// Inside the parentheses `pred` is the constructor parameter; outside it is the member.
60  , pred(std::move(pred))
61  , coordination(std::move(sf))
62  {
63  }
64 
65  source_type source;
66  other_source_type other;
67  predicate_type pred;
68  coordination_type coordination;
69  };
70 
71  values initial;
72 
// Stores the two sequences, the predicate and the coordination in `initial`.
73  sequence_equal(source_type s, other_source_type t, predicate_type pred, coordination_type sf)
74  : initial(std::move(s), std::move(t), std::move(pred), std::move(sf))
75  {
76  }
77 
// Called with the downstream subscriber `s`. Builds the shared state, subscribes
// to `other` first and then to `source`, and reports the comparison result
// through `s` as a single bool followed by completion.
78  template<class Subscriber>
79  void on_subscribe(Subscriber s) const {
80 
81  typedef Subscriber output_type;
82 
// Per-subscription state, held in a shared_ptr and captured by every callback below.
83  struct state_type
84  : public std::enable_shared_from_this<state_type>
85  , public values
86  {
87  state_type(const values& vals, coordinator_type coor, const output_type& o)
88  : values(vals)
89  , coordinator(std::move(coor))
90  , out(o)
91  , source_completed(false)
92  , other_completed(false)
93  {
// Both input lifetimes are added to the output subscriber.
// NOTE(review): presumably so that unsubscribing `out` also ends both input
// subscriptions - confirm against the subscriber/composite_subscription API.
94  out.add(other_lifetime);
95  out.add(source_lifetime);
96  }
97 
98  composite_subscription other_lifetime;
99  composite_subscription source_lifetime;
100  coordinator_type coordinator;
101  output_type out;
102 
// Items received from each sequence that have not yet been paired and compared.
103  mutable std::list<source_value_type> source_values;
104  mutable std::list<other_source_value_type> other_values;
// Set to true by the corresponding on_completed handler.
105  mutable bool source_completed;
106  mutable bool other_completed;
107  };
108 
109  auto coordinator = initial.coordination.create_coordinator();
110  auto state = std::make_shared<state_type>(initial, std::move(coordinator), std::move(s));
111 
// Pass `other` through coordinator.in() under on_exception; an empty result
// aborts the subscription.
// NOTE(review): presumably on_exception has already reported the failure to
// state->out when it returns empty - confirm in rx-observer.hpp.
112  auto other = on_exception(
113  [&](){ return state->coordinator.in(state->other); },
114  state->out);
115  if (other.empty()) {
116  return;
117  }
118 
// Same treatment for `source`.
119  auto source = on_exception(
120  [&](){ return state->coordinator.in(state->source); },
121  state->out);
122  if (source.empty()) {
123  return;
124  }
125 
// Runs after every received item (from either sequence).
126  auto check_equal = [state]() {
127  if(!state->source_values.empty() && !state->other_values.empty()) {
// One item is pending on each side: take the front of each queue and compare.
128  auto x = std::move(state->source_values.front());
129  state->source_values.pop_front();
130 
131  auto y = std::move(state->other_values.front());
132  state->other_values.pop_front();
133 
// A failed predicate emits false and completes the output.
134  if (!state->pred(x, y)) {
135  state->out.on_next(false);
136  state->out.on_completed();
137  }
138  } else {
// At least one queue is empty. If the other queue still holds items while the
// opposite sequence has already completed, those items can never be paired:
// emit false and complete.
139  if((!state->source_values.empty() && state->other_completed) ||
140  (!state->other_values.empty() && state->source_completed)) {
141  state->out.on_next(false);
142  state->out.on_completed();
143  }
144  }
145  };
146 
// Runs after either sequence completes. Once both have completed, emits true
// only if no unpaired items remain in either queue, then completes the output.
147  auto check_complete = [state]() {
148  if(state->source_completed && state->other_completed) {
149  state->out.on_next(state->source_values.empty() && state->other_values.empty());
150  state->out.on_completed();
151  }
152  };
153 
// Subscriber for `other`, tied to other_lifetime.
154  auto sinkOther = make_subscriber<other_source_value_type>(
155  state->out,
156  state->other_lifetime,
157  // on_next
// Queue the item (copied into the list), then try to compare.
158  [state, check_equal](other_source_value_type t) {
159  auto& values = state->other_values;
160  values.push_back(t);
161  check_equal();
162  },
163  // on_error
// Errors from `other` are forwarded to the output unchanged.
164  [state](std::exception_ptr e) {
165  state->out.on_error(e);
166  },
167  // on_completed
168  [state, check_complete]() {
169  auto& completed = state->other_completed;
170  completed = true;
171  check_complete();
172  }
173  );
174 
// Pass the `other` subscriber through coordinator.out() under on_exception;
// an empty result aborts before anything is subscribed.
175  auto selectedSinkOther = on_exception(
176  [&](){ return state->coordinator.out(sinkOther); },
177  state->out);
178  if (selectedSinkOther.empty()) {
179  return;
180  }
181  other->subscribe(std::move(selectedSinkOther.get()));
182 
// Subscribe to `source` with handlers that mirror the ones for `other`.
// NOTE(review): unlike sinkOther, these handlers are not passed through
// coordinator.out() - confirm that this asymmetry is intended.
183  source.get().subscribe(
184  state->source_lifetime,
185  // on_next
186  [state, check_equal](source_value_type t) {
187  auto& values = state->source_values;
188  values.push_back(t);
189  check_equal();
190  },
191  // on_error
192  [state](std::exception_ptr e) {
193  state->out.on_error(e);
194  },
195  // on_completed
196  [state, check_complete]() {
197  auto& completed = state->source_completed;
198  completed = true;
199  check_complete();
200  }
201  );
202  }
203 };
204 
205 }
206 
// Factory for the sequence_equal operator: forwards its arguments into a tuple
// and wraps that tuple in operator_factory<sequence_equal_tag, AN...>.
// NOTE(review): the embedded listing numbers jump from 210 to 212, so the line
// carrying the trailing return type and the opening brace appears to be missing
// from this listing - restore it from the original header before compiling.
209 template<class... AN>
210 auto sequence_equal(AN&&... an)
212  return operator_factory<sequence_equal_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
213 }
214 
215 }
216 
// Explicit specialization that provides the static member() overloads for
// sequence_equal. Each overload builds a detail::sequence_equal operator and
// wraps it in an observable.
// NOTE(review): the embedded listing numbers skip 218, 222-223, 237 and 250.
// The specialization header and parts of the `Enabled` constraints appear to be
// missing from this listing - restore them from the original header.
217 template<>
219 {
// (source, other): compares with rxu::equal_to<> and uses the coordination
// returned by identity_current_thread().
220  template<class Observable, class OtherObservable,
221  class Enabled = rxu::enable_if_all_true_type_t<
224  class SourceValue = rxu::value_type_t<Observable>,
225  class SequenceEqual = rxo::detail::sequence_equal<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<OtherObservable>, rxu::equal_to<>, identity_one_worker>,
226  class Value = rxu::value_type_t<SequenceEqual>,
227  class Result = observable<Value, SequenceEqual>>
228  static Result member(Observable&& o, OtherObservable&& t) {
229  return Result(SequenceEqual(std::forward<Observable>(o), std::forward<OtherObservable>(t), rxu::equal_to<>(), identity_current_thread()));
230  }
231 
// (source, other, predicate): compares with the caller's predicate and uses
// the coordination returned by identity_current_thread().
// NOTE(review): IsCoordination is declared here but not used in the visible
// lines; presumably the missing line 237 uses it to exclude coordination
// arguments from this overload - confirm.
232  template<class Observable, class OtherObservable, class BinaryPredicate,
233  class IsCoordination = is_coordination<BinaryPredicate>,
234  class Enabled = rxu::enable_if_all_true_type_t<
235  is_observable<Observable>,
236  is_observable<OtherObservable>,
238  class SourceValue = rxu::value_type_t<Observable>,
239  class SequenceEqual = rxo::detail::sequence_equal<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<OtherObservable>, rxu::decay_t<BinaryPredicate>, identity_one_worker>,
240  class Value = rxu::value_type_t<SequenceEqual>,
241  class Result = observable<Value, SequenceEqual>>
242  static Result member(Observable&& o, OtherObservable&& t, BinaryPredicate&& pred) {
243  return Result(SequenceEqual(std::forward<Observable>(o), std::forward<OtherObservable>(t), std::forward<BinaryPredicate>(pred), identity_current_thread()));
244  }
245 
// (source, other, coordination): compares with rxu::equal_to<> and uses the
// caller's coordination.
246  template<class Observable, class OtherObservable, class Coordination,
247  class Enabled = rxu::enable_if_all_true_type_t<
248  is_observable<Observable>,
249  is_observable<OtherObservable>,
251  class SourceValue = rxu::value_type_t<Observable>,
252  class SequenceEqual = rxo::detail::sequence_equal<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<OtherObservable>, rxu::equal_to<>, rxu::decay_t<Coordination>>,
253  class Value = rxu::value_type_t<SequenceEqual>,
254  class Result = observable<Value, SequenceEqual>>
255  static Result member(Observable&& o, OtherObservable&& t, Coordination&& cn) {
256  return Result(SequenceEqual(std::forward<Observable>(o), std::forward<OtherObservable>(t), rxu::equal_to<>(), std::forward<Coordination>(cn)));
257  }
258 
// (source, other, predicate, coordination): caller supplies both the predicate
// and the coordination.
259  template<class Observable, class OtherObservable, class BinaryPredicate, class Coordination,
260  class Enabled = rxu::enable_if_all_true_type_t<
261  is_observable<Observable>,
262  is_observable<OtherObservable>,
263  is_coordination<Coordination>>,
264  class SourceValue = rxu::value_type_t<Observable>,
265  class SequenceEqual = rxo::detail::sequence_equal<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<OtherObservable>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<Coordination>>,
266  class Value = rxu::value_type_t<SequenceEqual>,
267  class Result = observable<Value, SequenceEqual>>
268  static Result member(Observable&& o, OtherObservable&& t, BinaryPredicate&& pred, Coordination&& cn) {
269  return Result(SequenceEqual(std::forward<Observable>(o), std::forward<OtherObservable>(t), std::forward<BinaryPredicate>(pred), std::forward<Coordination>(cn)));
270  }
271 
// Catch-all overload for any other argument list. The static_assert condition
// depends on AN, so it fires only when this overload is instantiated and then
// reports the accepted argument forms. The terminate/return statements precede
// it only to give the function a well-formed body.
272  template<class... AN>
273  static operators::detail::sequence_equal_invalid_t<AN...> member(const AN&...) {
274  std::terminate();
275  return {};
276  static_assert(sizeof...(AN) == 10000, "sequence_equal takes (OtherObservable, optional BinaryPredicate, optional Coordination)");
277  }
278 };
279 
280 }
281 
282 #endif
Definition: rx-operators.hpp:360
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
static Result member(Observable &&o, OtherObservable &&t, BinaryPredicate &&pred)
Definition: rx-sequence_equal.hpp:242
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static Result member(Observable &&o, OtherObservable &&t, BinaryPredicate &&pred, Coordination &&cn)
Definition: rx-sequence_equal.hpp:268
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
auto sequence_equal(AN &&...an) -> operator_factory< sequence_equal_tag, AN... >
Determine whether two Observables emit the same sequence of items.
Definition: rx-sequence_equal.hpp:210
static Result member(Observable &&o, OtherObservable &&t, Coordination &&cn)
Definition: rx-sequence_equal.hpp:255
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
Definition: rx-util.hpp:420
static Result member(Observable &&o, OtherObservable &&t)
Definition: rx-sequence_equal.hpp:228
Definition: rx-util.hpp:802
static operators::detail::sequence_equal_invalid_t< AN... > member(const AN &...)
Definition: rx-sequence_equal.hpp:273
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37