RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-switch_on_next.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
20 #if !defined(RXCPP_OPERATORS_RX_SWITCH_ON_NEXT_HPP)
21 #define RXCPP_OPERATORS_RX_SWITCH_ON_NEXT_HPP
22 
23 #include "../rx-includes.hpp"
24 
25 namespace rxcpp {
26 
27 namespace operators {
28 
29 namespace detail {
30 
31 template<class... AN>
32 struct switch_on_next_invalid_arguments {};
33 
34 template<class... AN>
35 struct switch_on_next_invalid : public rxo::operator_base<switch_on_next_invalid_arguments<AN...>> {
36  using type = observable<switch_on_next_invalid_arguments<AN...>, switch_on_next_invalid<AN...>>;
37 };
38 template<class... AN>
39 using switch_on_next_invalid_t = typename switch_on_next_invalid<AN...>::type;
40 
41 template<class T, class Observable, class Coordination>
42 struct switch_on_next
43  : public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
44 {
45  //static_assert(is_observable<Observable>::value, "switch_on_next requires an observable");
46  //static_assert(is_observable<T>::value, "switch_on_next requires an observable that contains observables");
47 
48  typedef switch_on_next<T, Observable, Coordination> this_type;
49 
50  typedef rxu::decay_t<T> source_value_type;
51  typedef rxu::decay_t<Observable> source_type;
52 
53  typedef typename source_type::source_operator_type source_operator_type;
54 
55  typedef source_value_type collection_type;
56  typedef typename collection_type::value_type collection_value_type;
57 
58  typedef rxu::decay_t<Coordination> coordination_type;
59  typedef typename coordination_type::coordinator_type coordinator_type;
60 
61  struct values
62  {
63  values(source_operator_type o, coordination_type sf)
64  : source_operator(std::move(o))
65  , coordination(std::move(sf))
66  {
67  }
68  source_operator_type source_operator;
69  coordination_type coordination;
70  };
71  values initial;
72 
73  switch_on_next(const source_type& o, coordination_type sf)
74  : initial(o.source_operator, std::move(sf))
75  {
76  }
77 
78  template<class Subscriber>
79  void on_subscribe(Subscriber scbr) const {
80  static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
81 
82  typedef Subscriber output_type;
83 
84  struct switch_state_type
85  : public std::enable_shared_from_this<switch_state_type>
86  , public values
87  {
88  switch_state_type(values i, coordinator_type coor, output_type oarg)
89  : values(i)
90  , source(i.source_operator)
91  , pendingCompletions(0)
92  , coordinator(std::move(coor))
93  , out(std::move(oarg))
94  {
95  }
96  observable<source_value_type, source_operator_type> source;
97  // on_completed on the output must wait until all the
98  // subscriptions have received on_completed
99  int pendingCompletions;
100  coordinator_type coordinator;
101  composite_subscription inner_lifetime;
102  output_type out;
103  };
104 
105  auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
106 
107  // take a copy of the values for each subscription
108  auto state = std::make_shared<switch_state_type>(initial, std::move(coordinator), std::move(scbr));
109 
110  composite_subscription outercs;
111 
112  // when the out observer is unsubscribed all the
113  // inner subscriptions are unsubscribed as well
114  state->out.add(outercs);
115 
116  auto source = on_exception(
117  [&](){return state->coordinator.in(state->source);},
118  state->out);
119  if (source.empty()) {
120  return;
121  }
122 
123  ++state->pendingCompletions;
124  // this subscribe does not share the observer subscription
125  // so that when it is unsubscribed the observer can be called
126  // until the inner subscriptions have finished
127  auto sink = make_subscriber<collection_type>(
128  state->out,
129  outercs,
130  // on_next
131  [state](collection_type st) {
132 
133  state->inner_lifetime.unsubscribe();
134 
135  state->inner_lifetime = composite_subscription();
136 
137  // when the out observer is unsubscribed all the
138  // inner subscriptions are unsubscribed as well
139  auto innerlifetimetoken = state->out.add(state->inner_lifetime);
140 
141  state->inner_lifetime.add(make_subscription([state, innerlifetimetoken](){
142  state->out.remove(innerlifetimetoken);
143  --state->pendingCompletions;
144  }));
145 
146  auto selectedSource = state->coordinator.in(st);
147 
148  // this subscribe does not share the source subscription
149  // so that when it is unsubscribed the source will continue
150  auto sinkInner = make_subscriber<collection_value_type>(
151  state->out,
152  state->inner_lifetime,
153  // on_next
154  [state, st](collection_value_type ct) {
155  state->out.on_next(std::move(ct));
156  },
157  // on_error
158  [state](std::exception_ptr e) {
159  state->out.on_error(e);
160  },
161  //on_completed
162  [state](){
163  if (state->pendingCompletions == 1) {
164  state->out.on_completed();
165  }
166  }
167  );
168 
169  auto selectedSinkInner = state->coordinator.out(sinkInner);
170  ++state->pendingCompletions;
171  selectedSource.subscribe(std::move(selectedSinkInner));
172  },
173  // on_error
174  [state](std::exception_ptr e) {
175  state->out.on_error(e);
176  },
177  // on_completed
178  [state]() {
179  if (--state->pendingCompletions == 0) {
180  state->out.on_completed();
181  }
182  }
183  );
184 
185  auto selectedSink = on_exception(
186  [&](){return state->coordinator.out(sink);},
187  state->out);
188  if (selectedSink.empty()) {
189  return;
190  }
191 
192  source->subscribe(std::move(selectedSink.get()));
193 
194  }
195 };
196 
197 }
198 
201 template<class... AN>
202 auto switch_on_next(AN&&... an)
204  return operator_factory<switch_on_next_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
205 }
206 
207 }
208 
209 template<>
211 {
212  template<class Observable,
213  class Enabled = rxu::enable_if_all_true_type_t<
215  class SourceValue = rxu::value_type_t<Observable>,
216  class SwitchOnNext = rxo::detail::switch_on_next<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
217  class Value = rxu::value_type_t<SourceValue>,
218  class Result = observable<Value, SwitchOnNext>
219  >
220  static Result member(Observable&& o) {
221  return Result(SwitchOnNext(std::forward<Observable>(o), identity_current_thread()));
222  }
223 
224  template<class Observable, class Coordination,
225  class Enabled = rxu::enable_if_all_true_type_t<
226  is_observable<Observable>,
228  class SourceValue = rxu::value_type_t<Observable>,
229  class SwitchOnNext = rxo::detail::switch_on_next<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
230  class Value = rxu::value_type_t<SourceValue>,
231  class Result = observable<Value, SwitchOnNext>
232  >
233  static Result member(Observable&& o, Coordination&& cn) {
234  return Result(SwitchOnNext(std::forward<Observable>(o), std::forward<Coordination>(cn)));
235  }
236 
237  template<class... AN>
238  static operators::detail::switch_on_next_invalid_t<AN...> member(AN...) {
239  std::terminate();
240  return {};
241  static_assert(sizeof...(AN) == 10000, "switch_on_next takes (optional Coordination)");
242  }
243 };
244 
245 }
246 
247 #endif
static operators::detail::switch_on_next_invalid_t< AN... > member(AN...)
Definition: rx-switch_on_next.hpp:238
static Result member(Observable &&o, Coordination &&cn)
Definition: rx-switch_on_next.hpp:233
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:197
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static const bool value
Definition: rx-predef.hpp:123
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
auto switch_on_next(AN &&...an) -> operator_factory< switch_on_next_tag, AN... >
Return observable that emits the items emitted by the observable most recently emitted by the source ...
Definition: rx-switch_on_next.hpp:202
static Result member(Observable &&o)
Definition: rx-switch_on_next.hpp:220
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-coordination.hpp:114
Definition: rx-operators.hpp:410
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37