RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-replay.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
63 #if !defined(RXCPP_OPERATORS_RX_REPLAY_HPP)
64 #define RXCPP_OPERATORS_RX_REPLAY_HPP
65 
66 #include "../rx-includes.hpp"
67 #include "./rx-multicast.hpp"
68 
69 namespace rxcpp {
70 
71 namespace operators {
72 
73 namespace detail {
74 
75 template<class... AN>
76 struct replay_invalid_arguments {};
77 
78 template<class... AN>
79 struct replay_invalid : public rxo::operator_base<replay_invalid_arguments<AN...>> {
80  using type = observable<replay_invalid_arguments<AN...>, replay_invalid<AN...>>;
81 };
82 template<class... AN>
83 using replay_invalid_t = typename replay_invalid<AN...>::type;
84 
85 }
86 
89 template<class... AN>
90 auto replay(AN&&... an)
91  -> operator_factory<replay_tag, AN...> {
92  return operator_factory<replay_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
93 }
94 
95 }
96 
97  template<>
98 struct member_overload<replay_tag>
99 {
100  template<class Observable,
101  class Enabled = rxu::enable_if_all_true_type_t<
102  is_observable<Observable>>,
103  class SourceValue = rxu::value_type_t<Observable>,
105  class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
107  >
108  static Result member(Observable&& o) {
109  return Result(Multicast(std::forward<Observable>(o), Subject(identity_current_thread(), composite_subscription())));
110  }
111 
112  template<class Observable,
113  class Enabled = rxu::enable_if_all_true_type_t<
114  is_observable<Observable>>,
115  class SourceValue = rxu::value_type_t<Observable>,
117  class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
119  >
120  static Result member(Observable&& o, composite_subscription cs) {
121  return Result(Multicast(std::forward<Observable>(o), Subject(identity_current_thread(), cs)));
122  }
123 
124  template<class Observable, class Coordination,
125  class Enabled = rxu::enable_if_all_true_type_t<
126  is_observable<Observable>,
127  is_coordination<Coordination>>,
128  class SourceValue = rxu::value_type_t<Observable>,
130  class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
132  >
133  static Result member(Observable&& o, Coordination&& cn, composite_subscription cs = composite_subscription()) {
134  return Result(Multicast(std::forward<Observable>(o), Subject(std::forward<Coordination>(cn), cs)));
135  }
136 
137  template<class Observable, class Count,
138  class Enabled = rxu::enable_if_all_true_type_t<
139  is_observable<Observable>,
140  std::is_integral<Count>>,
141  class SourceValue = rxu::value_type_t<Observable>,
143  class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
145  >
146  static Result member(Observable&& o, Count count, composite_subscription cs = composite_subscription()) {
147  return Result(Multicast(std::forward<Observable>(o), Subject(count, identity_current_thread(), cs)));
148  }
149 
150  template<class Observable, class Count, class Coordination,
151  class Enabled = rxu::enable_if_all_true_type_t<
152  is_observable<Observable>,
153  std::is_integral<Count>,
154  is_coordination<Coordination>>,
155  class SourceValue = rxu::value_type_t<Observable>,
157  class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
159  >
160  static Result member(Observable&& o, Count count, Coordination&& cn, composite_subscription cs = composite_subscription()) {
161  return Result(Multicast(std::forward<Observable>(o), Subject(count, std::forward<Coordination>(cn), cs)));
162  }
163 
164  template<class Observable, class Duration,
165  class IsDuration = rxu::is_duration<Duration>,
166  class Enabled = rxu::enable_if_all_true_type_t<
167  is_observable<Observable>,
168  IsDuration>,
169  class SourceValue = rxu::value_type_t<Observable>,
171  class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
173  >
174  static Result member(Observable&& o, Duration&& d, composite_subscription cs = composite_subscription()) {
175  return Result(Multicast(std::forward<Observable>(o), Subject(std::forward<Duration>(d), identity_current_thread(), cs)));
176  }
177 
178  template<class Observable, class Duration, class Coordination,
179  class IsDuration = rxu::is_duration<Duration>,
180  class Enabled = rxu::enable_if_all_true_type_t<
181  is_observable<Observable>,
182  IsDuration,
183  is_coordination<Coordination>>,
184  class SourceValue = rxu::value_type_t<Observable>,
186  class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
188  >
189  static Result member(Observable&& o, Duration&& d, Coordination&& cn, composite_subscription cs = composite_subscription()) {
190  return Result(Multicast(std::forward<Observable>(o), Subject(std::forward<Duration>(d), std::forward<Coordination>(cn), cs)));
191  }
192 
193  template<class Observable, class Count, class Duration,
194  class IsDuration = rxu::is_duration<Duration>,
195  class Enabled = rxu::enable_if_all_true_type_t<
196  is_observable<Observable>,
197  std::is_integral<Count>,
198  IsDuration>,
199  class SourceValue = rxu::value_type_t<Observable>,
201  class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
203  >
204  static Result member(Observable&& o, Count count, Duration&& d, composite_subscription cs = composite_subscription()) {
205  return Result(Multicast(std::forward<Observable>(o), Subject(count, std::forward<Duration>(d), identity_current_thread(), cs)));
206  }
207 
208  template<class Observable, class Count, class Duration, class Coordination,
209  class IsDuration = rxu::is_duration<Duration>,
210  class Enabled = rxu::enable_if_all_true_type_t<
211  is_observable<Observable>,
212  std::is_integral<Count>,
213  IsDuration,
214  is_coordination<Coordination>>,
215  class SourceValue = rxu::value_type_t<Observable>,
217  class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
219  >
220  static Result member(Observable&& o, Count count, Duration&& d, Coordination&& cn, composite_subscription cs = composite_subscription()) {
221  return Result(Multicast(std::forward<Observable>(o), Subject(count, std::forward<Duration>(d), std::forward<Coordination>(cn), cs)));
222  }
223 
224  template<class... AN>
225  static operators::detail::replay_invalid_t<AN...> member(AN...) {
226  std::terminate();
227  return {};
228  static_assert(sizeof...(AN) == 10000, "replay takes (optional Count, optional Duration, optional Coordination, optional CompositeSubscription)");
229  }
230 };
231 
232 }
233 
234 #endif
static Result member(Observable &&o, Duration &&d, Coordination &&cn, composite_subscription cs=composite_subscription())
Definition: rx-replay.hpp:189
Definition: rx-util.hpp:791
a source of values that is shared across all subscribers and does not start until connectable_observable::connect() is called.
Definition: rx-connectable_observable.hpp:105
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
static Result member(Observable &&o, composite_subscription cs)
Definition: rx-replay.hpp:120
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
static Result member(Observable &&o, Count count, Duration &&d, composite_subscription cs=composite_subscription())
Definition: rx-replay.hpp:204
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
static Result member(Observable &&o)
Definition: rx-replay.hpp:108
Definition: rx-operators.hpp:47
static Result member(Observable &&o, Duration &&d, composite_subscription cs=composite_subscription())
Definition: rx-replay.hpp:174
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
static Result member(Observable &&o, Count count, composite_subscription cs=composite_subscription())
Definition: rx-replay.hpp:146
Definition: rx-replaysubject.hpp:121
static Result member(Observable &&o, Count count, Duration &&d, Coordination &&cn, composite_subscription cs=composite_subscription())
Definition: rx-replay.hpp:220
static Result member(Observable &&o, Count count, Coordination &&cn, composite_subscription cs=composite_subscription())
Definition: rx-replay.hpp:160
static Result member(Observable &&o, Coordination &&cn, composite_subscription cs=composite_subscription())
Definition: rx-replay.hpp:133
Definition: rx-operators.hpp:332
auto replay(AN &&...an) -> operator_factory< replay_tag, AN... >
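1) replay(optional Coordination, optional CompositeSubscription) Turn a cold observable hot, send all earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions.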
Definition: rx-replay.hpp:90
static operators::detail::replay_invalid_t< AN... > member(AN...)
Definition: rx-replay.hpp:225
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
allows connections to the source to be independent of subscriptions.
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37