24 #if !defined(RXCPP_OPERATORS_RX_SEQUENCE_EQUAL_HPP) 25 #define RXCPP_OPERATORS_RX_SEQUENCE_EQUAL_HPP 27 #include "../rx-includes.hpp" 36 struct sequence_equal_invalid_arguments {};
// Operator type used when sequence_equal is called with an unsupported argument list.
// It derives from rxo::operator_base over the marker type
// sequence_equal_invalid_arguments<AN...>.
// NOTE(review): the `template<class... AN>` header is not visible in this listing.
39 struct sequence_equal_invalid :
public rxo::operator_base<sequence_equal_invalid_arguments<AN...>> {
// Observable type parameterised on the marker arguments type and on this struct.
40 using type = observable<sequence_equal_invalid_arguments<
AN...>, sequence_equal_invalid<
AN...>>;
// Shorthand for the nested `type` of sequence_equal_invalid<AN...>.
43 using sequence_equal_invalid_t =
typename sequence_equal_invalid<
AN...>::type;
// Template parameters and member typedefs of the sequence_equal operator implementation.
//   T               - decayed into source_value_type.
//   Observable      - decayed into source_type.
//   OtherObservable - decayed into other_source_type; its nested value_type becomes
//                     other_source_value_type.
//   BinaryPredicate - decayed into predicate_type.
//   Coordination    - decayed into coordination_type; must expose a nested coordinator_type.
// NOTE(review): the struct declaration line that follows the template header is not
// visible in this listing.
45 template<
class T,
class Observable,
class OtherObservable,
class BinaryPredicate,
class Coordination>
48 typedef rxu::decay_t<Observable> source_type;
49 typedef rxu::decay_t<T> source_value_type;
50 typedef rxu::decay_t<OtherObservable> other_source_type;
51 typedef typename other_source_type::value_type other_source_value_type;
52 typedef rxu::decay_t<BinaryPredicate> predicate_type;
53 typedef rxu::decay_t<Coordination> coordination_type;
54 typedef typename coordination_type::coordinator_type coordinator_type;
// Constructor of the `values` bundle that stores the operator's inputs.
// Arguments are taken by value and moved into the members named in the initializer list.
// NOTE(review): the initializer for `other` and some member declarations are not
// visible in this listing.
57 values(source_type s, other_source_type t, predicate_type pred, coordination_type sf)
58 : source(std::move(s))
// Inside the parentheses `pred` names the constructor parameter, not the member.
60 , pred(std::move(pred))
61 , coordination(std::move(sf))
// Stored inputs (partial member list).
66 other_source_type other;
68 coordination_type coordination;
// Operator constructor: moves all four arguments into the `initial` member.
// `initial` is later read by on_subscribe (initial.coordination) and passed to
// the per-subscription state.
73 sequence_equal(source_type s, other_source_type t, predicate_type pred, coordination_type sf)
74 : initial(std::move(s), std::move(t), std::move(pred), std::move(sf))
// Subscribes `s` to the result of comparing the two stored observables.
// Visible flow: a coordinator is created from initial.coordination, a shared
// state_type object is built from it, both inputs are subscribed with handlers
// that reference per-side buffers and completion flags, and bool results are
// sent through state->out.
// NOTE(review): several lines of this method are absent from this listing; the
// comments below describe only what is shown.
78 template<
class Subscriber>
79 void on_subscribe(Subscriber s)
const {
81 typedef Subscriber output_type;
// Per-subscription state type; inherits std::enable_shared_from_this and is
// created through std::make_shared further down.
84 :
public std::enable_shared_from_this<state_type>
// Constructor: receives the operator inputs, the coordinator and the output subscriber.
87 state_type(
const values& vals, coordinator_type coor,
const output_type& o)
89 , coordinator(std::move(coor))
// Neither side has completed when the state is created.
91 , source_completed(
false)
92 , other_completed(
false)
// Register both upstream lifetimes with the output.
// presumably so that unsubscribing the output also ends both inner subscriptions — confirm.
94 out.add(other_lifetime);
95 out.add(source_lifetime);
// One subscription lifetime per input observable.
98 composite_subscription other_lifetime;
99 composite_subscription source_lifetime;
100 coordinator_type coordinator;
// Values received from each side that have not yet been paired and compared.
103 mutable std::list<source_value_type> source_values;
104 mutable std::list<other_source_value_type> other_values;
// Set once the corresponding side has signalled completion.
105 mutable bool source_completed;
106 mutable bool other_completed;
// Create the coordinator for this subscription and the shared state that owns it.
109 auto coordinator = initial.coordination.create_coordinator();
110 auto state = std::make_shared<state_type>(initial, std::move(coordinator), std::move(s));
// Pass the other observable through coordinator.in(); the call that wraps this
// lambda is not visible in this listing.
113 [&](){
return state->coordinator.in(state->other); },
// Same for the source observable.
120 [&](){
return state->coordinator.in(state->source); },
// Guard on an empty `source` result; the guarded body is not visible here.
122 if (source.empty()) {
// check_equal: compares buffered values and reports a mismatch as soon as one is known.
126 auto check_equal = [state]() {
// Both buffers have a value: pop the front of each and compare them with the predicate.
127 if(!state->source_values.empty() && !state->other_values.empty()) {
128 auto x = std::move(state->source_values.front());
129 state->source_values.pop_front();
131 auto y = std::move(state->other_values.front());
132 state->other_values.pop_front();
// Predicate rejected the pair: emit false and complete the output.
134 if (!state->pred(x, y)) {
135 state->out.on_next(
false);
136 state->out.on_completed();
// One side still has buffered values while the opposite side has already completed:
// emit false and complete.
// NOTE(review): how this test is joined to the one above (e.g. an else branch)
// is not visible in this listing.
139 if((!state->source_values.empty() && state->other_completed) ||
140 (!state->other_values.empty() && state->source_completed)) {
141 state->out.on_next(
false);
142 state->out.on_completed();
// check_complete: once both sides have completed, emit true only if both buffers
// are empty (false otherwise), then complete the output.
147 auto check_complete = [state]() {
148 if(state->source_completed && state->other_completed) {
149 state->out.on_next(state->source_values.empty() && state->other_values.empty());
150 state->out.on_completed();
// Subscriber for the other observable, tied to other_lifetime.
154 auto sinkOther = make_subscriber<other_source_value_type>(
156 state->other_lifetime,
// on_next: refers to the other-side buffer; the rest of the handler body is not visible.
// presumably the value is queued and check_equal is run — confirm.
158 [state, check_equal](other_source_value_type t) {
159 auto& values = state->other_values;
// on_error: forward the error to the output.
164 [state](std::exception_ptr e) {
165 state->out.on_error(e);
// on_completed: refers to the other-side completion flag; the rest of the body is not visible.
168 [state, check_complete]() {
169 auto& completed = state->other_completed;
// Pass sinkOther through coordinator.out(); the wrapping call is not visible here.
176 [&](){
return state->coordinator.out(sinkOther); },
// Guard on an empty selectedSinkOther; the guarded body is not visible here.
178 if (selectedSinkOther.empty()) {
// Subscribe the other observable with the coordinated sink.
181 other->subscribe(std::move(selectedSinkOther.get()));
// Subscribe the source observable, tied to source_lifetime, with handlers that
// mirror the ones above but use the source-side buffer and completion flag.
183 source.get().subscribe(
184 state->source_lifetime,
// on_next: refers to the source-side buffer; the rest of the body is not visible.
186 [state, check_equal](source_value_type t) {
187 auto& values = state->source_values;
// on_error: forward the error to the output.
192 [state](std::exception_ptr e) {
193 state->out.on_error(e);
// on_completed: refers to the source-side completion flag; the rest of the body is not visible.
196 [state, check_complete]() {
197 auto& completed = state->source_completed;
// Factory for the sequence_equal operator.
// Forwards every argument into std::make_tuple and returns the resulting tuple wrapped
// in an operator_factory tagged with sequence_equal_tag.
// NOTE(review): the function signature line is not visible in this listing.
209 template<
class...
AN>
212 return operator_factory<sequence_equal_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
// Overload for (source, other) with no predicate and no coordination.
// Builds SequenceEqual with rxu::equal_to<>() as the predicate and
// identity_current_thread() as the coordination, then wraps it in Result.
// NOTE(review): the remaining template parameters (including the defaults for
// SequenceEqual and Result) are not visible in this listing.
220 template<
class Observable,
class OtherObservable,
228 static Result
member(Observable&& o, OtherObservable&& t) {
229 return Result(SequenceEqual(std::forward<Observable>(o), std::forward<OtherObservable>(t), rxu::equal_to<>(),
identity_current_thread()));
// Overload for (source, other, predicate) with no coordination.
// The visible constraints require both Observable and OtherObservable to satisfy
// is_observable; other constraints may exist on lines missing from this listing.
// SequenceEqual is instantiated with identity_one_worker as its coordination type,
// and identity_current_thread() supplies the coordination value.
232 template<
class Observable,
class OtherObservable,
class BinaryPredicate,
235 is_observable<Observable>,
236 is_observable<OtherObservable>,
239 class SequenceEqual = rxo::detail::sequence_equal<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<OtherObservable>,
rxu::decay_t<BinaryPredicate>, identity_one_worker>,
// Forward the caller's predicate and wrap the operator in Result.
242 static Result
member(Observable&& o, OtherObservable&& t, BinaryPredicate&& pred) {
243 return Result(SequenceEqual(std::forward<Observable>(o), std::forward<OtherObservable>(t), std::forward<BinaryPredicate>(pred),
identity_current_thread()));
// Overload for (source, other, coordination) with no predicate.
// The visible constraints require both Observable and OtherObservable to satisfy
// is_observable; other constraints may exist on lines missing from this listing.
// SequenceEqual is instantiated with rxu::equal_to<> as its predicate type and the
// decayed Coordination as its coordination type.
246 template<
class Observable,
class OtherObservable,
class Coordination,
248 is_observable<Observable>,
249 is_observable<OtherObservable>,
252 class SequenceEqual = rxo::detail::sequence_equal<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<OtherObservable>, rxu::equal_to<>,
rxu::decay_t<Coordination>>,
// Use a default-constructed rxu::equal_to<> and forward the caller's coordination.
255 static Result
member(Observable&& o, OtherObservable&& t, Coordination&& cn) {
256 return Result(SequenceEqual(std::forward<Observable>(o), std::forward<OtherObservable>(t), rxu::equal_to<>(), std::forward<Coordination>(cn)));
// Overload for (source, other, predicate, coordination).
// The visible constraints require Observable and OtherObservable to satisfy
// is_observable and Coordination to satisfy is_coordination.
// SequenceEqual is instantiated with the decayed predicate and coordination types.
259 template<
class Observable,
class OtherObservable,
class BinaryPredicate,
class Coordination,
261 is_observable<Observable>,
262 is_observable<OtherObservable>,
263 is_coordination<Coordination>>,
265 class SequenceEqual = rxo::detail::sequence_equal<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<OtherObservable>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<Coordination>>,
// Forward every argument unchanged and wrap the operator in Result.
268 static Result
member(Observable&& o, OtherObservable&& t, BinaryPredicate&& pred, Coordination&& cn) {
269 return Result(SequenceEqual(std::forward<Observable>(o), std::forward<OtherObservable>(t), std::forward<BinaryPredicate>(pred), std::forward<Coordination>(cn)));
// Catch-all overload that accepts any argument list by const reference.
// Its return type is sequence_equal_invalid_t<AN...>, and its body is a static_assert
// that reports the accepted argument forms.
272 template<
class...
AN>
273 static operators::detail::sequence_equal_invalid_t<
AN...>
member(
const AN&...) {
// The condition depends on the parameter pack and is false unless exactly 10000
// arguments are passed, so instantiating this overload produces the message below.
276 static_assert(
sizeof...(
AN) == 10000,
"sequence_equal takes (OtherObservable, optional BinaryPredicate, optional Coordination)");
Definition: rx-operators.hpp:360
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
static Result member(Observable &&o, OtherObservable &&t, BinaryPredicate &&pred)
Definition: rx-sequence_equal.hpp:242
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static Result member(Observable &&o, OtherObservable &&t, BinaryPredicate &&pred, Coordination &&cn)
Definition: rx-sequence_equal.hpp:268
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
auto sequence_equal(AN &&...an) -> operator_factory< sequence_equal_tag, AN... >
Determine whether two Observables emit the same sequence of items.
Definition: rx-sequence_equal.hpp:210
static Result member(Observable &&o, OtherObservable &&t, Coordination &&cn)
Definition: rx-sequence_equal.hpp:255
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
Definition: rx-util.hpp:420
static Result member(Observable &&o, OtherObservable &&t)
Definition: rx-sequence_equal.hpp:228
Definition: rx-util.hpp:802
static operators::detail::sequence_equal_invalid_t< AN... > member(const AN &...)
Definition: rx-sequence_equal.hpp:273
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37