28 #if !defined(RXCPP_OPERATORS_RX_WINDOW_TOGGLE_HPP) 29 #define RXCPP_OPERATORS_RX_WINDOW_TOGGLE_HPP 31 #include "../rx-includes.hpp" 40 struct window_toggle_invalid_arguments {};
43 struct window_toggle_invalid :
public rxo::operator_base<window_toggle_invalid_arguments<AN...>> {
44 using type = observable<window_toggle_invalid_arguments<
AN...>, window_toggle_invalid<
AN...>>;
47 using window_toggle_invalid_t =
typename window_toggle_invalid<
AN...>::type;
49 template<
class T,
class Openings,
class ClosingSelector,
class Coordination>
52 typedef window_toggle<T, Openings, ClosingSelector, Coordination> this_type;
54 using source_value_type = rxu::decay_t<T>;
55 using coordination_type = rxu::decay_t<Coordination>;
56 using coordinator_type =
typename coordination_type::coordinator_type;
57 using openings_type = rxu::decay_t<Openings>;
58 using openings_value_type =
typename openings_type::value_type;
59 using closing_selector_type = rxu::decay_t<ClosingSelector>;
60 using closings_type = rxu::result_of_t<closing_selector_type(openings_value_type)>;
61 using closings_value_type =
typename closings_type::value_type;
63 struct window_toggle_values
65 window_toggle_values(openings_type opens, closing_selector_type closes, coordination_type c)
67 , closingSelector(closes)
71 openings_type openings;
72 mutable closing_selector_type closingSelector;
73 coordination_type coordination;
75 window_toggle_values initial;
77 window_toggle(openings_type opens, closing_selector_type closes, coordination_type coordination)
78 : initial(opens, closes, coordination)
82 template<
class Subscriber>
83 struct window_toggle_observer
85 typedef window_toggle_observer<Subscriber> this_type;
86 typedef rxu::decay_t<T> value_type;
87 typedef rxu::decay_t<Subscriber> dest_type;
88 typedef observer<T, this_type> observer_type;
90 struct window_toggle_subscriber_values :
public window_toggle_values
92 window_toggle_subscriber_values(composite_subscription cs, dest_type d, window_toggle_values v, coordinator_type c)
93 : window_toggle_values(v)
96 , coordinator(std::move(c))
97 , worker(coordinator.get_worker())
100 composite_subscription cs;
102 coordinator_type coordinator;
104 mutable std::list<rxcpp::subjects::subject<T>> subj;
106 std::shared_ptr<window_toggle_subscriber_values> state;
108 window_toggle_observer(composite_subscription cs, dest_type d, window_toggle_values v, coordinator_type c)
109 : state(std::make_shared<window_toggle_subscriber_values>(window_toggle_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
111 auto localState = state;
113 composite_subscription innercs;
117 auto innerscope = localState->dest.add(innercs);
120 localState->dest.remove(innerscope);
123 localState->dest.add(localState->cs);
126 [&](){
return localState->coordinator.in(localState->openings);},
128 if (source.empty()) {
135 auto sink = make_subscriber<openings_value_type>(
139 [localState](
const openings_value_type& ov) {
140 auto closer = localState->closingSelector(ov);
143 localState->dest.on_next(it->get_observable().as_dynamic());
145 composite_subscription innercs;
149 auto innerscope = localState->dest.add(innercs);
152 localState->dest.remove(innerscope);
155 auto source = localState->coordinator.in(closer);
157 auto sit = std::make_shared<decltype(it)>(it);
158 auto close = [localState, sit]() {
160 *sit = localState->subj.end();
161 if (it != localState->subj.end()) {
162 it->get_subscriber().on_completed();
163 localState->subj.erase(it);
170 auto sink = make_subscriber<closings_value_type>(
174 [close, innercs](closings_value_type) {
176 innercs.unsubscribe();
179 [localState](std::exception_ptr e) {
180 localState->dest.on_error(e);
185 auto selectedSink = localState->coordinator.out(sink);
186 source.subscribe(std::move(selectedSink));
189 [localState](std::exception_ptr e) {
190 localState->dest.on_error(e);
197 [&](){
return localState->coordinator.out(sink);},
199 if (selectedSink.empty()) {
202 source->subscribe(std::move(selectedSink.get()));
205 void on_next(T v)
const {
206 auto localState = state;
207 auto work = [v, localState](
const rxsc::schedulable&){
208 for (
auto s : localState->subj) {
209 s.get_subscriber().on_next(v);
213 [&](){
return localState->coordinator.act(work);},
215 if (selectedWork.empty()) {
218 localState->worker.schedule(selectedWork.get());
221 void on_error(std::exception_ptr e)
const {
222 auto localState = state;
223 auto work = [e, localState](
const rxsc::schedulable&){
224 for (
auto s : localState->subj) {
225 s.get_subscriber().on_error(e);
227 localState->dest.on_error(e);
230 [&](){
return localState->coordinator.act(work);},
232 if (selectedWork.empty()) {
235 localState->worker.schedule(selectedWork.get());
238 void on_completed()
const {
239 auto localState = state;
240 auto work = [localState](
const rxsc::schedulable&){
241 for (
auto s : localState->subj) {
242 s.get_subscriber().on_completed();
244 localState->dest.on_completed();
247 [&](){
return localState->coordinator.act(work);},
249 if (selectedWork.empty()) {
252 localState->worker.schedule(selectedWork.get());
255 static subscriber<T, observer_type> make(dest_type d, window_toggle_values v) {
256 auto cs = composite_subscription();
257 auto coordinator = v.coordination.create_coordinator(d.get_subscription());
259 return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
263 template<
class Subscriber>
264 auto operator()(Subscriber dest)
const 265 -> decltype(window_toggle_observer<Subscriber>::make(std::move(dest), initial)) {
266 return window_toggle_observer<Subscriber>::make(std::move(dest), initial);
274 template<
class...
AN>
277 return operator_factory<window_toggle_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
285 template<
class Observable,
class Openings,
class ClosingSelector,
288 class OpeningsValueType =
typename OpeningsType::value_type,
294 static auto member(Observable&& o, Openings&& openings, ClosingSelector&& closingSelector)
295 -> decltype(o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector),
identity_immediate()))) {
296 return o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector),
identity_immediate()));
299 template<
class Observable,
class Openings,
class ClosingSelector,
class Coordination,
300 class ClosingSelectorType = rxu::decay_t<ClosingSelector>,
302 class OpeningsValueType =
typename OpeningsType::value_type,
304 all_observables<Observable, Openings, rxu::result_of_t<ClosingSelectorType(OpeningsValueType)>>,
307 class WindowToggle = rxo::detail::window_toggle<SourceValue, rxu::decay_t<Openings>, rxu::decay_t<ClosingSelector>,
rxu::decay_t<Coordination>>,
309 static auto member(Observable&& o, Openings&& openings, ClosingSelector&& closingSelector, Coordination&& cn)
310 -> decltype(o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector), std::forward<Coordination>(cn)))) {
311 return o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector), std::forward<Coordination>(cn)));
314 template<
class...
AN>
315 static operators::detail::window_toggle_invalid_t<
AN...>
member(
AN...) {
318 static_assert(
sizeof...(
AN) == 10000,
"window_toggle takes (Openings, ClosingSelector, optional Coordination)");
Definition: rx-util.hpp:100
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
Definition: rx-subject.hpp:237
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static auto member(Observable &&o, Openings &&openings, ClosingSelector &&closingSelector) -> decltype(o.template lift< Value >(WindowToggle(std::forward< Openings >(openings), std::forward< ClosingSelector >(closingSelector), identity_immediate())))
Definition: rx-window_toggle.hpp:294
auto window_toggle(AN &&...an) -> operator_factory< window_toggle_tag, AN... >
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-window_toggle.hpp:275
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
identity_one_worker identity_immediate()
Definition: rx-coordination.hpp:170
static auto member(Observable &&o, Openings &&openings, ClosingSelector &&closingSelector, Coordination &&cn) -> decltype(o.template lift< Value >(WindowToggle(std::forward< Openings >(openings), std::forward< ClosingSelector >(closingSelector), std::forward< Coordination >(cn))))
Definition: rx-window_toggle.hpp:309
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-operators.hpp:494
Definition: rx-coordination.hpp:114
static operators::detail::window_toggle_invalid_t< AN... > member(AN...)
Definition: rx-window_toggle.hpp:315
Definition: rx-coordination.hpp:37