RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-timer.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_SOURCES_RX_TIMER_HPP)
6 #define RXCPP_SOURCES_RX_TIMER_HPP
7 
8 #include "../rx-includes.hpp"
9 
38 namespace rxcpp {
39 
40 namespace sources {
41 
42 namespace detail {
43 
/*!
    Source type behind the timer() factory functions in this file.

    On subscription it schedules a single callback for the time point stored
    in `initial.when`. When that callback runs it sends the value 1L to the
    subscriber and then completes it.

    \tparam Coordination  coordination type; its decayed form must provide
                          `coordinator_type`, `now()` and `create_coordinator()`,
                          which are the members used below.

    NOTE(review): deriving from source_base<long> presumably declares `long`
    as the emitted value type (the only value sent is the literal 1L) - confirm
    against source_base.
*/
44 template<class Coordination>
45 struct timer : public source_base<long>
46 {
47  typedef timer<Coordination> this_type;
48 
    // Coordination with references and cv-qualifiers removed, so that it can be stored by value.
49  typedef rxu::decay_t<Coordination> coordination_type;
50  typedef typename coordination_type::coordinator_type coordinator_type;
51 
    // State captured at construction: the due time and the coordination that
    // on_subscribe() later uses to create a coordinator.
52  struct timer_initial_type
53  {
54  timer_initial_type(rxsc::scheduler::clock_type::time_point t, coordination_type cn)
55  : when(t)
56  , coordination(std::move(cn))
57  {
58  }
    // Time point passed to the worker's schedule() call in on_subscribe().
59  rxsc::scheduler::clock_type::time_point when;
60  coordination_type coordination;
61  };
62  timer_initial_type initial;
63 
    // Absolute form: fire at time point `t`.
64  timer(rxsc::scheduler::clock_type::time_point t, coordination_type cn)
65  : initial(t, std::move(cn))
66  {
67  }
    // Relative form: fire `p` after the coordination's current time.
    // `when` is first given a default-constructed time_point and then overwritten,
    // because now() is read from the stored coordination, which only exists once
    // `initial` has been built.
    // The deadline is computed here, when the timer object is constructed - not
    // later, when on_subscribe() runs.
68  timer(rxsc::scheduler::clock_type::duration p, coordination_type cn)
69  : initial(rxsc::scheduler::clock_type::time_point(), std::move(cn))
70  {
71  initial.when = initial.coordination.now() + p;
72  }
    // Schedules the single emission for subscriber `o`.
    // Does not modify the timer itself (const member); `o` is taken by value and
    // copied into the scheduled callback.
73  template<class Subscriber>
74  void on_subscribe(Subscriber o) const {
75  static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
76 
77  // creates a worker whose lifetime is the same as this subscription
78  auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
79  auto controller = coordinator.get_worker();
80 
    // The schedulable argument is unused: the callback fires once and never reschedules itself.
81  auto producer = [o](const rxsc::schedulable&) {
82  // send the value and complete
83  o.on_next(1L);
84  o.on_completed();
85  };
86 
    // NOTE(review): coordinator.act() presumably adapts the callback to the
    // coordination's execution context, and on_exception() presumably forwards
    // anything it throws to `o` and returns an empty result - confirm against
    // rx-coordination.hpp / rx-observer.hpp.
    // The by-reference capture is only used inside this on_exception() call expression.
87  auto selectedProducer = on_exception(
88  [&](){return coordinator.act(producer);},
89  o);
    // Nothing to schedule when no adapted callback was produced.
90  if (selectedProducer.empty()) {
91  return;
92  }
93 
94  controller.schedule(initial.when, selectedProducer.get());
95  }
96 };
97 
/*!
    Compile-time gate and factory used by the public timer() overloads, which
    read its `value`, `observable_type` and `make` members.

    The condition handed to defer_observable holds when both of these are true:
      - TimePointOrDuration is convertible to the scheduler clock's time_point
        or to its duration, and
      - Coordination satisfies is_coordination.

    \tparam TimePointOrDuration  type of the "when" argument given to timer().
    \tparam Coordination         type of the coordination given to timer().

    NOTE(review): the remaining arguments (`void`, the `timer` template and
    `Coordination`) presumably tell defer_observable to build an observable
    around timer<Coordination> - confirm against defer_observable's definition.
*/
98 template<class TimePointOrDuration, class Coordination>
99 struct defer_timer : public defer_observable<
100  rxu::all_true<
101  std::is_convertible<TimePointOrDuration, rxsc::scheduler::clock_type::time_point>::value ||
102  std::is_convertible<TimePointOrDuration, rxsc::scheduler::clock_type::duration>::value,
103  is_coordination<Coordination>::value>,
104  void,
105  timer, Coordination>
106 {
107 };
108 
109 }
110 
/*!
    Creates a timer observable that uses the coordination returned by
    identity_current_thread().

    This overload takes part in overload resolution only when
    detail::defer_timer<TimePointOrDuration, identity_one_worker>::value is true.

    \tparam TimePointOrDuration  type of `when`.
    \param  when  the due time; forwarded unchanged to defer_timer::make().
    \return the observable_type produced by
            detail::defer_timer<TimePointOrDuration, identity_one_worker>::make().

    NOTE(review): the result presumably wraps detail::timer, which sends 1L
    once and then completes - confirm against defer_observable.
*/
113 template<class TimePointOrDuration>
114 auto timer(TimePointOrDuration when)
115  -> typename std::enable_if<
116  detail::defer_timer<TimePointOrDuration, identity_one_worker>::value,
117  typename detail::defer_timer<TimePointOrDuration, identity_one_worker>::observable_type>::type {
118  return detail::defer_timer<TimePointOrDuration, identity_one_worker>::make(when, identity_current_thread());
119 }
120 
/*!
    Creates a timer observable that uses a caller-supplied coordination.

    This overload takes part in overload resolution only when
    detail::defer_timer<TimePointOrDuration, Coordination>::value is true.

    \tparam TimePointOrDuration  type of `when`.
    \tparam Coordination         type of `cn`.
    \param  when  the due time; forwarded unchanged to defer_timer::make().
    \param  cn    the coordination; taken by value and moved into defer_timer::make().
    \return the observable_type produced by
            detail::defer_timer<TimePointOrDuration, Coordination>::make().

    NOTE(review): the result presumably wraps detail::timer, which sends 1L
    once and then completes - confirm against defer_observable.
*/
123 template<class TimePointOrDuration, class Coordination>
124 auto timer(TimePointOrDuration when, Coordination cn)
125  -> typename std::enable_if<
126  detail::defer_timer<TimePointOrDuration, Coordination>::value,
127  typename detail::defer_timer<TimePointOrDuration, Coordination>::observable_type>::type {
128  return detail::defer_timer<TimePointOrDuration, Coordination>::make(when, std::move(cn));
129 }
130 
131 }
132 
133 }
134 
135 #endif
Definition: rx-all.hpp:26
static const bool value
Definition: rx-predef.hpp:123
auto timer(TimePointOrDuration when) -> typename std::enable_if< detail::defer_timer< TimePointOrDuration, identity_one_worker >::value, typename detail::defer_timer< TimePointOrDuration, identity_one_worker >::observable_type >::type
Returns an observable that emits the single value 1 (a long) at the specified time point, or after the specified duration, and then completes.
Definition: rx-timer.hpp:114
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175