RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-subscribe_on.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
23 #if !defined(RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP)
24 #define RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP
25 
26 #include "../rx-includes.hpp"
27 
28 namespace rxcpp {
29 
30 namespace operators {
31 
32 namespace detail {
33 
/// Empty tag type that stands in for the value type of the observable produced
/// when subscribe_on is invoked with an unsupported argument list.
///
/// @tparam AN  the argument types of the rejected call; they are carried only
///             as template parameters, the struct itself has no members.
template<class... AN>
struct subscribe_on_invalid_arguments {};
36 
37 template<class... AN>
38 struct subscribe_on_invalid : public rxo::operator_base<subscribe_on_invalid_arguments<AN...>> {
39  using type = observable<subscribe_on_invalid_arguments<AN...>, subscribe_on_invalid<AN...>>;
40 };
41 template<class... AN>
42 using subscribe_on_invalid_t = typename subscribe_on_invalid<AN...>::type;
43 
44 template<class T, class Observable, class Coordination>
45 struct subscribe_on : public operator_base<T>
46 {
47  typedef rxu::decay_t<Observable> source_type;
48  typedef rxu::decay_t<Coordination> coordination_type;
49  typedef typename coordination_type::coordinator_type coordinator_type;
50  struct subscribe_on_values
51  {
52  ~subscribe_on_values()
53  {
54  }
55  subscribe_on_values(source_type s, coordination_type sf)
56  : source(std::move(s))
57  , coordination(std::move(sf))
58  {
59  }
60  source_type source;
61  coordination_type coordination;
62  private:
63  subscribe_on_values& operator=(subscribe_on_values o) RXCPP_DELETE;
64  };
65  const subscribe_on_values initial;
66 
67  ~subscribe_on()
68  {
69  }
70  subscribe_on(source_type s, coordination_type sf)
71  : initial(std::move(s), std::move(sf))
72  {
73  }
74 
75  template<class Subscriber>
76  void on_subscribe(Subscriber s) const {
77 
78  typedef Subscriber output_type;
79  struct subscribe_on_state_type
80  : public std::enable_shared_from_this<subscribe_on_state_type>
81  , public subscribe_on_values
82  {
83  subscribe_on_state_type(const subscribe_on_values& i, const output_type& oarg)
84  : subscribe_on_values(i)
85  , out(oarg)
86  {
87  }
88  composite_subscription source_lifetime;
89  output_type out;
90  private:
91  subscribe_on_state_type& operator=(subscribe_on_state_type o) RXCPP_DELETE;
92  };
93 
94  composite_subscription coordinator_lifetime;
95 
96  auto coordinator = initial.coordination.create_coordinator(coordinator_lifetime);
97 
98  auto controller = coordinator.get_worker();
99 
100  // take a copy of the values for each subscription
101  auto state = std::make_shared<subscribe_on_state_type>(initial, std::move(s));
102 
103  auto sl = state->source_lifetime;
104  auto ol = state->out.get_subscription();
105 
106  auto disposer = [=](const rxsc::schedulable&){
107  sl.unsubscribe();
108  ol.unsubscribe();
109  coordinator_lifetime.unsubscribe();
110  };
111  auto selectedDisposer = on_exception(
112  [&](){return coordinator.act(disposer);},
113  state->out);
114  if (selectedDisposer.empty()) {
115  return;
116  }
117 
118  state->source_lifetime.add([=](){
119  controller.schedule(selectedDisposer.get());
120  });
121 
122  state->out.add([=](){
123  sl.unsubscribe();
124  ol.unsubscribe();
125  coordinator_lifetime.unsubscribe();
126  });
127 
128  auto producer = [=](const rxsc::schedulable&){
129  state->source.subscribe(state->source_lifetime, state->out);
130  };
131 
132  auto selectedProducer = on_exception(
133  [&](){return coordinator.act(producer);},
134  state->out);
135  if (selectedProducer.empty()) {
136  return;
137  }
138 
139  controller.schedule(selectedProducer.get());
140  }
141 private:
142  subscribe_on& operator=(subscribe_on o) RXCPP_DELETE;
143 };
144 
145 }
146 
149 template<class... AN>
150 auto subscribe_on(AN&&... an)
152  return operator_factory<subscribe_on_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
153 }
154 
155 }
156 
157 template<>
159 {
160  template<class Observable, class Coordination,
161  class Enabled = rxu::enable_if_all_true_type_t<
164  class SourceValue = rxu::value_type_t<Observable>,
165  class SubscribeOn = rxo::detail::subscribe_on<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
166  class Value = rxu::value_type_t<SubscribeOn>,
167  class Result = observable<Value, SubscribeOn>>
168  static Result member(Observable&& o, Coordination&& cn) {
169  return Result(SubscribeOn(std::forward<Observable>(o), std::forward<Coordination>(cn)));
170  }
171 
172  template<class... AN>
173  static operators::detail::subscribe_on_invalid_t<AN...> member(AN...) {
174  std::terminate();
175  return {};
176  static_assert(sizeof...(AN) == 10000, "subscribe_on takes (Coordination)");
177  }
178 };
179 
180 }
181 
182 #endif
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
auto subscribe_on(AN &&...an) -> operator_factory< subscribe_on_tag, AN... >
Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordination.
Definition: rx-subscribe_on.hpp:150
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
A source of values. Subscribe, or use one of the operator methods that return a new observable, which uses this observable as a source.
Definition: rx-observable.hpp:510
static Result member(Observable &&o, Coordination &&cn)
Definition: rx-subscribe_on.hpp:168
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
static operators::detail::subscribe_on_invalid_t< AN... > member(AN...)
Definition: rx-subscribe_on.hpp:173
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37
Definition: rx-operators.hpp:395