5 #if !defined(RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP) 6 #define RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP 8 #include "../rx-includes.hpp" 12 namespace schedulers {
23 typedef loop_worker this_type;
24 loop_worker(
const this_type&);
26 typedef detail::schedulable_queue<
27 typename clock_type::time_point> queue_item_time;
29 typedef queue_item_time::item_type item_type;
35 virtual ~loop_worker()
44 virtual clock_type::time_point
now()
const {
45 return clock_type::now();
48 virtual void schedule(
const schedulable& scbl)
const {
52 virtual void schedule(clock_type::time_point when,
const schedulable& scbl)
const {
59 mutable std::atomic<std::size_t> count;
60 std::vector<worker> loops;
64 : factory([](std::function<void()> start){
65 return std::thread(std::move(start));
70 auto remaining =
std::max(std::thread::hardware_concurrency(),
unsigned(4));
80 auto remaining =
std::max(std::thread::hardware_concurrency(),
unsigned(4));
89 virtual clock_type::time_point
now()
const {
90 return clock_type::now();
94 return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()]));
99 static scheduler instance = make_scheduler<event_loop>();
103 return make_scheduler<event_loop>(tf);
Definition: rx-scheduler.hpp:163
virtual worker create_worker(composite_subscription cs) const
Definition: rx-eventloop.hpp:93
Definition: rx-all.hpp:26
virtual ~event_loop()
Definition: rx-eventloop.hpp:85
Controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
std::function< std::thread(std::function< void()>)> thread_factory
Definition: rx-newthread.hpp:14
auto max() -> operator_factory< max_tag >
For each item from this observable, reduce it by taking the max value of the previous items.
Definition: rx-reduce.hpp:496
Definition: rx-eventloop.hpp:14
worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-scheduler.hpp:412
Allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
void schedule(const schedulable &scbl) const
Inserts the supplied schedulable to be run as soon as possible.
Definition: rx-scheduler.hpp:258
scheduler make_event_loop()
Definition: rx-eventloop.hpp:98
scheduler make_new_thread()
Definition: rx-newthread.hpp:170
Definition: rx-scheduler.hpp:353
event_loop()
Definition: rx-eventloop.hpp:63
event_loop(thread_factory tf)
Definition: rx-eventloop.hpp:75
Definition: rx-scheduler.hpp:426
const action & get_action() const
Definition: rx-scheduler.hpp:550
virtual clock_type::time_point now() const
Definition: rx-eventloop.hpp:89
Definition: rx-scheduler.hpp:200