RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-interval.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_SOURCES_RX_INTERVAL_HPP)
6 #define RXCPP_SOURCES_RX_INTERVAL_HPP
7 
8 #include "../rx-includes.hpp"
9 
38 namespace rxcpp {
39 
40 namespace sources {
41 
42 namespace detail {
43 
44 template<class Coordination>
45 struct interval : public source_base<long>
46 {
47  typedef interval<Coordination> this_type;
48 
49  typedef rxu::decay_t<Coordination> coordination_type;
50  typedef typename coordination_type::coordinator_type coordinator_type;
51 
52  struct interval_initial_type
53  {
54  interval_initial_type(rxsc::scheduler::clock_type::time_point i, rxsc::scheduler::clock_type::duration p, coordination_type cn)
55  : initial(i)
56  , period(p)
57  , coordination(std::move(cn))
58  {
59  }
60  rxsc::scheduler::clock_type::time_point initial;
61  rxsc::scheduler::clock_type::duration period;
62  coordination_type coordination;
63  };
64  interval_initial_type initial;
65 
66  interval(rxsc::scheduler::clock_type::time_point i, rxsc::scheduler::clock_type::duration p, coordination_type cn)
67  : initial(i, p, std::move(cn))
68  {
69  }
70  template<class Subscriber>
71  void on_subscribe(Subscriber o) const {
72  static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
73 
74  // creates a worker whose lifetime is the same as this subscription
75  auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
76 
77  auto controller = coordinator.get_worker();
78 
79  auto counter = std::make_shared<long>(0);
80 
81  auto producer = [o, counter](const rxsc::schedulable&) {
82  // send next value
83  o.on_next(++(*counter));
84  };
85 
86  auto selectedProducer = on_exception(
87  [&](){return coordinator.act(producer);},
88  o);
89  if (selectedProducer.empty()) {
90  return;
91  }
92 
93  controller.schedule_periodically(initial.initial, initial.period, selectedProducer.get());
94  }
95 };
96 
97 template<class Duration, class Coordination>
98 struct defer_interval : public defer_observable<
99  rxu::all_true<
100  std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>::value,
101  is_coordination<Coordination>::value>,
102  void,
103  interval, Coordination>
104 {
105 };
106 
107 }
108 
109 
112 template<class Duration>
113 auto interval(Duration period)
114  -> typename std::enable_if<
115  detail::defer_interval<Duration, identity_one_worker>::value,
116  typename detail::defer_interval<Duration, identity_one_worker>::observable_type>::type {
117  return detail::defer_interval<Duration, identity_one_worker>::make(identity_current_thread().now(), period, identity_current_thread());
118 }
119 
122 template<class Coordination>
123 auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
124  -> typename std::enable_if<
125  detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::value,
126  typename detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::observable_type>::type {
127  return detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::make(cn.now(), period, std::move(cn));
128 }
129 
132 template<class Duration>
133 auto interval(rxsc::scheduler::clock_type::time_point when, Duration period)
134  -> typename std::enable_if<
135  detail::defer_interval<Duration, identity_one_worker>::value,
136  typename detail::defer_interval<Duration, identity_one_worker>::observable_type>::type {
137  return detail::defer_interval<Duration, identity_one_worker>::make(when, period, identity_current_thread());
138 }
139 
142 template<class Coordination>
143 auto interval(rxsc::scheduler::clock_type::time_point when, rxsc::scheduler::clock_type::duration period, Coordination cn)
144  -> typename std::enable_if<
145  detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::value,
146  typename detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::observable_type>::type {
147  return detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::make(when, period, std::move(cn));
148 }
149 
150 }
151 
152 }
153 
154 #endif
Definition: rx-all.hpp:26
static const bool value
Definition: rx-predef.hpp:123
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
auto interval(Duration period) -> typename std::enable_if< detail::defer_interval< Duration, identity_one_worker >::value, typename detail::defer_interval< Duration, identity_one_worker >::observable_type >::type
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-interval.hpp:113
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175