T - the value type consumed
U - the subclass of this BaseTestConsumer
public abstract class BaseTestConsumer<T,U extends BaseTestConsumer<T,U>> extends Object implements Disposable
Modifier and Type | Class | Description |
---|---|---|
static class | BaseTestConsumer.TestWaitStrategy | Enumeration of default wait strategies when waiting for a specific number of items in awaitCount(int, Runnable). |
Modifier and Type | Field | Description |
---|---|---|
protected boolean | checkSubscriptionOnce | |
protected long | completions | The number of completions. |
protected CountDownLatch | done | The latch that indicates an onError or onComplete has been called. |
protected List<Throwable> | errors | The list of errors received. |
protected int | establishedFusionMode | |
protected int | initialFusionMode | |
protected Thread | lastThread | The last thread seen by the observer. |
protected CharSequence | tag | The optional tag associated with this test consumer. |
protected boolean | timeout | Indicates that one of the awaitX methods has timed out. |
protected List<T> | values | The list of values received. |
Constructor and Description |
---|
BaseTestConsumer() |
Modifier and Type | Method | Description |
---|---|---|
U | assertComplete() | Assert that this TestObserver/TestSubscriber received exactly one onComplete event. |
U | assertEmpty() | Assert that the TestObserver/TestSubscriber has received a Disposable but no other events. |
U | assertError(Class<? extends Throwable> errorClass) | Asserts that this TestObserver/TestSubscriber received exactly one onError event which is an instance of the specified errorClass class. |
U | assertError(Predicate<Throwable> errorPredicate) | Asserts that this TestObserver/TestSubscriber received exactly one onError event for which the provided predicate returns true. |
U | assertError(Throwable error) | Assert that this TestObserver/TestSubscriber received exactly the specified onError event value. |
U | assertErrorMessage(String message) | Assert that there is a single error and it has the given message. |
U | assertFailure(Class<? extends Throwable> error, T... values) | Assert that the upstream signalled the specified values in order and then failed with a specific class or subclass of Throwable. |
U | assertFailure(Predicate<Throwable> errorPredicate, T... values) | Assert that the upstream signalled the specified values in order and then failed with a Throwable for which the provided predicate returns true. |
U | assertFailureAndMessage(Class<? extends Throwable> error, String message, T... values) | Assert that the upstream signalled the specified values in order, then failed with a specific class or subclass of Throwable and with the given exact error message. |
U | assertNever(Predicate<? super T> valuePredicate) | Asserts that this TestObserver/TestSubscriber did not receive any onNext value for which the provided predicate returns true. |
U | assertNever(T value) | Assert that this TestObserver/TestSubscriber did not receive an onNext value which is equal to the given value with respect to null-safe Object.equals. |
U | assertNoErrors() | Assert that this TestObserver/TestSubscriber has not received any onError event. |
U | assertNotComplete() | Assert that this TestObserver/TestSubscriber has not received any onComplete event. |
U | assertNoTimeout() | Asserts that some awaitX method has not timed out. |
abstract U | assertNotSubscribed() | Assert that the onSubscribe method hasn't been called at all. |
U | assertNotTerminated() | Assert that the TestObserver/TestSubscriber has not terminated (i.e., the terminal latch is still non-zero). |
U | assertNoValues() | Assert that this TestObserver/TestSubscriber has not received any onNext events. |
U | assertResult(T... values) | Assert that the upstream signalled the specified values in order and completed normally. |
abstract U | assertSubscribed() | Assert that the onSubscribe method was called exactly once. |
U | assertTerminated() | Assert that the TestObserver/TestSubscriber terminated (i.e., the terminal latch reached zero). |
U | assertTimeout() | Asserts that some awaitX method has timed out. |
U | assertValue(Predicate<T> valuePredicate) | Asserts that this TestObserver/TestSubscriber received exactly one onNext value for which the provided predicate returns true. |
U | assertValue(T value) | Assert that this TestObserver/TestSubscriber received exactly one onNext value which is equal to the given value with respect to Objects.equals. |
U | assertValueAt(int index, Predicate<T> valuePredicate) | Asserts that this TestObserver/TestSubscriber received an onNext value at the given index for which the provided predicate returns true. |
U | assertValueAt(int index, T value) | Asserts that this TestObserver/TestSubscriber received an onNext value at the given index which is equal to the given value with respect to null-safe Object.equals. |
U | assertValueCount(int count) | Assert that this TestObserver/TestSubscriber received the specified number of onNext events. |
U | assertValues(T... values) | Assert that the TestObserver/TestSubscriber received only the specified values in the specified order. |
U | assertValueSequence(Iterable<? extends T> sequence) | Assert that the TestObserver/TestSubscriber received only the specified sequence of values in the same order. |
U | assertValueSequenceOnly(Iterable<? extends T> sequence) | Assert that the TestObserver/TestSubscriber received only the specified values in the specified order without terminating. |
U | assertValueSet(Collection<? extends T> expected) | Assert that the TestObserver/TestSubscriber received only items that are in the specified collection as well, irrespective of the order they were received. |
U | assertValueSetOnly(Collection<? extends T> expected) | Assert that the TestObserver/TestSubscriber received only the specified values in any order without terminating. |
U | assertValuesOnly(T... values) | Assert that the TestObserver/TestSubscriber received only the specified values in the specified order without terminating. |
U | await() | Awaits until this TestObserver/TestSubscriber receives an onError or onComplete event. |
boolean | await(long time, TimeUnit unit) | Awaits the specified amount of time or until this TestObserver/TestSubscriber receives an onError or onComplete event, whichever happens first. |
U | awaitCount(int atLeast) | Await until the TestObserver/TestSubscriber receives the given number of items or terminates by sleeping 10 milliseconds at a time up to 5000 milliseconds of timeout. |
U | awaitCount(int atLeast, Runnable waitStrategy) | Await until the TestObserver/TestSubscriber receives the given number of items or terminates by waiting according to the wait strategy and up to 5000 milliseconds of timeout. |
U | awaitCount(int atLeast, Runnable waitStrategy, long timeoutMillis) | Await until the TestObserver/TestSubscriber receives the given number of items or terminates. |
U | awaitDone(long time, TimeUnit unit) | Awaits until the internal latch is counted down. |
boolean | awaitTerminalEvent() | Waits until any terminal event has been received by this TestObserver/TestSubscriber or returns false if the wait has been interrupted. |
boolean | awaitTerminalEvent(long duration, TimeUnit unit) | Awaits the specified amount of time or until this TestObserver/TestSubscriber receives an onError or onComplete event, whichever happens first. |
U | clearTimeout() | Clears the timeout flag set by the await methods when they timed out. |
long | completions() | Returns the number of times onComplete was called. |
int | errorCount() | Returns the number of onError exceptions received. |
List<Throwable> | errors() | Returns a shared list of received onError exceptions. |
protected AssertionError | fail(String message) | Fail with the given message and add the sequence of errors as suppressed ones. |
List<List<Object>> | getEvents() | Returns a list of 3 other lists: the first inner list contains the plain values received; the second list contains the potential errors and the final list contains the potential completions as Notifications. |
boolean | isTerminated() | Returns true if TestObserver/TestSubscriber received any onError or onComplete events. |
boolean | isTimeout() | Returns true if an await timed out. |
Thread | lastThread() | Returns the last thread which called the onXXX methods of this TestObserver/TestSubscriber. |
static String | valueAndClass(Object o) | Appends the class name to a non-null value. |
int | valueCount() | Returns the number of onNext values received. |
List<T> | values() | Returns a shared list of received onNext values. |
U | withTag(CharSequence tag) | Set the tag displayed along with an assertion failure's other state information. |
Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface Disposable: dispose, isDisposed
protected final CountDownLatch done
protected long completions
protected Thread lastThread
protected boolean checkSubscriptionOnce
protected int initialFusionMode
protected int establishedFusionMode
protected CharSequence tag
protected boolean timeout
public final Thread lastThread()
public final List<T> values()
Note that accessing the items via certain methods of the List interface while the upstream is still actively emitting more items may result in a ConcurrentModificationException.
The List.size() method will return the number of items already received by this TestObserver/TestSubscriber in a thread-safe manner that can be read via the List.get(int) method (index range of 0 to List.size() - 1).
A view of the returned List can be created via List.subList(int, int) by using the bounds 0 (inclusive) to List.size() (exclusive) which, when accessed in a read-only fashion, should also be thread-safe and not throw any ConcurrentModificationException.
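The difference between iterator-based and index-based access described above can be sketched with a plain ArrayList standing in for the shared list. This is a single-threaded illustration only: the class and method names are hypothetical, and the thread-safety of size() and get(int) is a property this page states for the test consumer's own list, not for ArrayList.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical stand-in: a plain ArrayList plays the role of the shared values() list.
class IndexedReadSketch {

    /** Iterates with an Iterator while the list grows; returns true if a CME was thrown. */
    static boolean iteratorSeesConcurrentModification(List<Integer> list) {
        try {
            for (Integer item : list) {
                list.add(item + 100); // simulates a new onNext arriving mid-iteration
            }
            return false;
        } catch (ConcurrentModificationException ex) {
            return true;
        }
    }

    /** Captures size() once and reads indices 0..size-1; later additions are ignored. */
    static List<Integer> readByIndex(List<Integer> list) {
        int n = list.size();
        List<Integer> seen = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            seen.add(list.get(i));
            list.add(i + 100); // growth during the read does not disturb indices below n
        }
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> a = new ArrayList<>(Arrays.asList(1, 2, 3));
        System.out.println(iteratorSeesConcurrentModification(a)); // true

        List<Integer> b = new ArrayList<>(Arrays.asList(1, 2, 3));
        System.out.println(readByIndex(b)); // [1, 2, 3]
    }
}
```

Capturing the size once and reading only below that bound mirrors the size()/get(int) pattern the note recommends.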
public final List<Throwable> errors()
Note that accessing the errors via certain methods of the List interface while the upstream is still actively emitting more items or errors may result in a ConcurrentModificationException.
The List.size() method will return the number of errors already received by this TestObserver/TestSubscriber in a thread-safe manner that can be read via the List.get(int) method (index range of 0 to List.size() - 1).
A view of the returned List can be created via List.subList(int, int) by using the bounds 0 (inclusive) to List.size() (exclusive) which, when accessed in a read-only fashion, should also be thread-safe and not throw any ConcurrentModificationException.
public final long completions()
public final boolean isTerminated()
public final int valueCount()
public final int errorCount()
protected final AssertionError fail(String message)
Note that this is deliberately the only fail method. Most of the time an assertion simply fails, but the failure may have been caused by an exception somewhere else. This construct captures those potential errors and reports them along with the original failure.
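The idea of reporting recorded errors together with the assertion failure can be sketched with the JDK's Throwable.addSuppressed. This is a minimal illustration of the behaviour described above, not the library's implementation: the class name and the two-argument signature are assumptions, and the real method may attach the errors differently.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; the real fail(String) reads the consumer's own errors list.
class FailSketch {

    /** Builds an AssertionError that carries the recorded errors as suppressed exceptions. */
    static AssertionError fail(String message, List<Throwable> errors) {
        AssertionError ae = new AssertionError(message);
        for (Throwable e : errors) {
            ae.addSuppressed(e); // keeps the underlying cause visible in the stack trace
        }
        return ae;
    }

    public static void main(String[] args) {
        AssertionError ae = fail("Not completed",
                Arrays.<Throwable>asList(new IllegalStateException("boom")));
        System.out.println(ae.getMessage() + " / suppressed: " + ae.getSuppressed().length);
    }
}
```

Returning the AssertionError instead of throwing it lets the caller write `throw fail(...)`, which keeps the compiler's flow analysis accurate at the call site.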
message - the message to use
public final U await() throws InterruptedException
Throws: InterruptedException - if the current thread is interrupted while waiting
See also: awaitTerminalEvent()
public final boolean await(long time, TimeUnit unit) throws InterruptedException
time - the waiting time
unit - the time unit of the waiting time
Throws: InterruptedException - if the current thread is interrupted while waiting
See also: awaitTerminalEvent(long, TimeUnit)
public final U assertComplete()
public final U assertNotComplete()
public final U assertNoErrors()
public final U assertError(Throwable error)
The comparison is performed via Objects.equals(); since most exceptions don't implement equals(), this assertion may fail. Use the assertError(Class) overload to test against the class of an error instead of an instance of an error, or assertError(Predicate) to test with a different condition.
error - the error to check
See also: assertError(Class), assertError(Predicate)
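The equality caveat for assertError(Throwable) can be reproduced with the JDK alone. The sketch below uses hypothetical helper names and does not call the library; it only shows why comparing exception instances with Objects.equals usually fails while a class check or a predicate succeeds.

```java
import java.util.Objects;

// Hypothetical helpers that mirror the three ways of matching an error.
class ErrorEqualitySketch {

    /** Instance comparison: Throwable does not override equals(), so this is identity. */
    static boolean equalByObjectsEquals(Throwable expected, Throwable actual) {
        return Objects.equals(expected, actual);
    }

    /** Class comparison: accepts the class itself or any subclass. */
    static boolean matchesClass(Class<? extends Throwable> expected, Throwable actual) {
        return expected.isInstance(actual);
    }

    /** Predicate-style comparison on the message. */
    static boolean matchesMessage(String expectedMessage, Throwable actual) {
        return Objects.equals(expectedMessage, actual.getMessage());
    }

    public static void main(String[] args) {
        Throwable received = new IllegalStateException("boom");

        // Two distinct instances with the same type and message are not equal.
        System.out.println(equalByObjectsEquals(new IllegalStateException("boom"), received)); // false
        // Only the very same instance compares equal.
        System.out.println(equalByObjectsEquals(received, received)); // true
        // Matching by class (including a parent class) works as expected.
        System.out.println(matchesClass(RuntimeException.class, received)); // true
        System.out.println(matchesMessage("boom", received)); // true
    }
}
```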
public final U assertError(Class<? extends Throwable> errorClass)
errorClass - the error class to expect
public final U assertError(Predicate<Throwable> errorPredicate)
errorPredicate - the predicate that receives the error Throwable and should return true for expected errors.
public final U assertValue(T value)
value - the value to expect
public final U assertNever(T value)
History: 2.0.5 - experimental
value - the value expected not to be received
public final U assertValue(Predicate<T> valuePredicate)
valuePredicate - the predicate that receives the onNext value and should return true for the expected value.
public final U assertNever(Predicate<? super T> valuePredicate)
History: 2.0.5 - experimental
valuePredicate - the predicate that receives the onNext value and should return true for the expected value.
public final U assertValueAt(int index, T value)
History: 2.1.3 - experimental
index - the position to assert on
value - the value to expect
public final U assertValueAt(int index, Predicate<T> valuePredicate)
index - the position to assert on
valuePredicate - the predicate that receives the onNext value and should return true for the expected value.
public static String valueAndClass(Object o)
o - the object
public final U assertValueCount(int count)
count - the expected number of onNext events
public final U assertNoValues()
public final U assertValues(T... values)
values - the values expected
See also: assertValueSet(Collection)
public final U assertValuesOnly(T... values)
History: 2.1.4 - experimental
values - the values expected
public final U assertValueSet(Collection<? extends T> expected)
This helps with assertions when the order of the values is not guaranteed, e.g., when merging asynchronous streams.
To ensure that only the expected items have been received, no more and no less, in any order, apply assertValueCount(int) with expected.size().
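The combination of a membership check and a count check described above can be modelled on plain collections. The helper names below are hypothetical and the code does not call the library; it is only a sketch of the logic, assuming the membership check works as the summary describes (every received item must be in the expected collection).

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;

// Hypothetical model of assertValueSet followed by assertValueCount(expected.size()).
class ValueSetSketch {

    /** True if every received item is contained in the expected collection. */
    static <T> boolean onlyExpectedItems(List<T> received, Collection<? extends T> expected) {
        for (T item : received) {
            if (!expected.contains(item)) {
                return false;
            }
        }
        return true;
    }

    /** Membership check plus a size check, in any order. */
    static <T> boolean sameItemsAnyOrder(List<T> received, Collection<? extends T> expected) {
        return onlyExpectedItems(received, expected) && received.size() == expected.size();
    }

    public static void main(String[] args) {
        Collection<Integer> expected = new HashSet<>(Arrays.asList(1, 2, 3));
        System.out.println(sameItemsAnyOrder(Arrays.asList(3, 1, 2), expected)); // true
        System.out.println(sameItemsAnyOrder(Arrays.asList(1, 2), expected));    // false
        System.out.println(sameItemsAnyOrder(Arrays.asList(1, 1, 2), expected)); // true
    }
}
```

As the last call shows, in this model the pair of checks is exact only when the received values are distinct: a duplicate can take the place of a missing item and still pass both checks.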
expected - the collection of values expected in any order
public final U assertValueSetOnly(Collection<? extends T> expected)
History: 2.1.14 - experimental
expected - the collection of values expected in any order
public final U assertValueSequence(Iterable<? extends T> sequence)
sequence - the sequence of expected values in order
public final U assertValueSequenceOnly(Iterable<? extends T> sequence)
History: 2.1.14 - experimental
sequence - the sequence of expected values in order
public final U assertTerminated()
public final U assertNotTerminated()
public final boolean awaitTerminalEvent()
public final boolean awaitTerminalEvent(long duration, TimeUnit unit)
duration - the waiting time
unit - the time unit of the waiting time
public final U assertErrorMessage(String message)
message - the message expected
public final List<List<Object>> getEvents()
public abstract U assertSubscribed()
public abstract U assertNotSubscribed()
public final U assertResult(T... values)
values - the expected values, asserted in order
See also: assertFailure(Class, Object...), assertFailure(Predicate, Object...), assertFailureAndMessage(Class, String, Object...)
public final U assertFailure(Class<? extends Throwable> error, T... values)
error - the expected exception (parent) class
values - the expected values, asserted in order
public final U assertFailure(Predicate<Throwable> errorPredicate, T... values)
errorPredicate - the predicate that receives the error Throwable and should return true for expected errors.
values - the expected values, asserted in order
public final U assertFailureAndMessage(Class<? extends Throwable> error, String message, T... values)
error - the expected exception (parent) class
message - the expected failure message
values - the expected values, asserted in order
public final U awaitDone(long time, TimeUnit unit)
If the wait times out or gets interrupted, the TestObserver/TestSubscriber is cancelled.
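The cancel-on-timeout behaviour described for awaitDone can be sketched with a CountDownLatch. The class below is a hypothetical stand-in, not the library's code: the `disposed` flag takes the place of the consumer's real dispose() logic, and the latch is counted down manually instead of by onError/onComplete.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a test consumer with a terminal latch.
class AwaitDoneSketch {
    final CountDownLatch done = new CountDownLatch(1);      // counted down on a terminal event
    final AtomicBoolean disposed = new AtomicBoolean();     // stands in for real cancellation

    void dispose() {
        disposed.set(true);
    }

    /** Waits for the latch; cancels on timeout or interruption. */
    AwaitDoneSketch awaitDone(long time, TimeUnit unit) {
        try {
            if (!done.await(time, unit)) {
                dispose(); // the wait timed out
            }
        } catch (InterruptedException ex) {
            dispose();     // the wait was interrupted
            throw new RuntimeException(ex);
        }
        return this;
    }

    public static void main(String[] args) {
        AwaitDoneSketch neverTerminates = new AwaitDoneSketch();
        neverTerminates.awaitDone(10, TimeUnit.MILLISECONDS);
        System.out.println(neverTerminates.disposed.get()); // true

        AwaitDoneSketch terminated = new AwaitDoneSketch();
        terminated.done.countDown(); // simulates onComplete
        terminated.awaitDone(10, TimeUnit.MILLISECONDS);
        System.out.println(terminated.disposed.get()); // false
    }
}
```

Returning `this` is what allows assertions to be chained directly after the wait.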
time - the waiting time
unit - the time unit of the waiting time
Throws: RuntimeException - wrapping an InterruptedException if the wait is interrupted
public final U assertEmpty()
public final U withTag(CharSequence tag)
History: 2.0.7 - experimental
tag - the string to display (null won't print any tag)
public final U awaitCount(int atLeast)
History: 2.0.7 - experimental
atLeast - the number of items expected at least
See also: awaitCount(int, Runnable, long)
public final U awaitCount(int atLeast, Runnable waitStrategy)
History: 2.0.7 - experimental
atLeast - the number of items expected at least
waitStrategy - a Runnable called when the current received count hasn't reached the expected value and there was no terminal event either, see BaseTestConsumer.TestWaitStrategy for examples
See also: awaitCount(int, Runnable, long)
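How the awaitCount overloads combine an item count, a wait strategy, and a timeout can be sketched as a polling loop. This is an illustration under stated assumptions, not the library's implementation: the class, its `received` counter, and its `done` latch are hypothetical stand-ins for the consumer's internal state.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a test consumer that counts received items.
class AwaitCountSketch {
    final AtomicInteger received = new AtomicInteger();  // number of onNext items so far
    final CountDownLatch done = new CountDownLatch(1);   // counted down on a terminal event
    volatile boolean timeout;                            // set when the wait gave up

    /** Polls until enough items arrived, a terminal event occurred, or the timeout elapsed. */
    AwaitCountSketch awaitCount(int atLeast, Runnable waitStrategy, long timeoutMillis) {
        long start = System.currentTimeMillis();
        for (;;) {
            if (timeoutMillis > 0L && System.currentTimeMillis() - start >= timeoutMillis) {
                timeout = true;
                break;
            }
            if (done.getCount() == 0L) {
                break; // terminated before reaching the count
            }
            if (received.get() >= atLeast) {
                break; // enough items received
            }
            waitStrategy.run(); // e.g. spin, yield, or sleep between checks
        }
        return this;
    }

    public static void main(String[] args) {
        // A sleeping strategy comparable to the documented default of 10 ms per step.
        Runnable sleep10ms = () -> {
            try {
                Thread.sleep(10);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        };
        AwaitCountSketch consumer = new AwaitCountSketch();
        consumer.awaitCount(1, sleep10ms, 50L); // nothing is ever received here
        System.out.println(consumer.timeout);   // true
    }
}
```

Passing the strategy as a Runnable keeps the loop independent of how the waiting is done, which is why the same loop can serve spinning, yielding, and sleeping strategies.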
public final U awaitCount(int atLeast, Runnable waitStrategy, long timeoutMillis)
History: 2.0.7 - experimental
atLeast - the number of items expected at least
waitStrategy - a Runnable called when the current received count hasn't reached the expected value and there was no terminal event either, see BaseTestConsumer.TestWaitStrategy for examples
timeoutMillis - if positive, the await ends if the specified amount of time has passed no matter how many items were received
public final boolean isTimeout()
History: 2.0.7 - experimental
See also: clearTimeout(), assertTimeout(), assertNoTimeout()
public final U clearTimeout()
History: 2.0.7 - experimental
See also: isTimeout()
public final U assertTimeout()
History: 2.0.7 - experimental
public final U assertNoTimeout()
History: 2.0.7 - experimental