RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-observable.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_OBSERVABLE_HPP)
6 #define RXCPP_RX_OBSERVABLE_HPP
7 
8 #include "rx-includes.hpp"
9 
10 #ifdef __GNUG__
11 #define EXPLICIT_THIS this->
12 #else
13 #define EXPLICIT_THIS
14 #endif
15 
16 namespace rxcpp {
17 
18 namespace detail {
19 
20 template<class Subscriber, class T>
21 struct has_on_subscribe_for
22 {
23  struct not_void {};
24  template<class CS, class CT>
25  static auto check(int) -> decltype((*(CT*)nullptr).on_subscribe(*(CS*)nullptr));
26  template<class CS, class CT>
27  static not_void check(...);
28 
29  typedef decltype(check<rxu::decay_t<Subscriber>, T>(0)) detail_result;
30  static const bool value = std::is_same<detail_result, void>::value;
31 };
32 
33 }
34 
35 template<class T>
37  : public rxs::source_base<T>
38 {
39  struct state_type
40  : public std::enable_shared_from_this<state_type>
41  {
42  typedef std::function<void(subscriber<T>)> onsubscribe_type;
43 
44  onsubscribe_type on_subscribe;
45  };
46  std::shared_ptr<state_type> state;
47 
48  template<class U>
49  friend bool operator==(const dynamic_observable<U>&, const dynamic_observable<U>&);
50 
51  template<class SO>
52  void construct(SO&& source, rxs::tag_source&&) {
53  rxu::decay_t<SO> so = std::forward<SO>(source);
54  state->on_subscribe = [so](subscriber<T> o) mutable {
55  so.on_subscribe(std::move(o));
56  };
57  }
58 
59  struct tag_function {};
60  template<class F>
61  void construct(F&& f, tag_function&&) {
62  state->on_subscribe = std::forward<F>(f);
63  }
64 
65 public:
66 
68 
70  {
71  }
72 
73  template<class SOF>
74  explicit dynamic_observable(SOF&& sof, typename std::enable_if<!is_dynamic_observable<SOF>::value, void**>::type = 0)
75  : state(std::make_shared<state_type>())
76  {
77  construct(std::forward<SOF>(sof),
78  typename std::conditional<rxs::is_source<SOF>::value || rxo::is_operator<SOF>::value, rxs::tag_source, tag_function>::type());
79  }
80 
81  void on_subscribe(subscriber<T> o) const {
82  state->on_subscribe(std::move(o));
83  }
84 
85  template<class Subscriber>
86  typename std::enable_if<is_subscriber<Subscriber>::value, void>::type
87  on_subscribe(Subscriber o) const {
88  state->on_subscribe(o.as_dynamic());
89  }
90 };
91 
92 template<class T>
93 inline bool operator==(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
94  return lhs.state == rhs.state;
95 }
96 template<class T>
97 inline bool operator!=(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
98  return !(lhs == rhs);
99 }
100 
101 template<class T, class Source>
103  return observable<T>(dynamic_observable<T>(std::forward<Source>(s)));
104 }
105 
106 namespace detail {
107 template<bool Selector, class Default, class SO>
108 struct resolve_observable;
109 
110 template<class Default, class SO>
111 struct resolve_observable<true, Default, SO>
112 {
113  typedef typename SO::type type;
114  typedef typename type::value_type value_type;
115  static const bool value = true;
116  typedef observable<value_type, type> observable_type;
117  template<class... AN>
118  static observable_type make(const Default&, AN&&... an) {
119  return observable_type(type(std::forward<AN>(an)...));
120  }
121 };
122 template<class Default, class SO>
123 struct resolve_observable<false, Default, SO>
124 {
125  static const bool value = false;
126  typedef Default observable_type;
127  template<class... AN>
128  static observable_type make(const observable_type& that, const AN&...) {
129  return that;
130  }
131 };
132 template<class SO>
133 struct resolve_observable<true, void, SO>
134 {
135  typedef typename SO::type type;
136  typedef typename type::value_type value_type;
137  static const bool value = true;
138  typedef observable<value_type, type> observable_type;
139  template<class... AN>
140  static observable_type make(AN&&... an) {
141  return observable_type(type(std::forward<AN>(an)...));
142  }
143 };
144 template<class SO>
145 struct resolve_observable<false, void, SO>
146 {
147  static const bool value = false;
148  typedef void observable_type;
149  template<class... AN>
150  static observable_type make(const AN&...) {
151  }
152 };
153 
154 }
155 
156 template<class Selector, class Default, template<class... TN> class SO, class... AN>
158  : public detail::resolve_observable<Selector::value, Default, rxu::defer_type<SO, AN...>>
159 {
160 };
161 
168 template<class T, class Observable>
170 {
171  template<class Obsvbl, class... ArgN>
172  static auto blocking_subscribe(const Obsvbl& source, bool do_rethrow, ArgN&&... an)
173  -> void {
174  std::mutex lock;
175  std::condition_variable wake;
176  std::exception_ptr error;
177 
178  struct tracking
179  {
180  ~tracking()
181  {
182  if (!disposed || !wakened) std::terminate();
183  }
184  tracking()
185  {
186  disposed = false;
187  wakened = false;
188  false_wakes = 0;
189  true_wakes = 0;
190  }
191  std::atomic_bool disposed;
192  std::atomic_bool wakened;
193  std::atomic_int false_wakes;
194  std::atomic_int true_wakes;
195  };
196  auto track = std::make_shared<tracking>();
197 
198  auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
199 
200  // keep any error to rethrow at the end.
201  auto scbr = make_subscriber<T>(
202  dest,
203  [&](T t){dest.on_next(t);},
204  [&](std::exception_ptr e){
205  if (do_rethrow) {
206  error = e;
207  } else {
208  dest.on_error(e);
209  }
210  },
211  [&](){dest.on_completed();}
212  );
213 
214  auto cs = scbr.get_subscription();
215  cs.add(
216  [&, track](){
217  // OSX geting invalid x86 op if notify_one is after the disposed = true
218  // presumably because the condition_variable may already have been awakened
219  // and is now sitting in a while loop on disposed
220  wake.notify_one();
221  track->disposed = true;
222  });
223 
224  std::unique_lock<std::mutex> guard(lock);
225  source.subscribe(std::move(scbr));
226 
227  wake.wait(guard,
228  [&, track](){
229  // this is really not good.
230  // false wakeups were never followed by true wakeups so..
231 
232  // anyways this gets triggered before disposed is set now so wait.
233  while (!track->disposed) {
234  ++track->false_wakes;
235  }
236  ++track->true_wakes;
237  return true;
238  });
239  track->wakened = true;
240  if (!track->disposed || !track->wakened) std::terminate();
241 
242  if (error) {std::rethrow_exception(error);}
243  }
244 
245 public:
247  observable_type source;
249  {
250  }
251  blocking_observable(observable_type s) : source(std::move(s)) {}
252 
268  template<class... ArgN>
269  auto subscribe(ArgN&&... an) const
270  -> void {
271  return blocking_subscribe(source, false, std::forward<ArgN>(an)...);
272  }
273 
293  template<class... ArgN>
294  auto subscribe_with_rethrow(ArgN&&... an) const
295  -> void {
296  return blocking_subscribe(source, true, std::forward<ArgN>(an)...);
297  }
298 
314  template<class... AN>
315  auto first(AN**...) -> delayed_type_t<T, AN...> const {
316  rxu::maybe<T> result;
318  subscribe_with_rethrow(
319  cs,
320  [&](T v){result.reset(v); cs.unsubscribe();});
321  if (result.empty())
322  throw rxcpp::empty_error("first() requires a stream with at least one value");
323  return result.get();
324  static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
325  }
326 
342  template<class... AN>
343  auto last(AN**...) -> delayed_type_t<T, AN...> const {
344  rxu::maybe<T> result;
345  subscribe_with_rethrow(
346  [&](T v){result.reset(v);});
347  if (result.empty())
348  throw rxcpp::empty_error("last() requires a stream with at least one value");
349  return result.get();
350  static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
351  }
352 
365  int count() const {
366  int result = 0;
367  source.count().as_blocking().subscribe_with_rethrow(
368  [&](int v){result = v;});
369  return result;
370  }
371 
389  T sum() const {
390  return source.sum().as_blocking().last();
391  }
392 
410  double average() const {
411  return source.average().as_blocking().last();
412  }
413 
431  T max() const {
432  return source.max().as_blocking().last();
433  }
434 
452  T min() const {
453  return source.min().as_blocking().last();
454  }
455 };
456 
457 namespace detail {
458 
459 template<class SourceOperator, class Subscriber>
460 struct safe_subscriber
461 {
462  safe_subscriber(SourceOperator& so, Subscriber& o) : so(std::addressof(so)), o(std::addressof(o)) {}
463 
464  void subscribe() {
465  try {
466  so->on_subscribe(*o);
467  }
468  catch(...) {
469  if (!o->is_subscribed()) {
470  throw;
471  }
472  o->on_error(std::current_exception());
473  o->unsubscribe();
474  }
475  }
476 
477  void operator()(const rxsc::schedulable&) {
478  subscribe();
479  }
480 
481  SourceOperator* so;
482  Subscriber* o;
483 };
484 
485 }
486 
487 template<>
488 class observable<void, void>;
489 
509 template<class T, class SourceOperator>
511  : public observable_base<T>
512 {
513  static_assert(std::is_same<T, typename SourceOperator::value_type>::value, "SourceOperator::value_type must be the same as T in observable<T, SourceOperator>");
514 
516 
517 public:
519  mutable source_operator_type source_operator;
520 
521 private:
522 
523  template<class U, class SO>
524  friend class observable;
525 
526  template<class U, class SO>
527  friend bool operator==(const observable<U, SO>&, const observable<U, SO>&);
528 
529  template<class Subscriber>
530  auto detail_subscribe(Subscriber o) const
532 
533  typedef rxu::decay_t<Subscriber> subscriber_type;
534 
535  static_assert(is_subscriber<subscriber_type>::value, "subscribe must be passed a subscriber");
536  static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value, "the value types in the sequence must match or be convertible");
537  static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber ");
538 
539  trace_activity().subscribe_enter(*this, o);
540 
541  if (!o.is_subscribed()) {
542  trace_activity().subscribe_return(*this);
543  return o.get_subscription();
544  }
545 
546  detail::safe_subscriber<source_operator_type, subscriber_type> subscriber(source_operator, o);
547 
548  // make sure to let current_thread take ownership of the thread as early as possible.
549  if (rxsc::current_thread::is_schedule_required()) {
550  const auto& sc = rxsc::make_current_thread();
551  sc.create_worker(o.get_subscription()).schedule(subscriber);
552  } else {
553  // current_thread already owns this thread.
554  subscriber.subscribe();
555  }
556 
557  trace_activity().subscribe_return(*this);
558  return o.get_subscription();
559  }
560 
561 public:
562  typedef T value_type;
563 
564  static_assert(rxo::is_operator<source_operator_type>::value || rxs::is_source<source_operator_type>::value, "observable must wrap an operator or source");
565 
567  {
568  }
569 
571  {
572  }
573 
574  explicit observable(const source_operator_type& o)
575  : source_operator(o)
576  {
577  }
578  explicit observable(source_operator_type&& o)
579  : source_operator(std::move(o))
580  {
581  }
582 
584  template<class SO>
586  : source_operator(o.source_operator)
587  {}
589  template<class SO>
591  : source_operator(std::move(o.source_operator))
592  {}
593 
594 #if 0
595  template<class I>
596  void on_subscribe(observer<T, I> o) const {
597  source_operator.on_subscribe(o);
598  }
599 #endif
600 
603  template<class... AN>
604  observable<T> as_dynamic(AN**...) const {
605  return *this;
606  static_assert(sizeof...(AN) == 0, "as_dynamic() was passed too many arguments.");
607  }
608 
611  template<class... AN>
613  return blocking_observable<T, this_type>(*this);
614  static_assert(sizeof...(AN) == 0, "as_blocking() was passed too many arguments.");
615  }
616 
618 
624  template<class OperatorFactory>
625  auto op(OperatorFactory&& of) const
626  -> decltype(of(*(const this_type*)nullptr)) {
627  return of(*this);
628  static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)");
629  }
630 
633  template<class ResultType, class Operator>
634  auto lift(Operator&& op) const
635  -> observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>> {
636  return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
637  rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
638  static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)");
639  }
640 
646  template<class ResultType, class Operator>
647  auto lift_if(Operator&& op) const
648  -> typename std::enable_if<detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
649  observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>>::type {
650  return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
651  rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
652  }
658  template<class ResultType, class Operator>
659  auto lift_if(Operator&&) const
660  -> typename std::enable_if<!detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
661  decltype(rxs::from<ResultType>())>::type {
662  return rxs::from<ResultType>();
663  }
665 
668  template<class... ArgN>
669  auto subscribe(ArgN&&... an) const
671  return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...));
672  }
673 
676  template<class... AN>
677  auto all(AN&&... an) const
679  -> decltype(observable_member(all_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
681  {
682  return observable_member(all_tag{}, *this, std::forward<AN>(an)...);
683  }
684 
687  template<class... AN>
688  auto is_empty(AN&&... an) const
690  -> decltype(observable_member(is_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
692  {
693  return observable_member(is_empty_tag{}, *this, std::forward<AN>(an)...);
694  }
695 
698  template<class... AN>
699  auto any(AN&&... an) const
701  -> decltype(observable_member(any_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
703  {
704  return observable_member(any_tag{}, *this, std::forward<AN>(an)...);
705  }
706 
709  template<class... AN>
710  auto exists(AN&&... an) const
712  -> decltype(observable_member(exists_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
714  {
715  return observable_member(exists_tag{}, *this, std::forward<AN>(an)...);
716  }
717 
720  template<class... AN>
721  auto contains(AN&&... an) const
723  -> decltype(observable_member(contains_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
725  {
726  return observable_member(contains_tag{}, *this, std::forward<AN>(an)...);
727  }
728 
731  template<class... AN>
732  auto filter(AN&&... an) const
734  -> decltype(observable_member(filter_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
736  {
737  return observable_member(filter_tag{}, *this, std::forward<AN>(an)...);
738  }
739 
742  template<class... AN>
743  auto switch_if_empty(AN&&... an) const
745  -> decltype(observable_member(switch_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
747  {
748  return observable_member(switch_if_empty_tag{}, *this, std::forward<AN>(an)...);
749  }
750 
753  template<class... AN>
754  auto default_if_empty(AN&&... an) const
756  -> decltype(observable_member(default_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
758  {
759  return observable_member(default_if_empty_tag{}, *this, std::forward<AN>(an)...);
760  }
761 
764  template<class... AN>
765  auto sequence_equal(AN... an) const
767  -> decltype(observable_member(sequence_equal_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
769  {
770  return observable_member(sequence_equal_tag{}, *this, std::forward<AN>(an)...);
771  }
772 
775  template<class... AN>
776  auto tap(AN&&... an) const
778  -> decltype(observable_member(tap_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
780  {
781  return observable_member(tap_tag{}, *this, std::forward<AN>(an)...);
782  }
783 
786  template<class... AN>
787  auto time_interval(AN&&... an) const
789  -> decltype(observable_member(time_interval_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
791  {
792  return observable_member(time_interval_tag{}, *this, std::forward<AN>(an)...);
793  }
794 
797  template<class... AN>
798  auto timeout(AN&&... an) const
800  -> decltype(observable_member(timeout_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
802  {
803  return observable_member(timeout_tag{}, *this, std::forward<AN>(an)...);
804  }
805 
808  template<class... AN>
809  auto timestamp(AN&&... an) const
811  -> decltype(observable_member(timestamp_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
813  {
814  return observable_member(timestamp_tag{}, *this, std::forward<AN>(an)...);
815  }
816 
819  template<class... AN>
820  auto finally(AN&&... an) const
822  -> decltype(observable_member(finally_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
824  {
825  return observable_member(finally_tag{}, *this, std::forward<AN>(an)...);
826  }
827 
830  template<class... AN>
831  auto on_error_resume_next(AN&&... an) const
833  -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
835  {
836  return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...);
837  }
838 
841  template<class... AN>
842  auto switch_on_error(AN&&... an) const
844  -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
846  {
847  return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...);
848  }
849 
852  template<class... AN>
853  auto map(AN&&... an) const
855  -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
857  {
858  return observable_member(map_tag{}, *this, std::forward<AN>(an)...);
859  }
860 
863  template<class... AN>
864  auto transform(AN&&... an) const
866  -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
868  {
869  return observable_member(map_tag{}, *this, std::forward<AN>(an)...);
870  }
871 
874  template<class... AN>
875  auto debounce(AN&&... an) const
877  -> decltype(observable_member(debounce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
879  {
880  return observable_member(debounce_tag{}, *this, std::forward<AN>(an)...);
881  }
882 
885  template<class... AN>
886  auto delay(AN&&... an) const
888  -> decltype(observable_member(delay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
890  {
891  return observable_member(delay_tag{}, *this, std::forward<AN>(an)...);
892  }
893 
896  template<class... AN>
897  auto distinct(AN&&... an) const
899  -> decltype(observable_member(distinct_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
901  {
902  return observable_member(distinct_tag{}, *this, std::forward<AN>(an)...);
903  }
904 
907  template<class... AN>
908  auto distinct_until_changed(AN&&... an) const
910  -> decltype(observable_member(distinct_until_changed_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
912  {
913  return observable_member(distinct_until_changed_tag{}, *this, std::forward<AN>(an)...);
914  }
915 
918  template<class... AN>
919  auto element_at(AN&&... an) const
921  -> decltype(observable_member(element_at_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
923  {
924  return observable_member(element_at_tag{}, *this, std::forward<AN>(an)...);
925  }
926 
929  template<class... AN>
930  auto window(AN&&... an) const
932  -> decltype(observable_member(window_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
934  {
935  return observable_member(window_tag{}, *this, std::forward<AN>(an)...);
936  }
937 
940  template<class... AN>
941  auto window_with_time(AN&&... an) const
943  -> decltype(observable_member(window_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
945  {
946  return observable_member(window_with_time_tag{}, *this, std::forward<AN>(an)...);
947  }
948 
951  template<class... AN>
952  auto window_with_time_or_count(AN&&... an) const
954  -> decltype(observable_member(window_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
956  {
957  return observable_member(window_with_time_or_count_tag{}, *this, std::forward<AN>(an)...);
958  }
959 
962  template<class... AN>
963  auto window_toggle(AN&&... an) const
965  -> decltype(observable_member(window_toggle_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
967  {
968  return observable_member(window_toggle_tag{}, *this, std::forward<AN>(an)...);
969  }
970 
973  template<class... AN>
974  auto buffer(AN&&... an) const
976  -> decltype(observable_member(buffer_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
978  {
979  return observable_member(buffer_count_tag{}, *this, std::forward<AN>(an)...);
980  }
981 
984  template<class... AN>
985  auto buffer_with_time(AN&&... an) const
987  -> decltype(observable_member(buffer_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
989  {
990  return observable_member(buffer_with_time_tag{}, *this, std::forward<AN>(an)...);
991  }
992 
995  template<class... AN>
996  auto buffer_with_time_or_count(AN&&... an) const
998  -> decltype(observable_member(buffer_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1000  {
1001  return observable_member(buffer_with_time_or_count_tag{}, *this, std::forward<AN>(an)...);
1002  }
1003 
1006  template<class... AN>
1007  auto switch_on_next(AN&&... an) const
1009  -> decltype(observable_member(switch_on_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1011  {
1012  return observable_member(switch_on_next_tag{}, *this, std::forward<AN>(an)...);
1013  }
1014 
1017  template<class... AN>
1018  auto merge(AN... an) const
1020  -> decltype(observable_member(merge_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1022  {
1023  return observable_member(merge_tag{}, *this, std::forward<AN>(an)...);
1024  }
1025 
1028  template<class... AN>
1029  auto amb(AN... an) const
1031  -> decltype(observable_member(amb_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1033  {
1034  return observable_member(amb_tag{}, *this, std::forward<AN>(an)...);
1035  }
1036 
1039  template<class... AN>
1040  auto flat_map(AN&&... an) const
1042  -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1044  {
1045  return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...);
1046  }
1047 
1050  template<class... AN>
1051  auto merge_transform(AN&&... an) const
1053  -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1055  {
1056  return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...);
1057  }
1058 
1061  template<class... AN>
1062  auto concat(AN... an) const
1064  -> decltype(observable_member(concat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1066  {
1067  return observable_member(concat_tag{}, *this, std::forward<AN>(an)...);
1068  }
1069 
1072  template<class... AN>
1073  auto concat_map(AN&&... an) const
1075  -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1077  {
1078  return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...);
1079  }
1080 
1083  template<class... AN>
1084  auto concat_transform(AN&&... an) const
1086  -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1088  {
1089  return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...);
1090  }
1091 
1094  template<class... AN>
1095  auto with_latest_from(AN... an) const
1097  -> decltype(observable_member(with_latest_from_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1099  {
1100  return observable_member(with_latest_from_tag{}, *this, std::forward<AN>(an)...);
1101  }
1102 
1103 
1106  template<class... AN>
1107  auto combine_latest(AN... an) const
1109  -> decltype(observable_member(combine_latest_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1111  {
1112  return observable_member(combine_latest_tag{}, *this, std::forward<AN>(an)...);
1113  }
1114 
1117  template<class... AN>
1118  auto zip(AN&&... an) const
1120  -> decltype(observable_member(zip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1122  {
1123  return observable_member(zip_tag{}, *this, std::forward<AN>(an)...);
1124  }
1125 
1128  template<class... AN>
1129  inline auto group_by(AN&&... an) const
1131  -> decltype(observable_member(group_by_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1133  {
1134  return observable_member(group_by_tag{}, *this, std::forward<AN>(an)...);
1135  }
1136 
1139  template<class... AN>
1140  auto ignore_elements(AN&&... an) const
1142  -> decltype(observable_member(ignore_elements_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1144  {
1145  return observable_member(ignore_elements_tag{}, *this, std::forward<AN>(an)...);
1146  }
1147 
1150  template<class... AN>
1151  auto multicast(AN&&... an) const
1153  -> decltype(observable_member(multicast_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1155  {
1156  return observable_member(multicast_tag{}, *this, std::forward<AN>(an)...);
1157  }
1158 
1161  template<class... AN>
1162  auto publish(AN&&... an) const
1164  -> decltype(observable_member(publish_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1166  {
1167  return observable_member(publish_tag{}, *this, std::forward<AN>(an)...);
1168  }
1169 
1172  template<class... AN>
1173  auto publish_synchronized(AN&&... an) const
1175  -> decltype(observable_member(publish_synchronized_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1177  {
1178  return observable_member(publish_synchronized_tag{}, *this, std::forward<AN>(an)...);
1179  }
1180 
1183  template<class... AN>
1184  auto replay(AN&&... an) const
1186  -> decltype(observable_member(replay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1188  {
1189  return observable_member(replay_tag{}, *this, std::forward<AN>(an)...);
1190  }
1191 
1194  template<class... AN>
1195  auto subscribe_on(AN&&... an) const
1197  -> decltype(observable_member(subscribe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1199  {
1200  return observable_member(subscribe_on_tag{}, *this, std::forward<AN>(an)...);
1201  }
1202 
1205  template<class... AN>
1206  auto observe_on(AN&&... an) const
1208  -> decltype(observable_member(observe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1210  {
1211  return observable_member(observe_on_tag{}, *this, std::forward<AN>(an)...);
1212  }
1213 
1216  template<class... AN>
1217  auto reduce(AN&&... an) const
1219  -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1221  {
1222  return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...);
1223  }
1224 
1227  template<class... AN>
1228  auto accumulate(AN&&... an) const
1230  -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1232  {
1233  return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...);
1234  }
1235 
1238  template<class... AN>
1239  auto first(AN**...) const
1241  -> decltype(observable_member(delayed_type<first_tag, AN...>::value(), *(this_type*)nullptr))
1243  {
1245  static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
1246  }
1247 
1250  template<class... AN>
1251  auto last(AN**...) const
1253  -> decltype(observable_member(delayed_type<last_tag, AN...>::value(), *(this_type*)nullptr))
1255  {
1257  static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
1258  }
1259 
1262  template<class... AN>
1263  auto count(AN**...) const
1265  -> decltype(observable_member(delayed_type<reduce_tag, AN...>::value(), *(this_type*)nullptr, 0, rxu::count(), identity_for<int>()))
1267  {
1269  static_assert(sizeof...(AN) == 0, "count() was passed too many arguments.");
1270  }
1271 
1274  template<class... AN>
1275  auto sum(AN**...) const
1277  -> decltype(observable_member(delayed_type<sum_tag, AN...>::value(), *(this_type*)nullptr))
1279  {
1281  static_assert(sizeof...(AN) == 0, "sum() was passed too many arguments.");
1282  }
1283 
1286  template<class... AN>
1287  auto average(AN**...) const
1289  -> decltype(observable_member(delayed_type<average_tag, AN...>::value(), *(this_type*)nullptr))
1291  {
1293  static_assert(sizeof...(AN) == 0, "average() was passed too many arguments.");
1294  }
1295 
1298  template<class... AN>
1299  auto max(AN**...) const
1301  -> decltype(observable_member(delayed_type<max_tag, AN...>::value(), *(this_type*)nullptr))
1303  {
1305  static_assert(sizeof...(AN) == 0, "max() was passed too many arguments.");
1306  }
1307 
1310  template<class... AN>
1311  auto min(AN**...) const
1313  -> decltype(observable_member(delayed_type<min_tag, AN...>::value(), *(this_type*)nullptr))
1315  {
1317  static_assert(sizeof...(AN) == 0, "min() was passed too many arguments.");
1318  }
1319 
1322  template<class... AN>
1323  auto scan(AN... an) const
1325  -> decltype(observable_member(scan_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1327  {
1328  return observable_member(scan_tag{}, *this, std::forward<AN>(an)...);
1329  }
1330 
1333  template<class... AN>
1334  auto sample_with_time(AN&&... an) const
1336  -> decltype(observable_member(sample_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1338  {
1339  return observable_member(sample_with_time_tag{}, *this, std::forward<AN>(an)...);
1340  }
1341 
1344  template<class... AN>
1345  auto skip(AN... an) const
1347  -> decltype(observable_member(skip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1349  {
1350  return observable_member(skip_tag{}, *this, std::forward<AN>(an)...);
1351  }
1352 
1355  template<class... AN>
1356  auto skip_last(AN... an) const
1358  -> decltype(observable_member(skip_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1360  {
1361  return observable_member(skip_last_tag{}, *this, std::forward<AN>(an)...);
1362  }
1363 
1366  template<class... AN>
1367  auto skip_until(AN... an) const
1369  -> decltype(observable_member(skip_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1371  {
1372  return observable_member(skip_until_tag{}, *this, std::forward<AN>(an)...);
1373  }
1374 
1377  template<class... AN>
1378  auto take(AN... an) const
1380  -> decltype(observable_member(take_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1382  {
1383  return observable_member(take_tag{}, *this, std::forward<AN>(an)...);
1384  }
1385 
1388  template<class... AN>
1389  auto take_last(AN&&... an) const
1391  -> decltype(observable_member(take_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1393  {
1394  return observable_member(take_last_tag{}, *this, std::forward<AN>(an)...);
1395  }
1396 
1399  template<class... AN>
1400  auto take_until(AN&&... an) const
1402  -> decltype(observable_member(take_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1404  {
1405  return observable_member(take_until_tag{}, *this, std::forward<AN>(an)...);
1406  }
1407 
1410  template<class... AN>
1411  auto take_while(AN&&... an) const
1413  -> decltype(observable_member(take_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1415  {
1416  return observable_member(take_while_tag{}, *this, std::forward<AN>(an)...);
1417  }
1418 
1421  template<class... AN>
1422  auto repeat(AN... an) const
1424  -> decltype(observable_member(repeat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1426  {
1427  return observable_member(repeat_tag{}, *this, std::forward<AN>(an)...);
1428  }
1429 
1432  template<class... AN>
1433  auto retry(AN... an) const
1435  -> decltype(observable_member(retry_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1437  {
1438  return observable_member(retry_tag{}, *(this_type*)this, std::forward<AN>(an)...);
1439  }
1440 
1443  template<class... AN>
1444  auto start_with(AN... an) const
1446  -> decltype(observable_member(start_with_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1448  {
1449  return observable_member(start_with_tag{}, *this, std::forward<AN>(an)...);
1450  }
1451 
1454  template<class... AN>
1455  auto pairwise(AN... an) const
1457  -> decltype(observable_member(pairwise_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1459  {
1460  return observable_member(pairwise_tag{}, *this, std::forward<AN>(an)...);
1461  }
1462 };
1463 
1464 template<class T, class SourceOperator>
1466  return lhs.source_operator == rhs.source_operator;
1467 }
1468 template<class T, class SourceOperator>
1470  return !(lhs == rhs);
1471 }
1472 
1555 template<>
1556 class observable<void, void>
1557 {
1558  ~observable();
1559 public:
1562  template<class T, class OnSubscribe>
1563  static auto create(OnSubscribe os)
1564  -> decltype(rxs::create<T>(std::move(os))) {
1565  return rxs::create<T>(std::move(os));
1566  }
1567 
1570  template<class T>
1571  static auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1)
1572  -> decltype(rxs::range<T>(first, last, step, identity_current_thread())) {
1573  return rxs::range<T>(first, last, step, identity_current_thread());
1574  }
1577  template<class T, class Coordination>
1578  static auto range(T first, T last, std::ptrdiff_t step, Coordination cn)
1579  -> decltype(rxs::range<T>(first, last, step, std::move(cn))) {
1580  return rxs::range<T>(first, last, step, std::move(cn));
1581  }
1584  template<class T, class Coordination>
1585  static auto range(T first, T last, Coordination cn)
1586  -> decltype(rxs::range<T>(first, last, std::move(cn))) {
1587  return rxs::range<T>(first, last, std::move(cn));
1588  }
1591  template<class T, class Coordination>
1592  static auto range(T first, Coordination cn)
1593  -> decltype(rxs::range<T>(first, std::move(cn))) {
1594  return rxs::range<T>(first, std::move(cn));
1595  }
1596 
1599  template<class T>
1600  static auto never()
1601  -> decltype(rxs::never<T>()) {
1602  return rxs::never<T>();
1603  }
1604 
1607  template<class ObservableFactory>
1608  static auto defer(ObservableFactory of)
1609  -> decltype(rxs::defer(std::move(of))) {
1610  return rxs::defer(std::move(of));
1611  }
1612 
1615  template<class... AN>
1616  static auto interval(rxsc::scheduler::clock_type::duration period, AN**...)
1617  -> decltype(rxs::interval(period)) {
1618  return rxs::interval(period);
1619  static_assert(sizeof...(AN) == 0, "interval(period) was passed too many arguments.");
1620  }
1623  template<class Coordination>
1624  static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
1625  -> decltype(rxs::interval(period, std::move(cn))) {
1626  return rxs::interval(period, std::move(cn));
1627  }
1630  template<class... AN>
1631  static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN**...)
1632  -> decltype(rxs::interval(initial, period)) {
1633  return rxs::interval(initial, period);
1634  static_assert(sizeof...(AN) == 0, "interval(initial, period) was passed too many arguments.");
1635  }
1638  template<class Coordination>
1639  static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn)
1640  -> decltype(rxs::interval(initial, period, std::move(cn))) {
1641  return rxs::interval(initial, period, std::move(cn));
1642  }
1643 
1646  template<class... AN>
1647  static auto timer(rxsc::scheduler::clock_type::time_point at, AN**...)
1648  -> decltype(rxs::timer(at)) {
1649  return rxs::timer(at);
1650  static_assert(sizeof...(AN) == 0, "timer(at) was passed too many arguments.");
1651  }
1654  template<class... AN>
1655  static auto timer(rxsc::scheduler::clock_type::duration after, AN**...)
1656  -> decltype(rxs::timer(after)) {
1657  return rxs::timer(after);
1658  static_assert(sizeof...(AN) == 0, "timer(after) was passed too many arguments.");
1659  }
1662  template<class Coordination>
1663  static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn)
1664  -> decltype(rxs::timer(when, std::move(cn))) {
1665  return rxs::timer(when, std::move(cn));
1666  }
1669  template<class Coordination>
1670  static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn)
1671  -> decltype(rxs::timer(when, std::move(cn))) {
1672  return rxs::timer(when, std::move(cn));
1673  }
1674 
1677  template<class Collection>
1678  static auto iterate(Collection c)
1679  -> decltype(rxs::iterate(std::move(c), identity_current_thread())) {
1680  return rxs::iterate(std::move(c), identity_current_thread());
1681  }
1684  template<class Collection, class Coordination>
1685  static auto iterate(Collection c, Coordination cn)
1686  -> decltype(rxs::iterate(std::move(c), std::move(cn))) {
1687  return rxs::iterate(std::move(c), std::move(cn));
1688  }
1689 
1692  template<class T>
1693  static auto from()
1694  -> decltype( rxs::from<T>()) {
1695  return rxs::from<T>();
1696  }
1699  template<class T, class Coordination>
1700  static auto from(Coordination cn)
1701  -> typename std::enable_if<is_coordination<Coordination>::value,
1702  decltype( rxs::from<T>(std::move(cn)))>::type {
1703  return rxs::from<T>(std::move(cn));
1704  }
1707  template<class Value0, class... ValueN>
1708  static auto from(Value0 v0, ValueN... vn)
1709  -> typename std::enable_if<!is_coordination<Value0>::value,
1710  decltype( rxs::from(v0, vn...))>::type {
1711  return rxs::from(v0, vn...);
1712  }
1715  template<class Coordination, class Value0, class... ValueN>
1716  static auto from(Coordination cn, Value0 v0, ValueN... vn)
1717  -> typename std::enable_if<is_coordination<Coordination>::value,
1718  decltype( rxs::from(std::move(cn), v0, vn...))>::type {
1719  return rxs::from(std::move(cn), v0, vn...);
1720  }
1721 
1724  template<class T>
1725  static auto just(T v)
1726  -> decltype(rxs::just(std::move(v))) {
1727  return rxs::just(std::move(v));
1728  }
1731  template<class T, class Coordination>
1732  static auto just(T v, Coordination cn)
1733  -> decltype(rxs::just(std::move(v), std::move(cn))) {
1734  return rxs::just(std::move(v), std::move(cn));
1735  }
1736 
1739  template<class Observable, class Value0, class... ValueN>
1740  static auto start_with(Observable o, Value0 v0, ValueN... vn)
1741  -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) {
1742  return rxs::start_with(std::move(o), std::move(v0), std::move(vn)...);
1743  }
1744 
1747  template<class T>
1748  static auto empty()
1749  -> decltype(from<T>()) {
1750  return from<T>();
1751  }
1754  template<class T, class Coordination>
1755  static auto empty(Coordination cn)
1756  -> decltype(from<T>(std::move(cn))) {
1757  return from<T>(std::move(cn));
1758  }
1759 
1762  template<class T, class Exception>
1763  static auto error(Exception&& e)
1764  -> decltype(rxs::error<T>(std::forward<Exception>(e))) {
1765  return rxs::error<T>(std::forward<Exception>(e));
1766  }
1769  template<class T, class Exception, class Coordination>
1770  static auto error(Exception&& e, Coordination cn)
1771  -> decltype(rxs::error<T>(std::forward<Exception>(e), std::move(cn))) {
1772  return rxs::error<T>(std::forward<Exception>(e), std::move(cn));
1773  }
1774 
1777  template<class ResourceFactory, class ObservableFactory>
1778  static auto scope(ResourceFactory rf, ObservableFactory of)
1779  -> decltype(rxs::scope(std::move(rf), std::move(of))) {
1780  return rxs::scope(std::move(rf), std::move(of));
1781  }
1782 };
1783 
1784 }
1785 
1786 //
1787 // support range() >> filter() >> subscribe() syntax
1788 // '>>' is spelled 'stream'
1789 //
1790 template<class T, class SourceOperator, class OperatorFactory>
1791 auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
1792  -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1793  return source.op(std::forward<OperatorFactory>(of));
1794 }
1795 
1796 //
1797 // support range() | filter() | subscribe() syntax
1798 // '|' is spelled 'pipe'
1799 //
1800 template<class T, class SourceOperator, class OperatorFactory>
1801 auto operator | (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
1802  -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1803  return source.op(std::forward<OperatorFactory>(of));
1804 }
1805 
1806 #endif
Definition: rx-operators.hpp:126
auto subscribe(ArgN &&...an) const -> composite_subscription
Subscribe will cause the source observable to emit values to the provided subscriber.
Definition: rx-observable.hpp:669
Definition: rx-operators.hpp:360
auto take_until(AN &&...an) const
For each item from this observable until on_next occurs on the trigger observable or until the specif...
Definition: rx-observable.hpp:1400
auto pairwise(AN...an) const
Take values pairwise from this observable.
Definition: rx-observable.hpp:1455
auto map(AN &&...an) const
For each item from this observable use Selector to produce an item to emit from the new observable th...
Definition: rx-observable.hpp:853
Definition: rx-operators.hpp:248
auto switch_if_empty(AN &&...an) const
If the source Observable terminates without emitting any items, emits items from a backup Observable...
Definition: rx-observable.hpp:743
Definition: rx-operators.hpp:143
auto buffer(AN &&...an) const
Return an observable that emits connected, non-overlapping buffer, each containing at most count item...
Definition: rx-observable.hpp:974
Definition: rx-observable.hpp:157
Definition: rx-operators.hpp:374
Definition: rx-operators.hpp:445
auto timestamp(AN &&...an) const
Returns an observable that attaches a timestamp to each item emitted by the source observable indicat...
Definition: rx-observable.hpp:809
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
auto min(AN **...) const
For each item from this observable reduce it by taking the min value of the previous items...
Definition: rx-observable.hpp:1311
auto operator|(const rxcpp::observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-observable.hpp:1801
static auto create(OnSubscribe os) -> decltype(rxs::create< T >(std::move(os)))
Returns an observable that executes the specified function when a subscriber subscribes to it...
Definition: rx-observable.hpp:1563
Definition: rx-all.hpp:26
Definition: rx-predef.hpp:302
auto max(AN **...) const
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-observable.hpp:1299
static auto range(T first, T last, Coordination cn) -> decltype(rxs::range< T >(first, last, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1585
auto debounce(AN &&...an) const
Return an observable that emits an item if a particular timespan has passed without emitting another ...
Definition: rx-observable.hpp:875
observable(const observable< T, SO > &o)
implicit conversion between observables of the same value_type
Definition: rx-observable.hpp:585
Definition: rx-operators.hpp:323
auto publish_synchronized(AN &&...an) const
Turn a cold observable hot and allow connections to the source to be independent of subscriptions...
Definition: rx-observable.hpp:1173
source_operator_type source_operator
Definition: rx-observable.hpp:519
dynamic_observable()
Definition: rx-observable.hpp:69
Definition: rx-operators.hpp:431
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
Definition: rx-operators.hpp:157
auto flat_map(AN &&...an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1040
Definition: rx-operators.hpp:276
auto operator>>(const rxcpp::observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-observable.hpp:1791
rxu::decay_t< Observable > observable_type
Definition: rx-observable.hpp:246
auto element_at(AN &&...an) const
Pulls an item located at a specified index location in the sequence of items and emits that item as i...
Definition: rx-observable.hpp:919
T max() const
Definition: rx-observable.hpp:431
Definition: rx-operators.hpp:283
auto observe_on(AN &&...an) const
All values are queued and delivered using the scheduler from the supplied coordination.
Definition: rx-observable.hpp:1206
auto last(AN **...) const
For each item from this observable reduce it by sending only the last item.
Definition: rx-observable.hpp:1251
Definition: rx-operators.hpp:290
auto error(E e) -> decltype(detail::make_error< T >(typename std::conditional< std::is_same< std::exception_ptr, rxu::decay_t< E >>::value, detail::throw_ptr_tag, detail::throw_instance_tag >::type(), std::move(e), identity_immediate()))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-error.hpp:114
auto concat_transform(AN &&...an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1084
Definition: rx-operators.hpp:346
auto combine_latest(AN...an) const
For each item from all of the observables select a value to emit from the new observable that is retu...
Definition: rx-observable.hpp:1107
auto take_while(AN &&...an) const
For the first items fulfilling the predicate from this observable emit them from the new observable t...
Definition: rx-observable.hpp:1411
static auto iterate(Collection c, Coordination cn) -> decltype(rxs::iterate(std::move(c), std::move(cn)))
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-observable.hpp:1685
Definition: rx-operators.hpp:408
observable(const source_operator_type &o)
Definition: rx-observable.hpp:574
Definition: rx-operators.hpp:487
Definition: rx-operators.hpp:381
rxu::value_type_t< delayed_type< T, AN... >> delayed_type_t
Definition: rx-operators.hpp:60
auto skip_until(AN...an) const
Make new observable with items skipped until on_next occurs on the trigger observable or until the sp...
Definition: rx-observable.hpp:1367
Definition: rx-operators.hpp:459
auto skip_last(AN...an) const
Make new observable with skipped last count items from this observable.
Definition: rx-observable.hpp:1356
Definition: rx-operators.hpp:325
Definition: rx-operators.hpp:234
auto AN
Definition: rx-finally.hpp:105
auto max() -> operator_factory< max_tag >
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-reduce.hpp:496
auto iterate(Collection c) -> observable< rxu::value_type_t< detail::iterate_traits< Collection >>, detail::iterate< Collection, identity_one_worker >>
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-iterate.hpp:160
Definition: rx-operators.hpp:255
static auto iterate(Collection c) -> decltype(rxs::iterate(std::move(c), identity_current_thread()))
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-observable.hpp:1678
Definition: rx-operators.hpp:199
Definition: rx-predef.hpp:156
T value_type
Definition: rx-observable.hpp:562
Definition: rx-operators.hpp:508
auto distinct(AN &&...an) const
For each item from this observable, filter out repeated values and emit only items that have not alre...
Definition: rx-observable.hpp:897
Definition: rx-operators.hpp:339
rxu::decay_t< SourceOperator > source_operator_type
Definition: rx-observable.hpp:518
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
static auto interval(rxsc::scheduler::clock_type::duration period, AN **...) -> decltype(rxs::interval(period))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1616
auto window_with_time(AN &&...an) const
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-observable.hpp:941
auto skip(AN...an) const
Make new observable with skipped first count items from this observable.
Definition: rx-observable.hpp:1345
auto switch_on_error(AN &&...an) const
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition: rx-observable.hpp:842
a source of values whose methods block until all values have been emitted. subscribe or use one of th...
Definition: rx-observable.hpp:169
~blocking_observable()
Definition: rx-observable.hpp:248
static auto just(T v) -> decltype(rxs::just(std::move(v)))
Definition: rx-observable.hpp:1725
Definition: rx-operators.hpp:367
static auto defer(ObservableFactory of) -> decltype(rxs::defer(std::move(of)))
Returns an observable that calls the specified observable factory to create an observable for each ne...
Definition: rx-observable.hpp:1608
Definition: rx-operators.hpp:310
auto exists(AN &&...an) const
Returns an Observable that emits true if any item emitted by the source Observable satisfies a specif...
Definition: rx-observable.hpp:710
Definition: rx-sources.hpp:15
blocking_observable< T, this_type > as_blocking(AN **...) const
Definition: rx-observable.hpp:612
Definition: rx-operators.hpp:438
Definition: rx-operators.hpp:298
Definition: rx-operators.hpp:296
auto scan(AN...an) const
For each item from this observable use Accumulator to combine items into a value that will be emitted...
Definition: rx-observable.hpp:1323
auto tap(AN &&...an) const
inspect calls to on_next, on_error and on_completed.
Definition: rx-observable.hpp:776
auto start_with(AN &&...an) -> operator_factory< start_with_tag, AN... >
Start with the supplied values, then concatenate this observable.
Definition: rx-start_with.hpp:53
Definition: rx-operators.hpp:227
auto sum(AN **...) const
For each item from this observable reduce it by adding to the previous items.
Definition: rx-observable.hpp:1275
linq_driver< iter_cursor< typename util::container_traits< TContainer >::iterator > > from(TContainer &c)
Definition: linq.hpp:556
Definition: rx-util.hpp:404
auto last() -> operator_factory< last_tag >
For each item from this observable reduce it by sending only the last item.
Definition: rx-reduce.hpp:395
Definition: rx-observable.hpp:36
static auto timer(rxsc::scheduler::clock_type::time_point at, AN **...) -> decltype(rxs::timer(at))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1647
observable< T > make_observable_dynamic(Source &&s)
Definition: rx-observable.hpp:102
static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn)))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1663
~observable()
Definition: rx-observable.hpp:566
auto last(AN **...) -> delayed_type_t< T, AN... > const
Definition: rx-observable.hpp:343
Definition: rx-operators.hpp:241
Definition: rx-operators.hpp:117
auto repeat(AN...an) const
Repeat this observable for the given number of times or infinitely.
Definition: rx-observable.hpp:1422
Definition: rx-operators.hpp:213
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
Definition: rx-operators.hpp:417
Definition: rx-sources.hpp:23
Definition: rx-operators.hpp:38
std::enable_if< is_subscriber< Subscriber >::value, void >::type on_subscribe(Subscriber o) const
Definition: rx-observable.hpp:87
auto window_with_time_or_count(AN &&...an) const
Return an observable that emits connected, non-overlapping windows of items from the source observabl...
Definition: rx-observable.hpp:952
Definition: rx-operators.hpp:262
auto is_empty(AN &&...an) const
Returns an Observable that emits true if the source Observable is empty, otherwise false...
Definition: rx-observable.hpp:688
auto group_by(AN &&...an) const
Return an observable that emits grouped_observables, each of which corresponds to a unique key value ...
Definition: rx-observable.hpp:1129
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
bool operator!=(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:103
auto window_toggle(AN &&...an) const
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-observable.hpp:963
auto first() -> operator_factory< first_tag >
For each item from this observable reduce it by sending only the first item.
Definition: rx-reduce.hpp:378
Definition: rx-operators.hpp:129
Definition: rx-operators.hpp:424
auto sample_with_time(AN &&...an) const
Return an Observable that emits the most recent items emitted by the source Observable within periodi...
Definition: rx-observable.hpp:1334
Definition: rx-operators.hpp:353
auto first(AN **...) -> delayed_type_t< T, AN... > const
Definition: rx-observable.hpp:315
dynamic_observable(SOF &&sof, typename std::enable_if<!is_dynamic_observable< SOF >::value, void ** >::type=0)
Definition: rx-observable.hpp:74
Definition: rx-operators.hpp:466
Definition: rx-operators.hpp:136
auto transform(AN &&...an) const
For each item from this observable use Selector to produce an item to emit from the new observable th...
Definition: rx-observable.hpp:864
auto replay(AN &&...an) const
1) replay(optional Coordination, optional CompositeSubscription) Turn a cold observable hot...
Definition: rx-observable.hpp:1184
static auto start_with(Observable o, Value0 v0, ValueN...vn) -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...))
Definition: rx-observable.hpp:1740
Definition: rx-operators.hpp:206
Definition: rx-operators.hpp:297
void on_subscribe(subscriber< T > o) const
Definition: rx-observable.hpp:81
Definition: rx-operators.hpp:192
tag_dynamic_observable dynamic_observable_tag
Definition: rx-observable.hpp:67
Definition: rx-operators.hpp:103
static auto error(Exception &&e, Coordination cn) -> decltype(rxs::error< T >(std::forward< Exception >(e), std::move(cn)))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-observable.hpp:1770
static auto range(T first=0, T last=std::numeric_limits< T >::max(), std::ptrdiff_t step=1) -> decltype(rxs::range< T >(first, last, step, identity_current_thread()))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1571
static auto empty() -> decltype(from< T >())
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-observable.hpp:1748
auto ignore_elements(AN &&...an) const
Do not emit any items from the source Observable, but allow termination notification (either onError ...
Definition: rx-observable.hpp:1140
Definition: rx-operators.hpp:473
Definition: rx-operators.hpp:110
Definition: rx-operators.hpp:57
auto on_error_resume_next(AN &&...an) const
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition: rx-observable.hpp:831
auto distinct_until_changed(AN &&...an) const
For each item from this observable, filter out consequentially repeated values and emit only changes ...
Definition: rx-observable.hpp:908
void unsubscribe() const
Definition: rx-subscription.hpp:170
auto window(AN &&...an) const
Return an observable that emits connected, non-overlapping windows, each containing at most count ite...
Definition: rx-observable.hpp:930
bool operator==(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:99
Definition: rx-operators.hpp:301
Definition: rx-operators.hpp:220
auto average(AN **...) const
For each item from this observable reduce it by adding to the previous values and then dividing by th...
Definition: rx-observable.hpp:1287
Definition: rx-predef.hpp:270
Definition: rx-operators.hpp:185
auto buffer_with_time_or_count(AN &&...an) const
Return an observable that emits connected, non-overlapping buffers of items from the source observabl...
Definition: rx-observable.hpp:996
auto sequence_equal(AN...an) const
Determine whether two Observables emit the same sequence of items.
Definition: rx-observable.hpp:765
auto lift(Operator &&op) -> detail::lift_factory< ResultType, Operator >
Definition: rx-lift.hpp:101
static auto range(T first, Coordination cn) -> decltype(rxs::range< T >(first, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1592
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN **...) -> decltype(rxs::interval(initial, period))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1631
static auto error(Exception &&e) -> decltype(rxs::error< T >(std::forward< Exception >(e)))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-observable.hpp:1763
auto merge(AN...an) const
For each given observable subscribe. For each item emitted from all of the given observables, deliver from the new observable that is returned.
Definition: rx-observable.hpp:1018
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(initial, period, std::move(cn)))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1639
auto amb(AN...an) const
For each item from only the first of the given observables deliver from the new observable that is re...
Definition: rx-observable.hpp:1029
auto with_latest_from(AN...an) const
For each item from the first observable select the latest value from all the observables to emit from...
Definition: rx-observable.hpp:1095
observable_type source
Definition: rx-observable.hpp:247
T sum() const
Definition: rx-observable.hpp:389
static auto from(Coordination cn, Value0 v0, ValueN...vn) -> typename std::enable_if< is_coordination< Coordination >::value, decltype( rxs::from(std::move(cn), v0, vn...))>::type
Definition: rx-observable.hpp:1716
Definition: rx-operators.hpp:452
Definition: rx-operators.hpp:269
const scheduler & make_current_thread()
Definition: rx-currentthread.hpp:263
auto concat(AN...an) const
For each item from this observable subscribe to one at a time, in the order received. For each item from all of the given observables deliver from the new observable that is returned.
Definition: rx-observable.hpp:1062
static auto from(Value0 v0, ValueN...vn) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype( rxs::from(v0, vn...))>::type
Definition: rx-observable.hpp:1708
auto defer(ObservableFactory of) -> observable< rxu::value_type_t< detail::defer_traits< ObservableFactory >>, detail::defer< ObservableFactory >>
Returns an observable that calls the specified observable factory to create an observable for each ne...
Definition: rx-defer.hpp:73
auto retry(AN...an) const
Retry this observable for the given number of times.
Definition: rx-observable.hpp:1433
auto switch_on_next(AN &&...an) const
Return observable that emits the items emitted by the observable most recently emitted by the source ...
Definition: rx-observable.hpp:1007
Definition: rx-operators.hpp:480
auto timer(TimePointOrDuration when) -> typename std::enable_if< detail::defer_timer< TimePointOrDuration, identity_one_worker >::value, typename detail::defer_timer< TimePointOrDuration, identity_one_worker >::observable_type >::type
Returns an observable that emits an integer at the specified time point.
Definition: rx-timer.hpp:114
consumes values from an observable using State that may implement on_next, on_error and on_completed ...
Definition: rx-observer.hpp:179
Definition: rx-operators.hpp:150
double average() const
Definition: rx-observable.hpp:410
static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn)))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1670
auto contains(AN &&...an) const
Returns an Observable that emits true if the source Observable emitted a specified item...
Definition: rx-observable.hpp:721
auto multicast(AN &&...an) const
Definition: rx-observable.hpp:1151
static auto from(Coordination cn) -> typename std::enable_if< is_coordination< Coordination >::value, decltype( rxs::from< T >(std::move(cn)))>::type
Definition: rx-observable.hpp:1700
Definition: rx-operators.hpp:332
auto scope(ResourceFactory rf, ObservableFactory of) -> observable< rxu::value_type_t< detail::scope_traits< ResourceFactory, ObservableFactory >>, detail::scope< ResourceFactory, ObservableFactory >>
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-scope.hpp:114
auto reduce(AN &&...an) const
For each item from this observable use Accumulator to combine items, when completed use ResultSelecto...
Definition: rx-observable.hpp:1217
auto take_last(AN &&...an) const
Emit only the final t items emitted by the source Observable.
Definition: rx-observable.hpp:1389
auto take(AN...an) const
For the first count items from this observable emit them from the new observable that is returned...
Definition: rx-observable.hpp:1378
auto start_with(AN...an) const
Start with the supplied values, then concatenate this observable.
Definition: rx-observable.hpp:1444
blocking_observable(observable_type s)
Definition: rx-observable.hpp:251
auto just(Value0 v0) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(iterate(*(std::array< Value0, 1 > *) nullptr, identity_immediate()))>::type
Definition: rx-iterate.hpp:267
auto all(AN &&...an) const
Returns an Observable that emits true if every item emitted by the source Observable satisfies a spec...
Definition: rx-observable.hpp:677
auto default_if_empty(AN &&...an) const
If the source Observable terminates without emitting any items, emits a default item and completes...
Definition: rx-observable.hpp:754
auto observable_member(Tag, AN &&...an) -> decltype(Overload::member(std::forward< AN >(an)...))
Definition: rx-operators.hpp:63
Definition: rx-operators.hpp:164
auto subscribe_with_rethrow(ArgN &&...an) const -> void
Definition: rx-observable.hpp:294
Definition: rx-operators.hpp:317
Definition: rx-operators.hpp:494
auto count(AN **...) const
For each item from this observable reduce it by incrementing a count.
Definition: rx-observable.hpp:1263
auto subscribe(ArgN &&...an) const -> void
Definition: rx-observable.hpp:269
static auto from() -> decltype( rxs::from< T >())
Definition: rx-observable.hpp:1693
Definition: rx-predef.hpp:128
auto time_interval(AN &&...an) const
Returns an observable that emits indications of the amount of time lapsed between consecutive emissio...
Definition: rx-observable.hpp:787
static auto scope(ResourceFactory rf, ObservableFactory of) -> decltype(rxs::scope(std::move(rf), std::move(of)))
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-observable.hpp:1778
Definition: rx-operators.hpp:410
auto buffer_with_time(AN &&...an) const
Return an observable that emits buffers every period time interval and collects items from this obser...
Definition: rx-observable.hpp:985
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
Definition: rx-operators.hpp:119
auto delay(AN &&...an) const
Return an observable that emits each item emitted by the source observable after the specified delay...
Definition: rx-observable.hpp:886
static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(period, std::move(cn)))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1624
Definition: rx-operators.hpp:300
static auto range(T first, T last, std::ptrdiff_t step, Coordination cn) -> decltype(rxs::range< T >(first, last, step, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1578
auto accumulate(AN &&...an) const
For each item from this observable use Accumulator to combine items, when completed use ResultSelecto...
Definition: rx-observable.hpp:1228
auto any(AN &&...an) const
Returns an Observable that emits true if any item emitted by the source Observable satisfies a specif...
Definition: rx-observable.hpp:699
Definition: rx-operators.hpp:178
static auto just(T v, Coordination cn) -> decltype(rxs::just(std::move(v), std::move(cn)))
Definition: rx-observable.hpp:1732
auto interval(Duration period) -> typename std::enable_if< detail::defer_interval< Duration, identity_one_worker >::value, typename detail::defer_interval< Duration, identity_one_worker >::observable_type >::type
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-interval.hpp:113
observable(observable< T, SO > &&o)
implicit conversion between observables of the same value_type
Definition: rx-observable.hpp:590
Definition: rx-scheduler.hpp:426
Definition: rx-predef.hpp:115
auto subscribe_on(AN &&...an) const
Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordi...
Definition: rx-observable.hpp:1195
auto timeout(AN &&...an) const
Return an observable that terminates with timeout_error if a particular timespan has passed without e...
Definition: rx-observable.hpp:798
Definition: rx-sources.hpp:17
auto zip(AN &&...an) const
Bring by one item from all given observables and select a value to emit from the new observable that ...
Definition: rx-observable.hpp:1118
auto concat_map(AN &&...an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1073
auto merge_transform(AN &&...an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1051
observable()
Definition: rx-observable.hpp:570
Definition: rx-operators.hpp:299
static auto never() -> decltype(rxs::never< T >())
Definition: rx-observable.hpp:1600
auto first(AN **...) const
For each item from this observable reduce it by sending only the first item.
Definition: rx-observable.hpp:1239
T min() const
Definition: rx-observable.hpp:452
Definition: rx-predef.hpp:126
Definition: rx-operators.hpp:127
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
auto publish(AN &&...an) const
Turn a cold observable hot and allow connections to the source to be independent of subscriptions...
Definition: rx-observable.hpp:1162
auto filter(AN &&...an) const
For each item from this observable use Predicate to select which items to emit from the new observabl...
Definition: rx-observable.hpp:732
static auto empty(Coordination cn) -> decltype(from< T >(std::move(cn)))
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-observable.hpp:1755
Definition: rx-operators.hpp:402
auto subscribe(ArgN &&...an) -> detail::subscribe_factory< decltype(make_subscriber< T >(std::forward< ArgN >(an)...))>
Subscribe will cause the source observable to emit values to the provided subscriber.
Definition: rx-subscribe.hpp:87
observable< T > as_dynamic(AN **...) const
Definition: rx-observable.hpp:604
int count() const
Definition: rx-observable.hpp:365
Definition: rx-operators.hpp:395
observable(source_operator_type &&o)
Definition: rx-observable.hpp:578
Definition: rx-operators.hpp:388
static auto timer(rxsc::scheduler::clock_type::duration after, AN **...) -> decltype(rxs::timer(after))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1655
Definition: rx-operators.hpp:501