RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-scheduler.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_HPP)
6 #define RXCPP_RX_SCHEDULER_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
14 class worker_interface;
16 
17 namespace detail {
18 
19 class action_type;
20 typedef std::shared_ptr<action_type> action_ptr;
21 
22 typedef std::shared_ptr<worker_interface> worker_interface_ptr;
23 typedef std::shared_ptr<const worker_interface> const_worker_interface_ptr;
24 
25 typedef std::weak_ptr<worker_interface> worker_interface_weak_ptr;
26 typedef std::weak_ptr<const worker_interface> const_worker_interface_weak_ptr;
27 
28 typedef std::shared_ptr<scheduler_interface> scheduler_interface_ptr;
29 typedef std::shared_ptr<const scheduler_interface> const_scheduler_interface_ptr;
30 
31 inline action_ptr shared_empty() {
32  static action_ptr shared_empty = std::make_shared<detail::action_type>();
33  return shared_empty;
34 }
35 
36 }
37 
38 // It is essential to keep virtual function calls out of an inner loop.
39 // To make tail-recursion work efficiently the recursion objects create
40 // a space on the stack inside the virtual function call in the actor that
41 // allows the callback and the scheduler to share stack space that records
42 // the request and the allowance without any virtual calls in the loop.
43 
/// Handle through which a running function asks to be run again.
/// It only holds a reference to a request flag owned elsewhere; invoking the
/// handle sets that flag to true.
class recursed
{
    bool& isrequested;
    // Not assignable: the reference member cannot be rebound.
    recursed& operator=(const recursed&) = delete;
public:
    /// \param r  request flag that operator() will set; must outlive this object
    explicit recursed(bool& r)
        : isrequested(r)
    {
    }
    /// request to be rescheduled
    inline void operator()() const {
        isrequested = true;
    }
};
60 
63 class recurse
64 {
65  bool& isallowed;
66  mutable bool isrequested;
67  recursed requestor;
68  recurse operator=(const recurse&);
69 public:
70  explicit recurse(bool& a)
71  : isallowed(a)
72  , isrequested(true)
73  , requestor(isrequested)
74  {
75  }
77  inline bool is_allowed() const {
78  return isallowed;
79  }
81  inline bool is_requested() const {
82  return isrequested;
83  }
85  inline void reset() const {
86  isrequested = false;
87  }
89  inline const recursed& get_recursed() const {
90  return requestor;
91  }
92 };
93 
95 class recursion
96 {
97  mutable bool isallowed;
98  recurse recursor;
99  recursion operator=(const recursion&);
100 public:
102  : isallowed(true)
103  , recursor(isallowed)
104  {
105  }
106  explicit recursion(bool b)
107  : isallowed(b)
108  , recursor(isallowed)
109  {
110  }
112  inline void reset(bool b = true) const {
113  isallowed = b;
114  }
116  inline const recurse& get_recurse() const {
117  return recursor;
118  }
119 };
120 
121 
123 {
125 };
126 
127 class schedulable;
128 
130 class action : public action_base
131 {
132  typedef action this_type;
133  detail::action_ptr inner;
134 public:
136  {
137  }
138  explicit action(detail::action_ptr i)
139  : inner(std::move(i))
140  {
141  }
142 
144  inline static action empty() {
145  return action(detail::shared_empty());
146  }
147 
149  inline void operator()(const schedulable& s, const recurse& r) const;
150 };
151 
153 {
154  typedef std::chrono::steady_clock clock_type;
156 };
157 
159 {
161 };
162 
164  : public std::enable_shared_from_this<worker_interface>
165 {
166  typedef worker_interface this_type;
167 
168 public:
170 
171  virtual ~worker_interface() {}
172 
173  virtual clock_type::time_point now() const = 0;
174 
175  virtual void schedule(const schedulable& scbl) const = 0;
176  virtual void schedule(clock_type::time_point when, const schedulable& scbl) const = 0;
177 };
178 
179 namespace detail {
180 
181 template<class F>
182 struct is_action_function
183 {
184  struct not_void {};
185  template<class CF>
186  static auto check(int) -> decltype((*(CF*)nullptr)(*(schedulable*)nullptr));
187  template<class CF>
188  static not_void check(...);
189 
190  static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
191 };
192 
193 }
194 
195 class weak_worker;
196 
200 class worker : public worker_base
201 {
202  typedef worker this_type;
203  detail::worker_interface_ptr inner;
204  composite_subscription lifetime;
205  friend bool operator==(const worker&, const worker&);
206  friend class weak_worker;
207 public:
210 
212  {
213  }
214  worker(composite_subscription cs, detail::const_worker_interface_ptr i)
215  : inner(std::const_pointer_cast<worker_interface>(i))
216  , lifetime(std::move(cs))
217  {
218  }
220  : inner(o.inner)
221  , lifetime(std::move(cs))
222  {
223  }
224 
225  inline const composite_subscription& get_subscription() const {
226  return lifetime;
227  }
229  return lifetime;
230  }
231 
232  // composite_subscription
233  //
234  inline bool is_subscribed() const {
235  return lifetime.is_subscribed();
236  }
237  inline weak_subscription add(subscription s) const {
238  return lifetime.add(std::move(s));
239  }
240  inline void remove(weak_subscription w) const {
241  return lifetime.remove(std::move(w));
242  }
243  inline void clear() const {
244  return lifetime.clear();
245  }
246  inline void unsubscribe() const {
247  return lifetime.unsubscribe();
248  }
249 
250  // worker_interface
251  //
253  inline clock_type::time_point now() const {
254  return inner->now();
255  }
256 
258  inline void schedule(const schedulable& scbl) const {
259  // force rebinding scbl to this worker
260  schedule_rebind(scbl);
261  }
262 
264  inline void schedule(clock_type::time_point when, const schedulable& scbl) const {
265  // force rebinding scbl to this worker
266  schedule_rebind(when, scbl);
267  }
268 
269  // helpers
270  //
271 
273  inline void schedule(clock_type::duration when, const schedulable& scbl) const {
274  // force rebinding scbl to this worker
275  schedule_rebind(now() + when, scbl);
276  }
277 
280  inline void schedule_periodically(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl) const {
281  // force rebinding scbl to this worker
282  schedule_periodically_rebind(initial, period, scbl);
283  }
284 
287  inline void schedule_periodically(clock_type::duration initial, clock_type::duration period, const schedulable& scbl) const {
288  // force rebinding scbl to this worker
289  schedule_periodically_rebind(now() + initial, period, scbl);
290  }
291 
293  template<class Arg0, class... ArgN>
294  auto schedule(Arg0&& a0, ArgN&&... an) const
295  -> typename std::enable_if<
296  (detail::is_action_function<Arg0>::value ||
299  template<class... ArgN>
301  void schedule_rebind(const schedulable& scbl, ArgN&&... an) const;
302 
304  template<class Arg0, class... ArgN>
305  auto schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const
306  -> typename std::enable_if<
307  (detail::is_action_function<Arg0>::value ||
311  template<class... ArgN>
312  void schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const;
313 
315  template<class Arg0, class... ArgN>
316  auto schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const
317  -> typename std::enable_if<
318  (detail::is_action_function<Arg0>::value ||
322  template<class... ArgN>
323  void schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const;
324 };
325 
326 inline bool operator==(const worker& lhs, const worker& rhs) {
327  return lhs.inner == rhs.inner && lhs.lifetime == rhs.lifetime;
328 }
329 inline bool operator!=(const worker& lhs, const worker& rhs) {
330  return !(lhs == rhs);
331 }
332 
334 {
335  detail::worker_interface_weak_ptr inner;
336  composite_subscription lifetime;
337 
338 public:
340  {
341  }
342  explicit weak_worker(worker& owner)
343  : inner(owner.inner)
344  , lifetime(owner.lifetime)
345  {
346  }
347 
348  worker lock() const {
349  return worker(lifetime, inner.lock());
350  }
351 };
352 
354  : public std::enable_shared_from_this<scheduler_interface>
355 {
357 
358 public:
360 
361  virtual ~scheduler_interface() {}
362 
363  virtual clock_type::time_point now() const = 0;
364 
365  virtual worker create_worker(composite_subscription cs) const = 0;
366 };
367 
368 
370  // public subscription_base, <- already in worker base
371  public worker_base,
372  public action_base
373 {
375 };
376 
383 class scheduler : public scheduler_base
384 {
385  typedef scheduler this_type;
386  detail::scheduler_interface_ptr inner;
387  friend bool operator==(const scheduler&, const scheduler&);
388 public:
390 
392  {
393  }
394  explicit scheduler(detail::scheduler_interface_ptr i)
395  : inner(std::move(i))
396  {
397  }
398  explicit scheduler(detail::const_scheduler_interface_ptr i)
399  : inner(std::const_pointer_cast<scheduler_interface>(i))
400  {
401  }
402 
404  inline clock_type::time_point now() const {
405  return inner->now();
406  }
413  return inner->create_worker(cs);
414  }
415 };
416 
417 template<class Scheduler, class... ArgN>
418 inline scheduler make_scheduler(ArgN&&... an) {
419  return scheduler(std::static_pointer_cast<scheduler_interface>(std::make_shared<Scheduler>(std::forward<ArgN>(an)...)));
420 }
421 
422 inline scheduler make_scheduler(std::shared_ptr<scheduler_interface> si) {
423  return scheduler(si);
424 }
425 
427 {
428  typedef schedulable this_type;
429 
430  composite_subscription lifetime;
431  weak_worker controller;
432  action activity;
433  bool scoped;
435 
436  struct detacher
437  {
438  ~detacher()
439  {
440  if (that) {
441  that->unsubscribe();
442  }
443  }
444  detacher(const this_type* that)
445  : that(that)
446  {
447  }
448  const this_type* that;
449  };
450 
451  class recursed_scope_type
452  {
453  mutable const recursed* requestor;
454 
455  class exit_recursed_scope_type
456  {
457  const recursed_scope_type* that;
458  public:
459  ~exit_recursed_scope_type()
460  {
461  that->requestor = nullptr;
462  }
463  exit_recursed_scope_type(const recursed_scope_type* that)
464  : that(that)
465  {
466  }
467  };
468  public:
469  recursed_scope_type()
470  : requestor(nullptr)
471  {
472  }
473  recursed_scope_type(const recursed_scope_type&)
474  : requestor(nullptr)
475  {
476  // does not acquire recursion scope
477  }
478  recursed_scope_type& operator=(const recursed_scope_type& )
479  {
480  // no change in recursion scope
481  return *this;
482  }
483  exit_recursed_scope_type reset(const recurse& r) const {
484  requestor = std::addressof(r.get_recursed());
485  return exit_recursed_scope_type(this);
486  }
487  bool is_recursed() const {
488  return !!requestor;
489  }
490  void operator()() const {
491  (*requestor)();
492  }
493  };
494  recursed_scope_type recursed_scope;
495 
496 public:
499 
501  {
502  if (scoped) {
503  controller.lock().remove(action_scope);
504  }
505  }
507  : scoped(false)
508  {
509  }
510 
513  : lifetime(q.get_subscription())
514  , controller(q)
515  , activity(std::move(a))
516  , scoped(false)
517  {
518  }
521  : lifetime(std::move(cs))
522  , controller(q)
523  , activity(std::move(a))
524  , scoped(true)
525  , action_scope(controller.lock().add(lifetime))
526  {
527  }
530  : lifetime(scbl.get_subscription())
531  , controller(q)
532  , activity(std::move(a))
533  , scoped(scbl.scoped)
534  , action_scope(scbl.scoped ? controller.lock().add(lifetime) : weak_subscription())
535  {
536  }
537 
538  inline const composite_subscription& get_subscription() const {
539  return lifetime;
540  }
542  return lifetime;
543  }
544  inline const worker get_worker() const {
545  return controller.lock();
546  }
547  inline worker get_worker() {
548  return controller.lock();
549  }
550  inline const action& get_action() const {
551  return activity;
552  }
553  inline action& get_action() {
554  return activity;
555  }
556 
557  inline static schedulable empty(worker sc) {
559  }
560 
561  inline auto set_recursed(const recurse& r) const
562  -> decltype(recursed_scope.reset(r)) {
563  return recursed_scope.reset(r);
564  }
565 
566  // recursed
567  //
568  bool is_recursed() const {
569  return recursed_scope.is_recursed();
570  }
579  inline void operator()() const {
580  recursed_scope();
581  }
582 
583  // composite_subscription
584  //
585  inline bool is_subscribed() const {
586  return lifetime.is_subscribed();
587  }
588  inline weak_subscription add(subscription s) const {
589  return lifetime.add(std::move(s));
590  }
591  template<class F>
592  auto add(F f) const
593  -> typename std::enable_if<rxcpp::detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
594  return lifetime.add(make_subscription(std::move(f)));
595  }
596  inline void remove(weak_subscription w) const {
597  return lifetime.remove(std::move(w));
598  }
599  inline void clear() const {
600  return lifetime.clear();
601  }
602  inline void unsubscribe() const {
603  return lifetime.unsubscribe();
604  }
605 
606  // scheduler
607  //
608  inline clock_type::time_point now() const {
609  return controller.lock().now();
610  }
612  inline void schedule() const {
613  if (is_subscribed()) {
614  get_worker().schedule(*this);
615  }
616  }
618  inline void schedule(clock_type::time_point when) const {
619  if (is_subscribed()) {
620  get_worker().schedule(when, *this);
621  }
622  }
624  inline void schedule(clock_type::duration when) const {
625  if (is_subscribed()) {
626  get_worker().schedule(when, *this);
627  }
628  }
629 
630  // action
631  //
633  inline void operator()(const recurse& r) const {
634  if (!is_subscribed()) {
635  return;
636  }
637  detacher protect(this);
638  activity(*this, r);
639  protect.that = nullptr;
640  }
641 };
642 
643 struct current_thread;
644 
645 namespace detail {
646 
647 class action_type
648  : public std::enable_shared_from_this<action_type>
649 {
650  typedef action_type this_type;
651 
652 public:
653  typedef std::function<void(const schedulable&, const recurse&)> function_type;
654 
655 private:
656  function_type f;
657 
658 public:
659  action_type()
660  {
661  }
662 
663  action_type(function_type f)
664  : f(std::move(f))
665  {
666  }
667 
668  inline void operator()(const schedulable& s, const recurse& r) {
669  if (!f) {
670  std::terminate();
671  }
672  f(s, r);
673  }
674 };
675 
676 class action_tailrecurser
677  : public std::enable_shared_from_this<action_type>
678 {
679  typedef action_type this_type;
680 
681 public:
682  typedef std::function<void(const schedulable&)> function_type;
683 
684 private:
685  function_type f;
686 
687 public:
688  action_tailrecurser()
689  {
690  }
691 
692  action_tailrecurser(function_type f)
693  : f(std::move(f))
694  {
695  }
696 
697  inline void operator()(const schedulable& s, const recurse& r) {
698  if (!f) {
699  std::terminate();
700  }
701  trace_activity().action_enter(s);
702  auto scope = s.set_recursed(r);
703  while (s.is_subscribed()) {
704  r.reset();
705  f(s);
706  if (!r.is_allowed() || !r.is_requested()) {
707  if (r.is_requested()) {
708  s.schedule();
709  }
710  break;
711  }
712  trace_activity().action_recurse(s);
713  }
714  trace_activity().action_return(s);
715  }
716 };
717 }
718 
719 inline void action::operator()(const schedulable& s, const recurse& r) const {
720  (*inner)(s, r);
721 }
722 
724  return action::empty();
725 }
726 
727 template<class F>
728 inline action make_action(F&& f) {
729  static_assert(detail::is_action_function<F>::value, "action function must be void(schedulable)");
730  auto fn = std::forward<F>(f);
731  return action(std::make_shared<detail::action_type>(detail::action_tailrecurser(fn)));
732 }
733 
734 // copy
735 inline auto make_schedulable(
736  const schedulable& scbl)
737  -> schedulable {
738  return schedulable(scbl);
739 }
740 // move
741 inline auto make_schedulable(
742  schedulable&& scbl)
743  -> schedulable {
744  return schedulable(std::move(scbl));
745 }
746 
748  return schedulable(sc, a);
749 }
751  return schedulable(cs, sc, a);
752 }
753 
754 template<class F>
755 auto make_schedulable(worker sc, F&& f)
756  -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
757  return schedulable(sc, make_action(std::forward<F>(f)));
758 }
759 template<class F>
761  -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
762  return schedulable(cs, sc, make_action(std::forward<F>(f)));
763 }
764 template<class F>
766  -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
767  return schedulable(cs, scbl.get_worker(), make_action(std::forward<F>(f)));
768 }
769 template<class F>
770 auto make_schedulable(schedulable scbl, worker sc, F&& f)
771  -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
772  return schedulable(scbl, sc, make_action(std::forward<F>(f)));
773 }
774 template<class F>
775 auto make_schedulable(schedulable scbl, F&& f)
776  -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
777  return schedulable(scbl, scbl.get_worker(), make_action(std::forward<F>(f)));
778 }
779 
781  -> schedulable {
782  return schedulable(cs, scbl.get_worker(), scbl.get_action());
783 }
785  -> schedulable {
786  return schedulable(cs, sc, scbl.get_action());
787 }
788 inline auto make_schedulable(schedulable scbl, worker sc)
789  -> schedulable {
790  return schedulable(scbl, sc, scbl.get_action());
791 }
792 
793 template<class Arg0, class... ArgN>
794 auto worker::schedule(Arg0&& a0, ArgN&&... an) const
795  -> typename std::enable_if<
796  (detail::is_action_function<Arg0>::value ||
797  is_subscription<Arg0>::value) &&
798  !is_schedulable<Arg0>::value>::type {
799  auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
800  trace_activity().schedule_enter(*inner.get(), scbl);
801  inner->schedule(std::move(scbl));
802  trace_activity().schedule_return(*inner.get());
803 }
804 template<class... ArgN>
805 void worker::schedule_rebind(const schedulable& scbl, ArgN&&... an) const {
806  auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
807  trace_activity().schedule_enter(*inner.get(), rescbl);
808  inner->schedule(std::move(rescbl));
809  trace_activity().schedule_return(*inner.get());
810 }
811 
812 template<class Arg0, class... ArgN>
813 auto worker::schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const
814  -> typename std::enable_if<
815  (detail::is_action_function<Arg0>::value ||
816  is_subscription<Arg0>::value) &&
817  !is_schedulable<Arg0>::value>::type {
818  auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
819  trace_activity().schedule_when_enter(*inner.get(), when, scbl);
820  inner->schedule(when, std::move(scbl));
821  trace_activity().schedule_when_return(*inner.get());
822 }
823 template<class... ArgN>
824 void worker::schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const {
825  auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
826  trace_activity().schedule_when_enter(*inner.get(), when, rescbl);
827  inner->schedule(when, std::move(rescbl));
828  trace_activity().schedule_when_return(*inner.get());
829 }
830 
831 template<class Arg0, class... ArgN>
832 auto worker::schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const
833  -> typename std::enable_if<
834  (detail::is_action_function<Arg0>::value ||
835  is_subscription<Arg0>::value) &&
836  !is_schedulable<Arg0>::value>::type {
837  schedule_periodically_rebind(initial, period, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
838 }
839 template<class... ArgN>
840 void worker::schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const {
841  auto keepAlive = *this;
842  auto target = std::make_shared<clock_type::time_point>(initial);
843  auto activity = make_schedulable(scbl, keepAlive, std::forward<ArgN>(an)...);
844  auto periodic = make_schedulable(
845  activity,
846  [keepAlive, target, period, activity](schedulable self) {
847  // any recursion requests will be pushed to the scheduler queue
848  recursion r(false);
849  // call action
850  activity(r.get_recurse());
851 
852  // schedule next occurance (if the action took longer than 'period' target will be in the past)
853  *target += period;
854  self.schedule(*target);
855  });
856  trace_activity().schedule_when_enter(*inner.get(), *target, periodic);
857  inner->schedule(*target, periodic);
858  trace_activity().schedule_when_return(*inner.get());
859 }
860 
861 namespace detail {
862 
863 template<class TimePoint>
864 struct time_schedulable
865 {
866  typedef TimePoint time_point_type;
867 
868  time_schedulable(TimePoint when, schedulable a)
869  : when(when)
870  , what(std::move(a))
871  {
872  }
873  TimePoint when;
875 };
876 
877 
878 // Sorts time_schedulable items in priority order sorted
879 // on value of time_schedulable.when. Items with equal
880 // values for when are sorted in fifo order.
881 template<class TimePoint>
882 class schedulable_queue {
883 public:
884  typedef time_schedulable<TimePoint> item_type;
885  typedef std::pair<item_type, int64_t> elem_type;
886  typedef std::vector<elem_type> container_type;
887  typedef const item_type& const_reference;
888 
889 private:
890  struct compare_elem
891  {
892  bool operator()(const elem_type& lhs, const elem_type& rhs) const {
893  if (lhs.first.when == rhs.first.when) {
894  return lhs.second > rhs.second;
895  }
896  else {
897  return lhs.first.when > rhs.first.when;
898  }
899  }
900  };
901 
902  typedef std::priority_queue<
903  elem_type,
904  container_type,
905  compare_elem
906  > queue_type;
907 
908  queue_type q;
909 
910  int64_t ordinal;
911 public:
912 
913  schedulable_queue()
914  : ordinal(0)
915  {
916  }
917 
918  const_reference top() const {
919  return q.top().first;
920  }
921 
922  void pop() {
923  q.pop();
924  }
925 
926  bool empty() const {
927  return q.empty();
928  }
929 
930  void push(const item_type& value) {
931  q.push(elem_type(value, ordinal++));
932  }
933 
934  void push(item_type&& value) {
935  q.push(elem_type(std::move(value), ordinal++));
936  }
937 };
938 
939 }
940 
941 }
942 namespace rxsc=schedulers;
943 
944 }
945 
947 #include "schedulers/rx-runloop.hpp"
953 
954 #endif
tag_action action_tag
Definition: rx-scheduler.hpp:124
void schedule(clock_type::time_point when, const schedulable &scbl) const
insert the supplied schedulable to be run at the time specified
Definition: rx-scheduler.hpp:264
composite_subscription::weak_subscription weak_subscription
Definition: rx-scheduler.hpp:209
Definition: rx-scheduler.hpp:163
void unsubscribe() const
Definition: rx-scheduler.hpp:246
const recursed & get_recursed() const
get the recursed to set into the schedulable for the function to use to request recursion ...
Definition: rx-scheduler.hpp:89
static subscription lock(weak_state_type w)
Definition: rx-subscription.hpp:181
void reset(bool b=true) const
set whether tail-recursion is allowed
Definition: rx-scheduler.hpp:112
void schedule() const
put this on the queue of the stored scheduler to run asap
Definition: rx-scheduler.hpp:612
void schedule_periodically(clock_type::duration initial, clock_type::duration period, const schedulable &scbl) const
Definition: rx-scheduler.hpp:287
const composite_subscription & get_subscription() const
Definition: rx-scheduler.hpp:538
virtual ~scheduler_interface()
Definition: rx-scheduler.hpp:361
Definition: rx-all.hpp:26
bool is_allowed() const
does the scheduler allow tail-recursion now?
Definition: rx-scheduler.hpp:77
bool is_subscribed() const
Definition: rx-scheduler.hpp:585
worker(composite_subscription cs, detail::const_worker_interface_ptr i)
Definition: rx-scheduler.hpp:214
schedulable(composite_subscription cs, worker q, action a)
action and worker have independent lifetimes
Definition: rx-scheduler.hpp:520
tag_scheduler scheduler_tag
Definition: rx-scheduler.hpp:155
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
Definition: rx-scheduler.hpp:152
auto make_schedulable(const schedulable &scbl) -> schedulable
Definition: rx-scheduler.hpp:735
action(detail::action_ptr i)
Definition: rx-scheduler.hpp:138
bool is_recursed() const
Definition: rx-scheduler.hpp:568
void remove(weak_subscription w) const
Definition: rx-subscription.hpp:432
Definition: rx-scheduler.hpp:158
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:498
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:197
subscription::weak_state_type weak_subscription
Definition: rx-subscription.hpp:370
action make_action(F &&f)
Definition: rx-scheduler.hpp:728
auto empty() -> decltype(from< T >())
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-empty.hpp:37
void operator()(const recurse &r) const
invokes the action
Definition: rx-scheduler.hpp:633
bool operator!=(const worker &lhs, const worker &rhs)
Definition: rx-scheduler.hpp:329
composite_subscription & get_subscription()
Definition: rx-scheduler.hpp:228
Definition: rx-scheduler.hpp:46
weak_subscription add(subscription s) const
Definition: rx-scheduler.hpp:237
composite_subscription::weak_subscription weak_subscription
Definition: rx-scheduler.hpp:497
Definition: rx-currentthread.hpp:131
virtual ~worker_interface()
Definition: rx-scheduler.hpp:171
worker(composite_subscription cs, worker o)
Definition: rx-scheduler.hpp:219
void schedule(clock_type::time_point when) const
put this on the queue of the stored scheduler to run at the specified time
Definition: rx-scheduler.hpp:618
void operator()() const
Definition: rx-scheduler.hpp:579
clock_type::time_point now() const
Definition: rx-scheduler.hpp:608
action & get_action()
Definition: rx-scheduler.hpp:553
weak_worker()
Definition: rx-scheduler.hpp:339
const composite_subscription & get_subscription() const
Definition: rx-scheduler.hpp:225
Definition: rx-subscription.hpp:31
void operator()(const schedulable &s, const recurse &r) const
call the function
Definition: rx-scheduler.hpp:719
action provides type-forgetting for a potentially recursive set of calls to a function that takes a s...
Definition: rx-scheduler.hpp:130
Definition: rx-predef.hpp:21
worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-scheduler.hpp:412
void unsubscribe() const
Definition: rx-scheduler.hpp:602
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
Definition: rx-predef.hpp:30
Definition: rx-scheduler.hpp:122
void schedule_periodically(clock_type::time_point initial, clock_type::duration period, const schedulable &scbl) const
Definition: rx-scheduler.hpp:280
static action empty()
return the empty action
Definition: rx-scheduler.hpp:144
void remove(weak_subscription w) const
Definition: rx-scheduler.hpp:240
void clear() const
Definition: rx-scheduler.hpp:599
Definition: rx-predef.hpp:58
static composite_subscription empty()
Definition: rx-subscription.hpp:404
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:208
scheduler(detail::scheduler_interface_ptr i)
Definition: rx-scheduler.hpp:394
scheduler()
Definition: rx-scheduler.hpp:391
weak_worker(worker &owner)
Definition: rx-scheduler.hpp:342
void schedule(const schedulable &scbl) const
insert the supplied schedulable to be run as soon as possible
Definition: rx-scheduler.hpp:258
bool is_requested() const
did the function request to be recursed?
Definition: rx-scheduler.hpp:81
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:359
clock_type::time_point now() const
return the current time for this worker
Definition: rx-scheduler.hpp:253
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
std::chrono::steady_clock clock_type
Definition: rx-scheduler.hpp:154
void clear() const
Definition: rx-scheduler.hpp:243
action make_action_empty()
Definition: rx-scheduler.hpp:723
clock_type::time_point now() const
return the current time for this scheduler
Definition: rx-scheduler.hpp:404
recursion is used by the scheduler to signal to each action whether tail recursion is allowed...
Definition: rx-scheduler.hpp:95
scheduler make_scheduler(ArgN &&...an)
Definition: rx-scheduler.hpp:418
recurse(bool &a)
Definition: rx-scheduler.hpp:70
void unsubscribe() const
Definition: rx-subscription.hpp:170
schedulable()
Definition: rx-scheduler.hpp:506
recursion()
Definition: rx-scheduler.hpp:101
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:413
tag_worker worker_tag
Definition: rx-scheduler.hpp:160
const recurse & get_recurse() const
get the recurse to pass into each action being called
Definition: rx-scheduler.hpp:116
Definition: rx-scheduler.hpp:369
void operator()() const
request to be rescheduled
Definition: rx-scheduler.hpp:56
Definition: rx-scheduler.hpp:63
auto add(F f) const -> typename std::enable_if< rxcpp::detail::is_unsubscribe_function< F >::value, weak_subscription >::type
Definition: rx-scheduler.hpp:592
bool operator==(const worker &lhs, const worker &rhs)
Definition: rx-scheduler.hpp:326
Definition: rx-scheduler.hpp:353
weak_subscription add(subscription s) const
Definition: rx-scheduler.hpp:588
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:389
composite_subscription & get_subscription()
Definition: rx-scheduler.hpp:541
Definition: rx-predef.hpp:56
auto scope(ResourceFactory rf, ObservableFactory of) -> observable< rxu::value_type_t< detail::scope_traits< ResourceFactory, ObservableFactory >>, detail::scope< ResourceFactory, ObservableFactory >>
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-scope.hpp:114
Definition: rx-scheduler.hpp:333
Definition: rx-subscription.hpp:29
void schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable &scbl, ArgN &&...an) const
use the supplied arguments to make a schedulable and then insert it to be run
Definition: rx-scheduler.hpp:840
const worker get_worker() const
Definition: rx-scheduler.hpp:544
scheduler(detail::const_scheduler_interface_ptr i)
Definition: rx-scheduler.hpp:398
recursed(bool &r)
Definition: rx-scheduler.hpp:51
worker get_worker()
Definition: rx-scheduler.hpp:547
bool is_subscribed() const
Definition: rx-scheduler.hpp:234
Definition: rx-subscription.hpp:67
void reset() const
reset the function request. call before each call to the function.
Definition: rx-scheduler.hpp:85
static schedulable empty(worker sc)
Definition: rx-scheduler.hpp:557
std::string what(std::exception_ptr ep)
Definition: rx-util.hpp:523
schedulable(worker q, action a)
action and worker share lifetime
Definition: rx-scheduler.hpp:512
void schedule(clock_type::duration when, const schedulable &scbl) const
insert the supplied schedulable to be run at now() + the delay specified
Definition: rx-scheduler.hpp:273
worker()
Definition: rx-scheduler.hpp:211
Definition: rx-scheduler.hpp:426
bool is_subscribed() const
Definition: rx-subscription.hpp:164
recursion(bool b)
Definition: rx-scheduler.hpp:106
const action & get_action() const
Definition: rx-scheduler.hpp:550
action()
Definition: rx-scheduler.hpp:135
Definition: rx-predef.hpp:43
auto set_recursed(const recurse &r) const -> decltype(recursed_scope.reset(r))
Definition: rx-scheduler.hpp:561
~schedulable()
Definition: rx-scheduler.hpp:500
void schedule(clock_type::duration when) const
put this on the queue of the stored scheduler to run after a delay from now
Definition: rx-scheduler.hpp:624
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:169
worker lock() const
Definition: rx-scheduler.hpp:348
void schedule_rebind(const schedulable &scbl, ArgN &&...an) const
use the supplied arguments to make a schedulable and then insert it to be run
Definition: rx-scheduler.hpp:805
schedulable(schedulable scbl, worker q, action a)
inherit lifetimes
Definition: rx-scheduler.hpp:529
tag_schedulable schedulable_tag
Definition: rx-scheduler.hpp:374
Definition: rx-scheduler.hpp:200