5 #if !defined(RXCPP_RX_SCHEDULER_HPP) 6 #define RXCPP_RX_SCHEDULER_HPP 12 namespace schedulers {
20 typedef std::shared_ptr<action_type> action_ptr;
22 typedef std::shared_ptr<worker_interface> worker_interface_ptr;
23 typedef std::shared_ptr<const worker_interface> const_worker_interface_ptr;
25 typedef std::weak_ptr<worker_interface> worker_interface_weak_ptr;
26 typedef std::weak_ptr<const worker_interface> const_worker_interface_weak_ptr;
28 typedef std::shared_ptr<scheduler_interface> scheduler_interface_ptr;
29 typedef std::shared_ptr<const scheduler_interface> const_scheduler_interface_ptr;
31 inline action_ptr shared_empty() {
32 static action_ptr shared_empty = std::make_shared<detail::action_type>();
66 mutable bool isrequested;
73 , requestor(isrequested)
97 mutable bool isallowed;
103 , recursor(isallowed)
108 , recursor(isallowed)
112 inline void reset(
bool b =
true)
const {
133 detail::action_ptr inner;
139 : inner(std::move(i))
145 return action(detail::shared_empty());
164 :
public std::enable_shared_from_this<worker_interface>
173 virtual clock_type::time_point now()
const = 0;
175 virtual void schedule(
const schedulable& scbl)
const = 0;
176 virtual void schedule(clock_type::time_point when,
const schedulable& scbl)
const = 0;
182 struct is_action_function
186 static auto check(
int) -> decltype((*(CF*)
nullptr)(*(
schedulable*)
nullptr));
188 static not_void check(...);
190 static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)),
void>::value;
203 detail::worker_interface_ptr inner;
216 , lifetime(std::move(cs))
221 , lifetime(std::move(cs))
238 return lifetime.
add(std::move(s));
240 inline void remove(weak_subscription w)
const {
241 return lifetime.
remove(std::move(w));
244 return lifetime.clear();
253 inline clock_type::time_point
now()
const {
260 schedule_rebind(scbl);
266 schedule_rebind(when, scbl);
275 schedule_rebind(now() + when, scbl);
282 schedule_periodically_rebind(initial, period, scbl);
289 schedule_periodically_rebind(now() + initial, period, scbl);
293 template<
class Arg0,
class... ArgN>
294 auto schedule(Arg0&& a0, ArgN&&... an)
const 295 ->
typename std::enable_if<
296 (detail::is_action_function<Arg0>::value ||
299 template<
class... ArgN>
301 void schedule_rebind(
const schedulable& scbl, ArgN&&... an)
const;
304 template<
class Arg0,
class... ArgN>
305 auto schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an)
const 306 ->
typename std::enable_if<
307 (detail::is_action_function<Arg0>::value ||
311 template<
class... ArgN>
312 void schedule_rebind(clock_type::time_point when,
const schedulable& scbl, ArgN&&... an)
const;
315 template<
class Arg0,
class... ArgN>
316 auto schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an)
const 317 ->
typename std::enable_if<
318 (detail::is_action_function<Arg0>::value ||
322 template<
class... ArgN>
323 void schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period,
const schedulable& scbl, ArgN&&... an)
const;
327 return lhs.inner == rhs.inner && lhs.lifetime == rhs.lifetime;
330 return !(lhs == rhs);
335 detail::worker_interface_weak_ptr inner;
344 , lifetime(owner.lifetime)
354 :
public std::enable_shared_from_this<scheduler_interface>
363 virtual clock_type::time_point now()
const = 0;
386 detail::scheduler_interface_ptr inner;
395 : inner(std::move(i))
398 explicit scheduler(detail::const_scheduler_interface_ptr i)
404 inline clock_type::time_point
now()
const {
413 return inner->create_worker(cs);
417 template<
class Scheduler,
class... ArgN>
419 return scheduler(std::static_pointer_cast<scheduler_interface>(std::make_shared<Scheduler>(std::forward<ArgN>(an)...)));
444 detacher(
const this_type* that)
448 const this_type* that;
451 class recursed_scope_type
455 class exit_recursed_scope_type
457 const recursed_scope_type* that;
459 ~exit_recursed_scope_type()
461 that->requestor =
nullptr;
463 exit_recursed_scope_type(
const recursed_scope_type* that)
469 recursed_scope_type()
473 recursed_scope_type(
const recursed_scope_type&)
478 recursed_scope_type& operator=(
const recursed_scope_type& )
483 exit_recursed_scope_type reset(
const recurse& r)
const {
485 return exit_recursed_scope_type(
this);
487 bool is_recursed()
const {
490 void operator()()
const {
494 recursed_scope_type recursed_scope;
513 : lifetime(q.get_subscription())
515 , activity(std::move(a))
521 : lifetime(std::move(cs))
523 , activity(std::move(a))
525 , action_scope(controller.lock().add(lifetime))
530 : lifetime(scbl.get_subscription())
532 , activity(std::move(a))
533 , scoped(scbl.scoped)
534 , action_scope(scbl.scoped ? controller.lock().add(lifetime) : weak_subscription())
545 return controller.
lock();
548 return controller.
lock();
562 -> decltype(recursed_scope.reset(r)) {
563 return recursed_scope.reset(r);
569 return recursed_scope.is_recursed();
589 return lifetime.
add(std::move(s));
593 ->
typename std::enable_if<rxcpp::detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
596 inline void remove(weak_subscription w)
const {
597 return lifetime.
remove(std::move(w));
600 return lifetime.clear();
608 inline clock_type::time_point
now()
const {
609 return controller.
lock().
now();
613 if (is_subscribed()) {
614 get_worker().schedule(*
this);
618 inline void schedule(clock_type::time_point when)
const {
619 if (is_subscribed()) {
620 get_worker().schedule(when, *
this);
624 inline void schedule(clock_type::duration when)
const {
625 if (is_subscribed()) {
626 get_worker().schedule(when, *
this);
634 if (!is_subscribed()) {
637 detacher protect(
this);
639 protect.that =
nullptr;
648 :
public std::enable_shared_from_this<action_type>
650 typedef action_type this_type;
653 typedef std::function<void(const schedulable&, const recurse&)> function_type;
663 action_type(function_type f)
676 class action_tailrecurser
677 :
public std::enable_shared_from_this<action_type>
679 typedef action_type this_type;
682 typedef std::function<void(const schedulable&)> function_type;
688 action_tailrecurser()
692 action_tailrecurser(function_type f)
729 static_assert(detail::is_action_function<F>::value,
"action function must be void(schedulable)");
730 auto fn = std::forward<F>(f);
731 return action(std::make_shared<detail::action_type>(detail::action_tailrecurser(fn)));
756 ->
typename std::enable_if<detail::is_action_function<F>::value,
schedulable>::type {
761 ->
typename std::enable_if<detail::is_action_function<F>::value,
schedulable>::type {
766 ->
typename std::enable_if<detail::is_action_function<F>::value,
schedulable>::type {
771 ->
typename std::enable_if<detail::is_action_function<F>::value,
schedulable>::type {
776 ->
typename std::enable_if<detail::is_action_function<F>::value,
schedulable>::type {
782 return schedulable(cs, scbl.get_worker(), scbl.get_action());
793 template<
class Arg0,
class... ArgN>
795 -> typename std::enable_if<
796 (detail::is_action_function<Arg0>::value ||
799 auto scbl =
make_schedulable(*
this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
801 inner->schedule(std::move(scbl));
804 template<
class... ArgN>
808 inner->schedule(std::move(rescbl));
812 template<
class Arg0,
class... ArgN>
814 -> typename std::enable_if<
815 (detail::is_action_function<Arg0>::value ||
818 auto scbl =
make_schedulable(*
this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
820 inner->schedule(when, std::move(scbl));
823 template<
class... ArgN>
827 inner->schedule(when, std::move(rescbl));
831 template<
class Arg0,
class... ArgN>
833 -> typename std::enable_if<
834 (detail::is_action_function<Arg0>::value ||
837 schedule_periodically_rebind(initial, period,
make_schedulable(*
this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
839 template<
class... ArgN>
841 auto keepAlive = *
this;
842 auto target = std::make_shared<clock_type::time_point>(initial);
843 auto activity =
make_schedulable(scbl, keepAlive, std::forward<ArgN>(an)...);
846 [keepAlive, target, period, activity](
schedulable self) {
854 self.schedule(*target);
856 trace_activity().schedule_when_enter(*inner.get(), *target, periodic);
857 inner->schedule(*target, periodic);
863 template<
class TimePo
int>
864 struct time_schedulable
866 typedef TimePoint time_point_type;
881 template<
class TimePo
int>
882 class schedulable_queue {
884 typedef time_schedulable<TimePoint> item_type;
885 typedef std::pair<item_type, int64_t> elem_type;
886 typedef std::vector<elem_type> container_type;
887 typedef const item_type& const_reference;
892 bool operator()(
const elem_type& lhs,
const elem_type& rhs)
const {
893 if (lhs.first.when == rhs.first.when) {
894 return lhs.second > rhs.second;
897 return lhs.first.when > rhs.first.when;
902 typedef std::priority_queue<
918 const_reference top()
const {
919 return q.top().first;
930 void push(
const item_type& value) {
931 q.push(elem_type(value, ordinal++));
934 void push(item_type&& value) {
935 q.push(elem_type(std::move(value), ordinal++));
942 namespace rxsc=schedulers;
tag_action action_tag
Definition: rx-scheduler.hpp:124
void schedule(clock_type::time_point when, const schedulable &scbl) const
insert the supplied schedulable to be run at the time specified
Definition: rx-scheduler.hpp:264
composite_subscription::weak_subscription weak_subscription
Definition: rx-scheduler.hpp:209
Definition: rx-scheduler.hpp:163
void unsubscribe() const
Definition: rx-scheduler.hpp:246
const recursed & get_recursed() const
get the recursed to set into the schedulable for the function to use to request recursion ...
Definition: rx-scheduler.hpp:89
static subscription lock(weak_state_type w)
Definition: rx-subscription.hpp:181
void reset(bool b=true) const
set whether tail-recursion is allowed
Definition: rx-scheduler.hpp:112
void schedule() const
put this on the queue of the stored scheduler to run asap
Definition: rx-scheduler.hpp:612
void schedule_periodically(clock_type::duration initial, clock_type::duration period, const schedulable &scbl) const
Definition: rx-scheduler.hpp:287
const composite_subscription & get_subscription() const
Definition: rx-scheduler.hpp:538
virtual ~scheduler_interface()
Definition: rx-scheduler.hpp:361
Definition: rx-all.hpp:26
bool is_allowed() const
does the scheduler allow tail-recursion now?
Definition: rx-scheduler.hpp:77
bool is_subscribed() const
Definition: rx-scheduler.hpp:585
worker(composite_subscription cs, detail::const_worker_interface_ptr i)
Definition: rx-scheduler.hpp:214
schedulable(composite_subscription cs, worker q, action a)
action and worker have independent lifetimes
Definition: rx-scheduler.hpp:520
tag_scheduler scheduler_tag
Definition: rx-scheduler.hpp:155
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
Definition: rx-scheduler.hpp:152
auto make_schedulable(const schedulable &scbl) -> schedulable
Definition: rx-scheduler.hpp:735
action(detail::action_ptr i)
Definition: rx-scheduler.hpp:138
bool is_recursed() const
Definition: rx-scheduler.hpp:568
void remove(weak_subscription w) const
Definition: rx-subscription.hpp:432
Definition: rx-scheduler.hpp:158
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:498
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:197
subscription::weak_state_type weak_subscription
Definition: rx-subscription.hpp:370
action make_action(F &&f)
Definition: rx-scheduler.hpp:728
auto empty() -> decltype(from< T >())
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-empty.hpp:37
void operator()(const recurse &r) const
invokes the action
Definition: rx-scheduler.hpp:633
bool operator!=(const worker &lhs, const worker &rhs)
Definition: rx-scheduler.hpp:329
composite_subscription & get_subscription()
Definition: rx-scheduler.hpp:228
Definition: rx-scheduler.hpp:46
weak_subscription add(subscription s) const
Definition: rx-scheduler.hpp:237
composite_subscription::weak_subscription weak_subscription
Definition: rx-scheduler.hpp:497
Definition: rx-currentthread.hpp:131
virtual ~worker_interface()
Definition: rx-scheduler.hpp:171
worker(composite_subscription cs, worker o)
Definition: rx-scheduler.hpp:219
void schedule(clock_type::time_point when) const
put this on the queue of the stored scheduler to run at the specified time
Definition: rx-scheduler.hpp:618
void operator()() const
Definition: rx-scheduler.hpp:579
clock_type::time_point now() const
Definition: rx-scheduler.hpp:608
action & get_action()
Definition: rx-scheduler.hpp:553
weak_worker()
Definition: rx-scheduler.hpp:339
const composite_subscription & get_subscription() const
Definition: rx-scheduler.hpp:225
Definition: rx-subscription.hpp:31
void operator()(const schedulable &s, const recurse &r) const
call the function
Definition: rx-scheduler.hpp:719
action provides type-forgetting for a potentially recursive set of calls to a function that takes a s...
Definition: rx-scheduler.hpp:130
Definition: rx-predef.hpp:21
worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-scheduler.hpp:412
void unsubscribe() const
Definition: rx-scheduler.hpp:602
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
Definition: rx-predef.hpp:30
Definition: rx-scheduler.hpp:122
void schedule_periodically(clock_type::time_point initial, clock_type::duration period, const schedulable &scbl) const
Definition: rx-scheduler.hpp:280
static action empty()
return the empty action
Definition: rx-scheduler.hpp:144
void remove(weak_subscription w) const
Definition: rx-scheduler.hpp:240
void clear() const
Definition: rx-scheduler.hpp:599
Definition: rx-predef.hpp:58
static composite_subscription empty()
Definition: rx-subscription.hpp:404
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:208
scheduler(detail::scheduler_interface_ptr i)
Definition: rx-scheduler.hpp:394
scheduler()
Definition: rx-scheduler.hpp:391
weak_worker(worker &owner)
Definition: rx-scheduler.hpp:342
void schedule(const schedulable &scbl) const
insert the supplied schedulable to be run as soon as possible
Definition: rx-scheduler.hpp:258
bool is_requested() const
did the function request to be recursed?
Definition: rx-scheduler.hpp:81
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:359
clock_type::time_point now() const
return the current time for this worker
Definition: rx-scheduler.hpp:253
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
std::chrono::steady_clock clock_type
Definition: rx-scheduler.hpp:154
void clear() const
Definition: rx-scheduler.hpp:243
action make_action_empty()
Definition: rx-scheduler.hpp:723
clock_type::time_point now() const
return the current time for this scheduler
Definition: rx-scheduler.hpp:404
recursion is used by the scheduler to signal to each action whether tail recursion is allowed...
Definition: rx-scheduler.hpp:95
scheduler make_scheduler(ArgN &&...an)
Definition: rx-scheduler.hpp:418
recurse(bool &a)
Definition: rx-scheduler.hpp:70
void unsubscribe() const
Definition: rx-subscription.hpp:170
schedulable()
Definition: rx-scheduler.hpp:506
recursion()
Definition: rx-scheduler.hpp:101
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:413
tag_worker worker_tag
Definition: rx-scheduler.hpp:160
const recurse & get_recurse() const
get the recurse to pass into each action being called
Definition: rx-scheduler.hpp:116
Definition: rx-scheduler.hpp:369
void operator()() const
request to be rescheduled
Definition: rx-scheduler.hpp:56
Definition: rx-scheduler.hpp:63
auto add(F f) const -> typename std::enable_if< rxcpp::detail::is_unsubscribe_function< F >::value, weak_subscription >::type
Definition: rx-scheduler.hpp:592
bool operator==(const worker &lhs, const worker &rhs)
Definition: rx-scheduler.hpp:326
Definition: rx-scheduler.hpp:353
weak_subscription add(subscription s) const
Definition: rx-scheduler.hpp:588
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:389
composite_subscription & get_subscription()
Definition: rx-scheduler.hpp:541
Definition: rx-predef.hpp:56
auto scope(ResourceFactory rf, ObservableFactory of) -> observable< rxu::value_type_t< detail::scope_traits< ResourceFactory, ObservableFactory >>, detail::scope< ResourceFactory, ObservableFactory >>
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-scope.hpp:114
Definition: rx-scheduler.hpp:333
Definition: rx-subscription.hpp:29
void schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable &scbl, ArgN &&...an) const
use the supplied arguments to make a schedulable and then insert it to be run
Definition: rx-scheduler.hpp:840
const worker get_worker() const
Definition: rx-scheduler.hpp:544
scheduler(detail::const_scheduler_interface_ptr i)
Definition: rx-scheduler.hpp:398
recursed(bool &r)
Definition: rx-scheduler.hpp:51
worker get_worker()
Definition: rx-scheduler.hpp:547
bool is_subscribed() const
Definition: rx-scheduler.hpp:234
Definition: rx-subscription.hpp:67
void reset() const
reset the function request. call before each call to the function.
Definition: rx-scheduler.hpp:85
static schedulable empty(worker sc)
Definition: rx-scheduler.hpp:557
std::string what(std::exception_ptr ep)
Definition: rx-util.hpp:523
schedulable(worker q, action a)
action and worker share lifetime
Definition: rx-scheduler.hpp:512
void schedule(clock_type::duration when, const schedulable &scbl) const
insert the supplied schedulable to be run at now() + the delay specified
Definition: rx-scheduler.hpp:273
worker()
Definition: rx-scheduler.hpp:211
Definition: rx-scheduler.hpp:426
bool is_subscribed() const
Definition: rx-subscription.hpp:164
recursion(bool b)
Definition: rx-scheduler.hpp:106
const action & get_action() const
Definition: rx-scheduler.hpp:550
action()
Definition: rx-scheduler.hpp:135
Definition: rx-predef.hpp:43
auto set_recursed(const recurse &r) const -> decltype(recursed_scope.reset(r))
Definition: rx-scheduler.hpp:561
~schedulable()
Definition: rx-scheduler.hpp:500
void schedule(clock_type::duration when) const
put this on the queue of the stored scheduler to run after a delay from now
Definition: rx-scheduler.hpp:624
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:169
worker lock() const
Definition: rx-scheduler.hpp:348
void schedule_rebind(const schedulable &scbl, ArgN &&...an) const
use the supplied arguments to make a schedulable and then insert it to be run
Definition: rx-scheduler.hpp:805
schedulable(schedulable scbl, worker q, action a)
inherit lifetimes
Definition: rx-scheduler.hpp:529
tag_schedulable schedulable_tag
Definition: rx-scheduler.hpp:374
Definition: rx-scheduler.hpp:200