RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-eventloop.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP)
6 #define RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
// NOTE(review): the declaration that opens this body (listing line 14) is absent from
// this extract; the typedef and destructor inside show the type is named event_loop.
15 {
16 private:
// Shorthand for the enclosing type.
17  typedef event_loop this_type;
// Copy constructor declared in the private section; no definition appears in this listing.
18  event_loop(const this_type&);
19 
20  struct loop_worker : public worker_interface
21  {
22  private:
23  typedef loop_worker this_type;
24  loop_worker(const this_type&);
25 
26  typedef detail::schedulable_queue<
27  typename clock_type::time_point> queue_item_time;
28 
29  typedef queue_item_time::item_type item_type;
30 
31  composite_subscription lifetime;
32  worker controller;
33 
34  public:
35  virtual ~loop_worker()
36  {
37  }
38  loop_worker(composite_subscription cs, worker w)
39  : lifetime(cs)
40  , controller(w)
41  {
42  }
43 
44  virtual clock_type::time_point now() const {
45  return clock_type::now();
46  }
47 
48  virtual void schedule(const schedulable& scbl) const {
49  controller.schedule(lifetime, scbl.get_action());
50  }
51 
52  virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
53  controller.schedule(when, lifetime, scbl.get_action());
54  }
55  };
56 
// Thread factory: the default constructor installs a lambda that starts a std::thread,
// the thread_factory constructor stores the caller's factory.
57  mutable thread_factory factory;
// Scheduler obtained from make_new_thread(); the constructors use it to create the workers in `loops`.
58  scheduler newthread;
// Counter initialised to 0; create_worker increments it and uses it modulo loops.size() as an index.
59  mutable std::atomic<std::size_t> count;
// Workers created by the constructors; create_worker picks one of them for each new loop_worker.
60  std::vector<worker> loops;
61 
62 public:
// NOTE(review): the declaration line for this constructor (listing line 63) is missing
// from this extract; the cross-reference index below gives it as `event_loop()`.
// Default factory: start a std::thread that runs the supplied `start` function.
64  : factory([](std::function<void()> start){
65  return std::thread(std::move(start));
66  })
67  , newthread(make_new_thread())
68  , count(0)
69  {
// Number of workers to create: hardware_concurrency(), but never fewer than 4.
70  auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
// Create that many workers from `newthread` and keep them in `loops`.
71  while (remaining--) {
72  loops.push_back(newthread.create_worker());
73  }
74  }
// NOTE(review): the declaration line for this constructor (listing line 75) is missing
// from this extract; the cross-reference index below gives it as `event_loop(thread_factory tf)`.
// Stores the caller's factory and also passes it to make_new_thread().
76  : factory(tf)
77  , newthread(make_new_thread(tf))
78  , count(0)
79  {
// Number of workers to create: hardware_concurrency(), but never fewer than 4.
80  auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
// Create that many workers from `newthread` and keep them in `loops`.
81  while (remaining--) {
82  loops.push_back(newthread.create_worker());
83  }
84  }
85  virtual ~event_loop()
86  {
87  }
88 
89  virtual clock_type::time_point now() const {
90  return clock_type::now();
91  }
92 
// NOTE(review): the declaration line for this function (listing line 93) is missing from
// this extract; the cross-reference index below gives it as
// `virtual worker create_worker(composite_subscription cs) const`.
// Wraps a new loop_worker in a `worker`. The loop_worker forwards to one entry of `loops`,
// chosen by pre-incrementing `count` and reducing it modulo loops.size(); because of the
// pre-increment the first call uses index 1 % loops.size(), not index 0.
94  return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()]));
95  }
96 };
97 
// NOTE(review): the declaration line for this function (listing line 98) is missing from
// this extract; the cross-reference index below gives it as `scheduler make_event_loop()`.
// Function-local static: the event_loop scheduler is created on the first call and the
// same `instance` is returned by every call.
99  static scheduler instance = make_scheduler<event_loop>();
100  return instance;
101 }
// NOTE(review): the declaration line for this function (listing line 102) is missing from
// this extract. The body uses a parameter named `tf`; presumably this is the
// thread_factory overload of make_event_loop - confirm against the original header.
// Creates a new event_loop scheduler from `tf` on each call; no instance is cached here.
103  return make_scheduler<event_loop>(tf);
104 }
105 
106 }
107 
108 }
109 
110 #endif
Definition: rx-scheduler.hpp:163
virtual worker create_worker(composite_subscription cs) const
Definition: rx-eventloop.hpp:93
Definition: rx-all.hpp:26
virtual ~event_loop()
Definition: rx-eventloop.hpp:85
composite_subscription: controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
std::function< std::thread(std::function< void()>)> thread_factory
Definition: rx-newthread.hpp:14
auto max() -> operator_factory< max_tag >
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-reduce.hpp:496
event_loop
Definition: rx-eventloop.hpp:14
worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-scheduler.hpp:412
scheduler: allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
void schedule(const schedulable &scbl) const
insert the supplied schedulable to be run as soon as possible
Definition: rx-scheduler.hpp:258
scheduler make_event_loop()
Definition: rx-eventloop.hpp:98
scheduler make_new_thread()
Definition: rx-newthread.hpp:170
Definition: rx-scheduler.hpp:353
event_loop()
Definition: rx-eventloop.hpp:63
event_loop(thread_factory tf)
Definition: rx-eventloop.hpp:75
Definition: rx-scheduler.hpp:426
const action & get_action() const
Definition: rx-scheduler.hpp:550
virtual clock_type::time_point now() const
Definition: rx-eventloop.hpp:89
Definition: rx-scheduler.hpp:200