5 #if !defined(RXCPP_RX_SCHEDULER_TEST_HPP) 6 #define RXCPP_RX_SCHEDULER_TEST_HPP 8 #include "../rx-includes.hpp" 12 namespace schedulers {
16 class test_type :
public scheduler_interface
22 struct test_type_state :
public virtual_time<long, long>
24 typedef virtual_time<long, long> base;
27 using base::schedule_relative;
29 clock_type::time_point now()
const {
30 return to_time_point(clock_now);
33 virtual void schedule_absolute(
long when,
const schedulable& a)
const 35 if (when <= base::clock_now)
36 when = base::clock_now + 1;
41 virtual long add(
long absolute,
long relative)
const 43 return absolute + relative;
46 virtual clock_type::time_point to_time_point(
long absolute)
const 48 return clock_type::time_point(std::chrono::milliseconds(absolute));
51 virtual long to_relative(clock_type::duration d)
const 53 return static_cast<long>(std::chrono::duration_cast<std::chrono::milliseconds>(d).
count());
58 mutable std::shared_ptr<test_type_state> state;
61 struct test_type_worker :
public worker_interface
63 mutable std::shared_ptr<test_type_state> state;
65 typedef test_type_state::absolute absolute;
66 typedef test_type_state::relative relative;
68 test_type_worker(std::shared_ptr<test_type_state> st)
69 : state(std::move(st))
73 virtual clock_type::time_point now()
const {
77 virtual void schedule(
const schedulable& scbl)
const {
78 state->schedule_absolute(state->clock(), scbl);
81 virtual void schedule(clock_type::time_point when,
const schedulable& scbl)
const {
82 state->schedule_relative(state->to_relative(when - now()), scbl);
85 void schedule_absolute(absolute when,
const schedulable& scbl)
const {
86 state->schedule_absolute(when, scbl);
89 void schedule_relative(relative when,
const schedulable& scbl)
const {
90 state->schedule_relative(when, scbl);
93 bool is_enabled()
const {
return state->is_enabled();}
94 absolute clock()
const {
return state->clock();}
106 void advance_to(absolute time)
const 108 state->advance_to(time);
111 void advance_by(relative time)
const 113 state->advance_by(time);
116 void sleep(relative time)
const 127 : state(std::make_shared<test_type_state>())
131 virtual clock_type::time_point now()
const {
135 virtual worker create_worker(composite_subscription cs)
const {
136 return worker(cs, std::make_shared<test_type_worker>(state));
139 bool is_enabled()
const {
return state->is_enabled();}
141 return state->clock();
144 clock_type::time_point to_time_point(
long absolute)
const {
145 return state->to_time_point(absolute);
148 std::shared_ptr<test_type_worker> create_test_type_worker_interface()
const {
149 return std::make_shared<test_type_worker>(state);
153 rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages)
const;
156 rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages)
const;
161 :
public rxt::detail::test_subject_base<T>
163 typedef typename rxn::notification<T> notification_type;
164 typedef rxn::recorded<typename notification_type::type> recorded_type;
167 explicit mock_observer(std::shared_ptr<test_type::test_type_state> sc)
172 std::shared_ptr<test_type::test_type_state> sc;
173 std::vector<recorded_type> m;
175 virtual void on_subscribe(subscriber<T>)
const {
178 virtual std::vector<rxn::subscription> subscriptions()
const {
182 virtual std::vector<recorded_type> messages()
const {
190 typedef typename rxn::notification<T> notification_type;
191 typedef rxn::recorded<typename notification_type::type> recorded_type;
193 auto ts = std::make_shared<mock_observer<T>>(state);
195 return rxcpp::make_subscriber<T>(rxt::testable_observer<T>(ts, make_observer_dynamic<T>(
200 recorded_type(ts->sc->clock(), notification_type::on_next(value)));
203 [ts](std::exception_ptr e)
206 recorded_type(ts->sc->clock(), notification_type::on_error(e)));
212 recorded_type(ts->sc->clock(), notification_type::on_completed()));
217 class cold_observable
218 :
public rxt::detail::test_subject_base<T>
220 typedef cold_observable<T> this_type;
221 std::shared_ptr<test_type::test_type_state> sc;
222 typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
223 mutable std::vector<recorded_type> mv;
224 mutable std::vector<rxn::subscription> sv;
225 mutable worker controller;
229 cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
236 template<
class Iterator>
237 cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, Iterator begin, Iterator end)
244 virtual void on_subscribe(subscriber<T> o)
const {
245 sv.push_back(rxn::subscription(sc->clock()));
246 auto index = sv.size() - 1;
248 for (
auto& message : mv) {
249 auto n = message.value();
252 [n, o](
const schedulable&) {
253 if (o.is_subscribed()) {
259 auto sharedThis = std::static_pointer_cast<
const this_type>(this->shared_from_this());
260 o.add([sharedThis, index]() {
261 sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
265 virtual std::vector<rxn::subscription> subscriptions()
const {
269 virtual std::vector<recorded_type> messages()
const {
275 rxt::testable_observable<T> test_type::make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages)
const 277 auto co = std::make_shared<cold_observable<T>>(state, create_worker(composite_subscription()), std::move(messages));
278 return rxt::testable_observable<T>(co);
283 :
public rxt::detail::test_subject_base<T>
285 typedef hot_observable<T> this_type;
286 std::shared_ptr<test_type::test_type_state> sc;
287 typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
288 typedef subscriber<T> observer_type;
289 mutable std::vector<recorded_type> mv;
290 mutable std::vector<rxn::subscription> sv;
291 mutable std::list<observer_type> observers;
292 mutable worker controller;
296 hot_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
301 for (
auto& message : mv) {
302 auto n = message.value();
305 [
this, n](
const schedulable&) {
306 auto local = this->observers;
307 for (
auto& o : local) {
308 if (o.is_subscribed()) {
316 virtual ~hot_observable() {}
318 virtual void on_subscribe(observer_type o)
const {
319 auto olocation = observers.insert(observers.end(), o);
321 sv.push_back(rxn::subscription(sc->clock()));
322 auto index = sv.size() - 1;
324 auto sharedThis = std::static_pointer_cast<
const this_type>(this->shared_from_this());
325 o.add([sharedThis, index, olocation]() {
326 sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
327 sharedThis->observers.erase(olocation);
331 virtual std::vector<rxn::subscription> subscriptions()
const {
335 virtual std::vector<recorded_type> messages()
const {
341 rxt::testable_observable<T> test_type::make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages)
const 343 auto worker = create_worker(composite_subscription());
344 auto shared = std::make_shared<hot_observable<T>>(state, worker, std::move(messages));
345 return rxt::testable_observable<T>(shared);
349 struct is_create_source_function
353 static auto check(
int) -> decltype((*(CF*)
nullptr)());
355 static not_void check(...);
357 static const bool value = is_observable<decltype(check<rxu::decay_t<F>>(0))>::value;
364 std::shared_ptr<detail::test_type> tester;
367 explicit test(std::shared_ptr<detail::test_type> t)
375 static const long created_time = 100;
376 static const long subscribed_time = 200;
377 static const long unsubscribed_time = 1000;
389 static recorded_type
next(
long ticks, U value) {
390 return recorded_type(ticks, notification_type::on_next(std::move(value)));
394 return recorded_type(ticks, notification_type::on_completed());
397 template<
typename Exception>
398 static recorded_type
error(
long ticks, Exception&& e) {
399 return recorded_type(ticks, notification_type::on_error(std::forward<Exception>(e)));
409 std::shared_ptr<detail::test_type::test_type_worker> tester;
422 long clock()
const {
return tester->clock();}
425 tester->schedule_absolute(when, a);
429 tester->schedule_relative(when, a);
432 template<
class Arg0,
class... ArgN>
434 -> typename std::enable_if<
435 (detail::is_action_function<Arg0>::value ||
438 tester->schedule_absolute(when,
make_schedulable(*
this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
441 template<
class Arg0,
class... ArgN>
443 -> typename std::enable_if<
444 (detail::is_action_function<Arg0>::value ||
447 tester->schedule_relative(when,
make_schedulable(*
this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
452 tester->advance_to(time);
457 tester->advance_by(time);
465 template<
class T,
class F>
466 auto start(F createSource,
long created,
long subscribed,
long unsubscribed)
const 470 :
public std::enable_shared_from_this<state_type>
472 typedef decltype(createSource()) source_type;
474 std::unique_ptr<source_type> source;
483 auto state = std::make_shared<state_type>(this->make_subscriber<T>());
485 schedule_absolute(created, [createSource, state](
const schedulable&) {
486 state->source.reset(
new typename state_type::source_type(createSource()));
488 schedule_absolute(subscribed, [state](
const schedulable&) {
489 state->source->subscribe(state->o);
491 schedule_absolute(unsubscribed, [state](
const schedulable&) {
492 state->o.unsubscribe();
500 template<
class T,
class F>
501 auto start(F&& createSource,
long unsubscribed)
const 504 return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed);
507 template<
class T,
class F>
511 return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed_time);
517 typedef decltype((*(F*)
nullptr)()) source_type;
523 auto start(F createSource,
long created,
long subscribed,
long unsubscribed) const
524 -> typename std::enable_if<detail::is_create_source_function<F>::value,
start_traits<F>>::type::subscriber_type
526 return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created, subscribed, unsubscribed);
530 auto start(F createSource,
long unsubscribed)
const 531 ->
typename std::enable_if<detail::is_create_source_function<F>::value,
start_traits<F>>::type::subscriber_type
533 return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created_time, subscribed_time, unsubscribed);
538 ->
typename std::enable_if<detail::is_create_source_function<F>::value,
start_traits<F>>::type::subscriber_type
540 return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created_time, subscribed_time, unsubscribed_time);
549 return tester->make_subscriber<T>();
553 clock_type::time_point
now()
const {
554 return tester->now();
558 return test_worker(cs, tester->create_test_type_worker_interface());
562 long clock()
const {
return tester->clock();}
565 return tester->to_time_point(absolute);
570 return tester->make_hot_observable(std::move(
messages));
573 template<
class T, std::
size_t size>
575 -> decltype(tester->make_hot_observable(std::vector<T>())) {
581 -> decltype(tester->make_hot_observable(std::vector<T>())) {
582 return tester->make_hot_observable(std::vector<T>(il));
587 return tester->make_cold_observable(std::move(
messages));
590 template<
class T, std::
size_t size>
592 -> decltype(tester->make_cold_observable(std::vector<T>())) {
598 -> decltype(tester->make_cold_observable(std::vector<T>())) {
599 return tester->make_cold_observable(std::vector<T>(il));
605 return test(std::make_shared<detail::test_type>());
Definition: rx-notification.hpp:253
Definition: rx-scheduler.hpp:163
static recorded_type completed(long ticks)
Definition: rx-test.hpp:393
auto start(F createSource, long unsubscribed) const -> typename std::enable_if< detail::is_create_source_function< F >::value, start_traits< F >>::type::subscriber_type
Definition: rx-test.hpp:530
detail::test_type::clock_type clock_type
Definition: rx-test.hpp:373
clock_type::time_point to_time_point(long absolute) const
Definition: rx-test.hpp:564
void schedule_absolute(long when, const schedulable &a) const
Definition: rx-test.hpp:424
auto make_cold_observable(const T(&arr)[size]) const -> decltype(tester->make_cold_observable(std::vector< T >()))
Definition: rx-test.hpp:591
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
auto start(F createSource) const -> typename std::enable_if< detail::is_create_source_function< F >::value, start_traits< F >>::type::subscriber_type
Definition: rx-test.hpp:537
~test_worker()
Definition: rx-test.hpp:412
long clock() const
Definition: rx-test.hpp:562
test(std::shared_ptr< detail::test_type > t)
Definition: rx-test.hpp:367
static recorded_type next(long ticks, U value)
Definition: rx-test.hpp:389
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
auto make_schedulable(const schedulable &scbl) -> schedulable
Definition: rx-scheduler.hpp:735
auto make_subscriber(subscriber< T, Observer > o) -> subscriber< T, Observer >
Definition: rx-subscriber.hpp:224
auto start(F createSource, long created, long subscribed, long unsubscribed) const -> subscriber< T, rxt::testable_observer< T >>
Definition: rx-test.hpp:466
static rxn::subscription subscribe(long subscribe, long unsubscribe)
Definition: rx-test.hpp:402
rxn::subscription subscription_type
Definition: rx-test.hpp:384
Definition: rx-notification.hpp:116
rxn::notification< T > notification_type
Definition: rx-test.hpp:382
Definition: rx-subscription.hpp:31
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
rxt::testable_observable< T > make_cold_observable(std::vector< rxn::recorded< std::shared_ptr< rxn::detail::notification_base< T >>>> messages) const
Definition: rx-test.hpp:586
source_type::value_type value_type
Definition: rx-test.hpp:518
Definition: rx-predef.hpp:58
void advance_by(long time) const
Definition: rx-test.hpp:455
clock_type::time_point now() const
Definition: rx-test.hpp:553
subscriber< T, rxt::testable_observer< T > > make_subscriber() const
Definition: rx-test.hpp:548
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:359
Definition: rx-test.hpp:515
auto make_hot_observable(std::initializer_list< T > il) const -> decltype(tester->make_hot_observable(std::vector< T >()))
Definition: rx-test.hpp:580
bool is_enabled() const
Definition: rx-test.hpp:421
Definition: rx-notification.hpp:14
Definition: rx-test.hpp:407
void sleep(long time) const
Definition: rx-test.hpp:460
static recorded_type error(long ticks, Exception &&e)
Definition: rx-test.hpp:398
auto schedule_absolute(long when, Arg0 &&a0, ArgN &&...an) const -> typename std::enable_if< (detail::is_action_function< Arg0 >::value|| is_subscription< Arg0 >::value)&& !is_schedulable< Arg0 >::value >::type
Definition: rx-test.hpp:433
auto start(F &&createSource) const -> subscriber< T, rxt::testable_observer< T >>
Definition: rx-test.hpp:508
auto schedule_relative(long when, Arg0 &&a0, ArgN &&...an) const -> typename std::enable_if< (detail::is_action_function< Arg0 >::value|| is_subscription< Arg0 >::value)&& !is_schedulable< Arg0 >::value >::type
Definition: rx-test.hpp:442
void schedule_relative(long when, const schedulable &a) const
Definition: rx-test.hpp:428
auto make_cold_observable(std::initializer_list< T > il) const -> decltype(tester->make_cold_observable(std::vector< T >()))
Definition: rx-test.hpp:597
auto start(F &&createSource, long unsubscribed) const -> subscriber< T, rxt::testable_observer< T >>
Definition: rx-test.hpp:501
test_worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-test.hpp:557
rxn::recorded< typename notification_type::type > recorded_type
Definition: rx-test.hpp:383
void start() const
Definition: rx-test.hpp:543
Definition: rx-scheduler.hpp:353
long clock() const
Definition: rx-test.hpp:422
bool is_enabled() const
Definition: rx-test.hpp:561
identity_one_worker identity_test()
Definition: rx-test.hpp:610
messages()
Definition: rx-test.hpp:386
a source of values that records the time of each subscription/unsubscription and all the values and t...
Definition: rx-test.hpp:83
rxt::testable_observable< T > make_hot_observable(std::vector< rxn::recorded< std::shared_ptr< rxn::detail::notification_base< T >>>> messages) const
Definition: rx-test.hpp:569
Definition: rx-coordination.hpp:114
Definition: rx-test.hpp:362
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
virtual void schedule_absolute(typename base::absolute when, const schedulable &a) const
Definition: rx-virtualtime.hpp:206
test_worker(composite_subscription cs, std::shared_ptr< detail::test_type::test_type_worker > t)
Definition: rx-test.hpp:415
Definition: rx-test.hpp:380
test make_test()
Definition: rx-test.hpp:604
Definition: rx-scheduler.hpp:426
std::vector< T > to_vector(const T(&arr)[size])
Definition: rx-util.hpp:40
Definition: rx-test.hpp:53
auto make_hot_observable(const T(&arr)[size]) const -> decltype(tester->make_hot_observable(std::vector< T >()))
Definition: rx-test.hpp:574
auto subscribe(ArgN &&...an) -> detail::subscribe_factory< decltype(make_subscriber< T >(std::forward< ArgN >(an)...))>
Subscribe will cause the source observable to emit values to the provided subscriber.
Definition: rx-subscribe.hpp:87
void advance_to(long time) const
Definition: rx-test.hpp:450
Definition: rx-scheduler.hpp:200