5 #if !defined(RXCPP_RX_SCHEDULER_CURRENT_THREAD_HPP) 6 #define RXCPP_RX_SCHEDULER_CURRENT_THREAD_HPP 8 #include "../rx-includes.hpp" 12 namespace schedulers {
18 typedef action_queue this_type;
21 typedef time_schedulable<clock::time_point> item_type;
24 typedef schedulable_queue<item_type::time_point_type> queue_item_time;
27 struct current_thread_queue_type {
28 std::shared_ptr<worker_interface> w;
34 #if defined(RXCPP_THREAD_LOCAL) 35 static current_thread_queue_type*& current_thread_queue() {
36 static RXCPP_THREAD_LOCAL current_thread_queue_type* q;
40 static rxu::thread_local_storage<current_thread_queue_type>& current_thread_queue() {
41 static rxu::thread_local_storage<current_thread_queue_type> q;
49 return !!current_thread_queue();
// Accessor for the worker held in the queue state: returns a const reference to
// the `w` member reached through current_thread_queue().
// The result of current_thread_queue() is dereferenced with no check in the
// lines visible here.
51 static const std::shared_ptr<worker_interface>& get_worker_interface() {
52 return current_thread_queue()->w;
// Accessor for the recursion object held in the queue state: returns a mutable
// reference to the `r` member reached through current_thread_queue().
// The result of current_thread_queue() is dereferenced with no check in the
// lines visible here.
54 static recursion& get_recursion() {
55 return current_thread_queue()->r;
58 if (!current_thread_queue()) {
61 return current_thread_queue()->q.empty();
// Returns a const reference to the top element of the queue (`q.top()`) held in
// the state obtained from current_thread_queue().
// A check for a missing state comes first; what that branch does is not visible
// in this listing.
63 static queue_item_time::const_reference top() {
64 if (!current_thread_queue()) {
67 return current_thread_queue()->q.top();
70 auto& state = current_thread_queue();
75 if (state->q.empty()) {
// Adds `item` to the queue held in the state obtained from current_thread_queue().
// Visible steps, in order:
//   - item.what.is_subscribed() is tested (the body of that branch is not shown
//     in this listing),
//   - the item is moved into state->q,
//   - state->r.reset(false) is called.
// NOTE(review): lines between the visible statements are missing from this
// listing, so any validation of `state` cannot be confirmed from here.
80 static void push(item_type item) {
81 auto& state = current_thread_queue();
85 if (!item.what.is_subscribed()) {
88 state->q.push(std::move(item));
90 state->r.reset(
false);
// Sets up a queue state with `w` as its worker.
// Visible steps: a test for an already-present state (branch body not shown in
// this listing), then a current_thread_queue_type is allocated with `new`,
// stored through current_thread_queue(), and its `w` member is assigned from `w`.
// The return statement is outside the visible lines.
// NOTE(review): the allocation is held as a raw pointer; destroy() looks like
// the matching release — confirm against the full file.
92 static std::shared_ptr<worker_interface> ensure(std::shared_ptr<worker_interface> w) {
93 if (!!current_thread_queue()) {
97 current_thread_queue() =
new current_thread_queue_type();
98 current_thread_queue()->w = w;
// Builds a current_thread_queue_type owned by a std::unique_ptr and moves `w`
// into its `w` member. Nothing in the visible lines stores the new object
// through current_thread_queue().
// The return statement is outside the visible lines.
101 static std::unique_ptr<current_thread_queue_type>
create(std::shared_ptr<worker_interface> w) {
102 std::unique_ptr<current_thread_queue_type> result(
new current_thread_queue_type());
103 result->w = std::move(w);
// Stores `q` as the queue state by assigning it through current_thread_queue().
// A test for an already-present state comes first; the body of that branch is
// not visible in this listing.
106 static void set(current_thread_queue_type* q) {
107 if (!!current_thread_queue()) {
111 current_thread_queue() = q;
113 static void destroy(current_thread_queue_type* q) {
// Tears down the queue state reachable through current_thread_queue().
// Visible steps: a test for a missing state (branch body not shown in this
// listing), a call to destroy(current_thread_queue_type*) with the state, and
// finally current_thread_queue() is assigned nullptr.
// When RXCPP_THREAD_LOCAL is defined the state is passed directly; the `.get()`
// form is presumably the alternative preprocessor branch — confirm against the
// full file, since the #else/#endif lines are missing here.
116 static void destroy() {
117 if (!current_thread_queue()) {
120 #if defined(RXCPP_THREAD_LOCAL) 121 destroy(current_thread_queue());
123 destroy(current_thread_queue().
get());
125 current_thread_queue() =
nullptr;
137 typedef detail::action_queue queue_type;
143 derecurser(
const this_type&);
148 virtual ~derecurser()
// Reports the current time by delegating to clock_type::now().
152 virtual clock_type::time_point now()
const {
153 return clock_type::now();
// Queues `scbl` stamped with the current time: builds a queue_type::item_type
// from now() and `scbl`, then hands it to queue_type::push.
156 virtual void schedule(
const schedulable& scbl)
const {
157 queue_type::push(queue_type::item_type(now(), scbl));
// Queues `scbl` for the time point `when`: builds a queue_type::item_type from
// `when` and `scbl`, then hands it to queue_type::push.
160 virtual void schedule(clock_type::time_point when,
const schedulable& scbl)
const {
161 queue_type::push(queue_type::item_type(when, scbl));
169 current_worker(
const this_type&);
174 virtual ~current_worker()
// Reports the current time by delegating to clock_type::now().
178 virtual clock_type::time_point now()
const {
179 return clock_type::now();
// Convenience overload: forwards to schedule(when, scbl) with `when` set to now().
182 virtual void schedule(
const schedulable& scbl)
const {
183 schedule(now(), scbl);
// Schedules `scbl` for the time point `when`.
// Only part of this function is visible in this listing; the statements that
// can be read are:
//   - if queue_type::owned(), the call is forwarded to the worker returned by
//     queue_type::get_worker_interface() via schedule(when, scbl);
//   - queue_type::ensure() is called with a newly made derecurser;
//   - queue_type::destroy() is called;
//   - a recursor is obtained from queue_type::get_recursion().get_recurse();
//   - the thread sleeps until `when`;
//   - `next` is read from queue_type::top().when, slept until, and re-read;
//   - `what` is copied from queue_type::top().what and tested with
//     what.is_subscribed().
// NOTE(review): the control flow joining these statements (returns, loop
// headers, scope guards, the action taken when `what` is subscribed) is missing
// from this listing — consult the full file before relying on their ordering.
186 virtual void schedule(clock_type::time_point when,
const schedulable& scbl)
const {
193 if (queue_type::owned()) {
195 queue_type::get_worker_interface()->schedule(when, scbl);
200 queue_type::ensure(std::make_shared<derecurser>());
204 queue_type::destroy();
207 const auto& recursor = queue_type::get_recursion().get_recurse();
208 std::this_thread::sleep_until(when);
218 auto next = queue_type::top().when;
219 (std::this_thread::sleep_until(next),
true);
220 next = queue_type::top().when
222 auto what = queue_type::top().what;
226 if (
what.is_subscribed()) {
237 std::shared_ptr<current_worker> wi;
241 : wi(std::make_shared<current_worker>())
// Reports the current time by delegating to clock_type::now().
254 virtual clock_type::time_point
now()
const {
255 return clock_type::now();
259 return worker(std::move(cs), wi);
264 static scheduler instance = make_scheduler<current_thread>();
Definition: rx-scheduler.hpp:163
auto create(OnSubscribe os) -> observable< T, detail::create< T, OnSubscribe >>
Returns an observable that executes the specified function when a subscriber subscribes to it.
Definition: rx-create.hpp:82
Definition: rx-all.hpp:26
bool is_subscribed() const
Definition: rx-scheduler.hpp:585
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
auto empty() -> decltype(from< T >())
Returns an observable that sends no items to observer and immediately completes, on the specified scheduler.
Definition: rx-empty.hpp:37
bool is_tail_recursion_allowed() const
Definition: rx-currentthread.hpp:250
virtual ~current_thread()
Definition: rx-currentthread.hpp:244
Definition: rx-currentthread.hpp:131
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
virtual clock_type::time_point now() const
Definition: rx-currentthread.hpp:254
virtual worker create_worker(composite_subscription cs) const
Definition: rx-currentthread.hpp:258
std::chrono::steady_clock clock_type
Definition: rx-scheduler.hpp:154
static bool is_schedule_required()
Definition: rx-currentthread.hpp:248
const scheduler & make_current_thread()
Definition: rx-currentthread.hpp:263
current_thread()
Definition: rx-currentthread.hpp:240
Definition: rx-scheduler.hpp:353
#define RXCPP_UNWIND_AUTO(Function)
Definition: rx-util.hpp:875
std::string what(std::exception_ptr ep)
Definition: rx-util.hpp:523
Definition: rx-scheduler.hpp:426
Definition: rx-scheduler.hpp:200