RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-window_toggle.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
28 #if !defined(RXCPP_OPERATORS_RX_WINDOW_TOGGLE_HPP)
29 #define RXCPP_OPERATORS_RX_WINDOW_TOGGLE_HPP
30 
31 #include "../rx-includes.hpp"
32 
33 namespace rxcpp {
34 
35 namespace operators {
36 
37 namespace detail {
38 
39 template<class... AN>
40 struct window_toggle_invalid_arguments {};
41 
42 template<class... AN>
43 struct window_toggle_invalid : public rxo::operator_base<window_toggle_invalid_arguments<AN...>> {
44  using type = observable<window_toggle_invalid_arguments<AN...>, window_toggle_invalid<AN...>>;
45 };
46 template<class... AN>
47 using window_toggle_invalid_t = typename window_toggle_invalid<AN...>::type;
48 
49 template<class T, class Openings, class ClosingSelector, class Coordination>
50 struct window_toggle
51 {
52  typedef window_toggle<T, Openings, ClosingSelector, Coordination> this_type;
53 
54  using source_value_type = rxu::decay_t<T>;
55  using coordination_type = rxu::decay_t<Coordination>;
56  using coordinator_type = typename coordination_type::coordinator_type;
57  using openings_type = rxu::decay_t<Openings>;
58  using openings_value_type = typename openings_type::value_type;
59  using closing_selector_type = rxu::decay_t<ClosingSelector>;
60  using closings_type = rxu::result_of_t<closing_selector_type(openings_value_type)>;
61  using closings_value_type = typename closings_type::value_type;
62 
63  struct window_toggle_values
64  {
65  window_toggle_values(openings_type opens, closing_selector_type closes, coordination_type c)
66  : openings(opens)
67  , closingSelector(closes)
68  , coordination(c)
69  {
70  }
71  openings_type openings;
72  mutable closing_selector_type closingSelector;
73  coordination_type coordination;
74  };
75  window_toggle_values initial;
76 
77  window_toggle(openings_type opens, closing_selector_type closes, coordination_type coordination)
78  : initial(opens, closes, coordination)
79  {
80  }
81 
82  template<class Subscriber>
83  struct window_toggle_observer
84  {
85  typedef window_toggle_observer<Subscriber> this_type;
86  typedef rxu::decay_t<T> value_type;
87  typedef rxu::decay_t<Subscriber> dest_type;
88  typedef observer<T, this_type> observer_type;
89 
90  struct window_toggle_subscriber_values : public window_toggle_values
91  {
92  window_toggle_subscriber_values(composite_subscription cs, dest_type d, window_toggle_values v, coordinator_type c)
93  : window_toggle_values(v)
94  , cs(std::move(cs))
95  , dest(std::move(d))
96  , coordinator(std::move(c))
97  , worker(coordinator.get_worker())
98  {
99  }
100  composite_subscription cs;
101  dest_type dest;
102  coordinator_type coordinator;
103  rxsc::worker worker;
104  mutable std::list<rxcpp::subjects::subject<T>> subj;
105  };
106  std::shared_ptr<window_toggle_subscriber_values> state;
107 
108  window_toggle_observer(composite_subscription cs, dest_type d, window_toggle_values v, coordinator_type c)
109  : state(std::make_shared<window_toggle_subscriber_values>(window_toggle_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
110  {
111  auto localState = state;
112 
113  composite_subscription innercs;
114 
115  // when the out observer is unsubscribed all the
116  // inner subscriptions are unsubscribed as well
117  auto innerscope = localState->dest.add(innercs);
118 
119  innercs.add([=](){
120  localState->dest.remove(innerscope);
121  });
122 
123  localState->dest.add(localState->cs);
124 
125  auto source = on_exception(
126  [&](){return localState->coordinator.in(localState->openings);},
127  localState->dest);
128  if (source.empty()) {
129  return;
130  }
131 
132  // this subscribe does not share the observer subscription
133  // so that when it is unsubscribed the observer can be called
134  // until the inner subscriptions have finished
135  auto sink = make_subscriber<openings_value_type>(
136  localState->dest,
137  innercs,
138  // on_next
139  [localState](const openings_value_type& ov) {
140  auto closer = localState->closingSelector(ov);
141 
142  auto it = localState->subj.insert(localState->subj.end(), rxcpp::subjects::subject<T>());
143  localState->dest.on_next(it->get_observable().as_dynamic());
144 
145  composite_subscription innercs;
146 
147  // when the out observer is unsubscribed all the
148  // inner subscriptions are unsubscribed as well
149  auto innerscope = localState->dest.add(innercs);
150 
151  innercs.add([=](){
152  localState->dest.remove(innerscope);
153  });
154 
155  auto source = localState->coordinator.in(closer);
156 
157  auto sit = std::make_shared<decltype(it)>(it);
158  auto close = [localState, sit]() {
159  auto it = *sit;
160  *sit = localState->subj.end();
161  if (it != localState->subj.end()) {
162  it->get_subscriber().on_completed();
163  localState->subj.erase(it);
164  }
165  };
166 
167  // this subscribe does not share the observer subscription
168  // so that when it is unsubscribed the observer can be called
169  // until the inner subscriptions have finished
170  auto sink = make_subscriber<closings_value_type>(
171  localState->dest,
172  innercs,
173  // on_next
174  [close, innercs](closings_value_type) {
175  close();
176  innercs.unsubscribe();
177  },
178  // on_error
179  [localState](std::exception_ptr e) {
180  localState->dest.on_error(e);
181  },
182  // on_completed
183  close
184  );
185  auto selectedSink = localState->coordinator.out(sink);
186  source.subscribe(std::move(selectedSink));
187  },
188  // on_error
189  [localState](std::exception_ptr e) {
190  localState->dest.on_error(e);
191  },
192  // on_completed
193  []() {
194  }
195  );
196  auto selectedSink = on_exception(
197  [&](){return localState->coordinator.out(sink);},
198  localState->dest);
199  if (selectedSink.empty()) {
200  return;
201  }
202  source->subscribe(std::move(selectedSink.get()));
203  }
204 
205  void on_next(T v) const {
206  auto localState = state;
207  auto work = [v, localState](const rxsc::schedulable&){
208  for (auto s : localState->subj) {
209  s.get_subscriber().on_next(v);
210  }
211  };
212  auto selectedWork = on_exception(
213  [&](){return localState->coordinator.act(work);},
214  localState->dest);
215  if (selectedWork.empty()) {
216  return;
217  }
218  localState->worker.schedule(selectedWork.get());
219  }
220 
221  void on_error(std::exception_ptr e) const {
222  auto localState = state;
223  auto work = [e, localState](const rxsc::schedulable&){
224  for (auto s : localState->subj) {
225  s.get_subscriber().on_error(e);
226  }
227  localState->dest.on_error(e);
228  };
229  auto selectedWork = on_exception(
230  [&](){return localState->coordinator.act(work);},
231  localState->dest);
232  if (selectedWork.empty()) {
233  return;
234  }
235  localState->worker.schedule(selectedWork.get());
236  }
237 
238  void on_completed() const {
239  auto localState = state;
240  auto work = [localState](const rxsc::schedulable&){
241  for (auto s : localState->subj) {
242  s.get_subscriber().on_completed();
243  }
244  localState->dest.on_completed();
245  };
246  auto selectedWork = on_exception(
247  [&](){return localState->coordinator.act(work);},
248  localState->dest);
249  if (selectedWork.empty()) {
250  return;
251  }
252  localState->worker.schedule(selectedWork.get());
253  }
254 
255  static subscriber<T, observer_type> make(dest_type d, window_toggle_values v) {
256  auto cs = composite_subscription();
257  auto coordinator = v.coordination.create_coordinator(d.get_subscription());
258 
259  return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
260  }
261  };
262 
263  template<class Subscriber>
264  auto operator()(Subscriber dest) const
265  -> decltype(window_toggle_observer<Subscriber>::make(std::move(dest), initial)) {
266  return window_toggle_observer<Subscriber>::make(std::move(dest), initial);
267  }
268 };
269 
270 }
271 
274 template<class... AN>
275 auto window_toggle(AN&&... an)
277  return operator_factory<window_toggle_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
278 }
279 
280 }
281 
282 template<>
284 {
285  template<class Observable, class Openings, class ClosingSelector,
286  class ClosingSelectorType = rxu::decay_t<ClosingSelector>,
287  class OpeningsType = rxu::decay_t<Openings>,
288  class OpeningsValueType = typename OpeningsType::value_type,
289  class Enabled = rxu::enable_if_all_true_type_t<
291  class SourceValue = rxu::value_type_t<Observable>,
292  class WindowToggle = rxo::detail::window_toggle<SourceValue, rxu::decay_t<Openings>, rxu::decay_t<ClosingSelector>, identity_one_worker>,
293  class Value = observable<SourceValue>>
294  static auto member(Observable&& o, Openings&& openings, ClosingSelector&& closingSelector)
295  -> decltype(o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector), identity_immediate()))) {
296  return o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector), identity_immediate()));
297  }
298 
299  template<class Observable, class Openings, class ClosingSelector, class Coordination,
300  class ClosingSelectorType = rxu::decay_t<ClosingSelector>,
301  class OpeningsType = rxu::decay_t<Openings>,
302  class OpeningsValueType = typename OpeningsType::value_type,
303  class Enabled = rxu::enable_if_all_true_type_t<
304  all_observables<Observable, Openings, rxu::result_of_t<ClosingSelectorType(OpeningsValueType)>>,
306  class SourceValue = rxu::value_type_t<Observable>,
307  class WindowToggle = rxo::detail::window_toggle<SourceValue, rxu::decay_t<Openings>, rxu::decay_t<ClosingSelector>, rxu::decay_t<Coordination>>,
308  class Value = observable<SourceValue>>
309  static auto member(Observable&& o, Openings&& openings, ClosingSelector&& closingSelector, Coordination&& cn)
310  -> decltype(o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector), std::forward<Coordination>(cn)))) {
311  return o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector), std::forward<Coordination>(cn)));
312  }
313 
314  template<class... AN>
315  static operators::detail::window_toggle_invalid_t<AN...> member(AN...) {
316  std::terminate();
317  return {};
318  static_assert(sizeof...(AN) == 10000, "window_toggle takes (Openings, ClosingSelector, optional Coordination)");
319  }
320 };
321 
322 }
323 
324 #endif
Definition: rx-util.hpp:100
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
Definition: rx-subject.hpp:237
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static auto member(Observable &&o, Openings &&openings, ClosingSelector &&closingSelector) -> decltype(o.template lift< Value >(WindowToggle(std::forward< Openings >(openings), std::forward< ClosingSelector >(closingSelector), identity_immediate())))
Definition: rx-window_toggle.hpp:294
auto window_toggle(AN &&...an) -> operator_factory< window_toggle_tag, AN... >
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-window_toggle.hpp:275
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
identity_one_worker identity_immediate()
Definition: rx-coordination.hpp:170
static auto member(Observable &&o, Openings &&openings, ClosingSelector &&closingSelector, Coordination &&cn) -> decltype(o.template lift< Value >(WindowToggle(std::forward< Openings >(openings), std::forward< ClosingSelector >(closingSelector), std::forward< Coordination >(cn))))
Definition: rx-window_toggle.hpp:309
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-operators.hpp:494
Definition: rx-coordination.hpp:114
static operators::detail::window_toggle_invalid_t< AN... > member(AN...)
Definition: rx-window_toggle.hpp:315
Definition: rx-coordination.hpp:37