Java源码示例:io.reactivex.rxjava3.core.Scheduler
示例1
/**
 * Builds an {@link ObservableTransformer} that runs the supplied {@link Action} once for every
 * effect emitted by the upstream effect observable.
 *
 * <p>Each incoming effect is mapped to a {@link Completable} wrapping {@code doEffect}; the effect
 * value itself is not handed to the action. If {@code scheduler} is non-null the Completable is
 * subscribed on that {@link Scheduler}; if it is {@code null}, no {@code subscribeOn} is applied.
 *
 * @param doEffect the {@link Action} to run for each requested effect
 * @param scheduler the {@link Scheduler} to subscribe the action on, or {@code null} to skip
 *     {@code subscribeOn}
 * @param <F> the type of Effect this transformer handles
 * @param <E> the Event type of the surrounding loop; the Completable is converted with
 *     {@code toObservable()}, so the type is only needed to fit the transformer signature
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromAction(
    final Action doEffect, @Nullable final Scheduler scheduler) {
  return effectStream ->
      effectStream
          .flatMapCompletable(
              ignoredEffect -> {
                // A fresh Completable is created per effect, exactly once per emission.
                final Completable invocation = Completable.fromAction(doEffect);
                if (scheduler == null) {
                  return invocation;
                }
                return invocation.subscribeOn(scheduler);
              })
          .<E>toObservable();
}
示例2
/**
 * Builds an {@link ObservableTransformer} that passes every effect from the upstream effect
 * observable to the supplied {@link Consumer}.
 *
 * <p>Each effect is wrapped in a {@link Completable} whose action calls {@code
 * doEffect.accept(effect)}. If {@code scheduler} is non-null the Completable is subscribed on
 * that {@link Scheduler}; if it is {@code null}, no {@code subscribeOn} is applied.
 *
 * @param doEffect the {@link Consumer} to invoke with each requested effect
 * @param scheduler the {@link Scheduler} to subscribe the consumer call on, or {@code null} to
 *     skip {@code subscribeOn}
 * @param <F> the type of Effect this transformer handles
 * @param <E> the Event type of the surrounding loop; the Completable is converted with
 *     {@code toObservable()}, so the type is only needed to fit the transformer signature
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromConsumer(
    final Consumer<F> doEffect, @Nullable final Scheduler scheduler) {
  return effectStream ->
      effectStream
          .flatMapCompletable(
              effect -> {
                final Completable invocation =
                    Completable.fromAction(() -> doEffect.accept(effect));
                if (scheduler == null) {
                  return invocation;
                }
                return invocation.subscribeOn(scheduler);
              })
          .<E>toObservable();
}
示例3
/**
 * Builds an {@link ObservableTransformer} that maps every effect from the upstream effect
 * observable to an event by calling the supplied {@link Function}.
 *
 * <p>Each effect becomes an {@link Observable} created with {@code Observable.fromSupplier}, whose
 * supplier returns {@code function.apply(effect)}. If {@code scheduler} is non-null that
 * Observable is subscribed on the {@link Scheduler}; if it is {@code null}, no {@code
 * subscribeOn} is applied. The per-effect Observables are merged with {@code flatMap}.
 *
 * @param function the {@link Function} to invoke with each requested effect
 * @param scheduler the {@link Scheduler} to subscribe the function call on, or {@code null} to
 *     skip {@code subscribeOn}
 * @param <F> the type of Effect this transformer handles
 * @param <E> the type of Event this transformer emits
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromFunction(
    final Function<F, E> function, @Nullable final Scheduler scheduler) {
  return effectStream ->
      effectStream.flatMap(
          (@NonNull F effect) -> {
            final Observable<E> events = Observable.fromSupplier(() -> function.apply(effect));
            if (scheduler == null) {
              return events;
            }
            return events.subscribeOn(scheduler);
          });
}
示例4
/**
 * Creates a replay buffer configured with a size limit and an age limit.
 *
 * <p>Arguments are validated in declaration order, then stored. The list starts with a single
 * node created from {@code (null, 0L)}, which both {@code head} and {@code tail} point at.
 *
 * @param maxSize the size limit; must be greater than zero
 * @param maxAge the age limit, expressed in {@code unit}; must be greater than zero
 * @param unit the time unit of {@code maxAge}
 * @param scheduler the {@link Scheduler} stored for later use by this buffer
 * @throws IllegalArgumentException if {@code maxSize} or {@code maxAge} is not positive
 * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null}
 */
SizeAndTimeBoundReplayBuffer(int maxSize, long maxAge, TimeUnit unit, Scheduler scheduler) {
  if (maxSize <= 0) {
    throw new IllegalArgumentException("maxSize > 0 required but it was " + maxSize);
  }
  if (maxAge <= 0) {
    throw new IllegalArgumentException("maxAge > 0 required but it was " + maxAge);
  }
  if (unit == null) {
    throw new NullPointerException("unit == null");
  }
  if (scheduler == null) {
    throw new NullPointerException("scheduler == null");
  }

  this.maxSize = maxSize;
  this.maxAge = maxAge;
  this.unit = unit;
  this.scheduler = scheduler;

  // Head and tail share one initial node.
  final TimedNode<T> sentinel = new TimedNode<>(null, 0L);
  this.tail = sentinel;
  this.head = sentinel;
}
示例5
/**
 * Creates a replay buffer configured with a size limit and an age limit.
 *
 * <p>Arguments are validated in declaration order, then stored. The list starts with a single
 * node created from {@code (null, 0L)}, which both {@code head} and {@code tail} point at.
 *
 * @param maxSize the size limit; must be greater than zero
 * @param maxAge the age limit, expressed in {@code unit}; must be greater than zero
 * @param unit the time unit of {@code maxAge}
 * @param scheduler the {@link Scheduler} stored for later use by this buffer
 * @throws IllegalArgumentException if {@code maxSize} or {@code maxAge} is not positive
 * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null}
 */
SizeAndTimeBoundReplayBuffer(int maxSize, long maxAge, TimeUnit unit, Scheduler scheduler) {
  if (maxSize <= 0) {
    throw new IllegalArgumentException("maxSize > 0 required but it was " + maxSize);
  }
  if (maxAge <= 0) {
    throw new IllegalArgumentException("maxAge > 0 required but it was " + maxAge);
  }
  if (unit == null) {
    throw new NullPointerException("unit == null");
  }
  if (scheduler == null) {
    throw new NullPointerException("scheduler == null");
  }

  this.maxSize = maxSize;
  this.maxAge = maxAge;
  this.unit = unit;
  this.scheduler = scheduler;

  // Head and tail share one initial node.
  final TimedNode<T> sentinel = new TimedNode<>(null, 0L);
  this.tail = sentinel;
  this.head = sentinel;
}
示例6
/**
 * Returns a function that wraps the {@link Scheduler} produced by a supplier in a
 * {@link DelegatingIdlingResourceScheduler} and registers the wrapper with the
 * {@link IdlingRegistry}.
 *
 * <p>The supplier is only called, and the wrapper only registered, when the returned function is
 * applied.
 *
 * <p>Note: Work scheduled in the future does not mark the idling resource as busy.
 *
 * @param name the name passed to the wrapping scheduler
 * @return a function from a scheduler supplier to the registered wrapping scheduler
 * @throws NullPointerException if {@code name} is {@code null}
 */
@SuppressWarnings("ConstantConditions") // Public API guarding.
@CheckResult @NonNull
public static Function<Supplier<Scheduler>, Scheduler> create(@NonNull final String name) {
  if (name == null) {
    throw new NullPointerException("name == null");
  }
  return schedulerSupplier -> {
    final Scheduler original = schedulerSupplier.get();
    final IdlingResourceScheduler wrapped = new DelegatingIdlingResourceScheduler(original, name);
    IdlingRegistry.getInstance().register(wrapped);
    return wrapped;
  };
}
示例7
/**
 * Wraps the supplied {@link Scheduler} in a {@link DelegatingIdlingResourceScheduler}, which
 * also implements {@link IdlingResource}.
 *
 * <p>This method does not register the result. You must
 * {@linkplain IdlingRegistry#register(IdlingResource...) register} the returned instance with
 * Espresso before it will be used. Only work scheduled on the returned instance directly will be
 * registered.
 *
 * @param scheduler the scheduler to wrap
 * @param name the name passed to the wrapping scheduler
 * @return the new wrapping scheduler
 * @throws NullPointerException if {@code scheduler} or {@code name} is {@code null}
 */
@SuppressWarnings("ConstantConditions") // Public API guarding.
@CheckResult @NonNull
public static IdlingResourceScheduler wrap(@NonNull Scheduler scheduler, @NonNull String name) {
  if (scheduler == null) {
    throw new NullPointerException("scheduler == null");
  }
  if (name == null) {
    throw new NullPointerException("name == null");
  }
  final IdlingResourceScheduler wrapped = new DelegatingIdlingResourceScheduler(scheduler, name);
  return wrapped;
}
示例8
/**
 * Schedules periodic work with zero initial delay and a one-second period, then checks the idle
 * assertions after the first run and after each time advance.
 */
@Test
public void betweenPeriodicSchedulesReportsIdle() {
  final Scheduler.Worker periodicWorker = scheduler.createWorker();
  final CountingRunnable counter = new CountingRunnable();
  periodicWorker.schedulePeriodically(counter, 0, 1, SECONDS);

  // Run the first (zero-delay) execution.
  delegate.triggerActions();
  assertEquals(1, counter.count());

  // Half a period later.
  delegate.advanceTimeBy(500, MILLISECONDS);
  assertIdle(1);

  // A further full second later.
  delegate.advanceTimeBy(1000, MILLISECONDS);
  assertIdle(2);
}
示例9
/**
 * Disposes the worker from inside its own running task and asserts, still inside that task, that
 * the scheduler reports busy.
 */
@Test
public void unsubscribingScheduledWorkWhileRunningWorkReportsBusy() {
  final Scheduler.Worker selfDisposingWorker = scheduler.createWorker();
  selfDisposingWorker.schedule(
      new Runnable() {
        @Override
        public void run() {
          selfDisposingWorker.dispose();
          assertBusy();
        }
      });
  delegate.triggerActions();
}
示例10
/**
 * Creates a {@link Scheduler} backed by a new {@link ThreadPoolExecutor}.
 *
 * <p>The executor uses {@code poolSize} as both core and maximum pool size, a keep-alive of zero
 * seconds, and a {@link LinkedBlockingDeque} with capacity {@code queueSize} as its work queue.
 * No rejection handler is passed, so the executor's default applies.
 *
 * @param poolSize the core and maximum number of threads
 * @param queueSize the capacity of the work queue
 * @return a scheduler created with {@code Schedulers.from} on the new executor
 */
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
  final ThreadPoolExecutor executor =
      new ThreadPoolExecutor(
          poolSize, poolSize, 0L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(queueSize));
  return Schedulers.from(executor);
}
示例11
/**
 * Subscribes to the message batches and handles every message on a thread-pool scheduler.
 *
 * <p>The source is subscribed on a scheduler built from a new single-thread executor. Each batch
 * is logged, flattened into its messages, and every message is handled inside a deferred
 * {@code Single} subscribed on the scheduler from {@code threadPoolScheduler}.
 */
public void start() {
  // WARNING: this code doesn't work as expected
  // NOTE(review): the failure mode is not stated here; presumably flatMapSingle's unbounded
  // concurrency can overflow the bounded executor queue -- confirm.
  final Scheduler handlerScheduler = threadPoolScheduler(threads, threadPoolQueueSize);
  messageSource
      .getMessageBatches()
      .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
      .doOnNext(messageBatch -> logger.log(messageBatch.toString()))
      .flatMap(messageBatch -> Flowable.fromIterable(messageBatch.getMessages()))
      .flatMapSingle(
          message ->
              Single.defer(() -> Single.just(messageHandler.handleMessage(message)))
                  .subscribeOn(handlerScheduler))
      .subscribeWith(new SimpleSubscriber<>(threads, 1));
}
示例12
/**
 * Creates a {@link Scheduler} backed by a new {@link ThreadPoolExecutor}.
 *
 * <p>The executor uses {@code poolSize} as both core and maximum pool size, a keep-alive of zero
 * seconds, and a {@link LinkedBlockingDeque} with capacity {@code queueSize} as its work queue.
 * No rejection handler is passed, so the executor's default applies.
 *
 * @param poolSize the core and maximum number of threads
 * @param queueSize the capacity of the work queue
 * @return a scheduler created with {@code Schedulers.from} on the new executor
 */
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
  final ThreadPoolExecutor executor =
      new ThreadPoolExecutor(
          poolSize, poolSize, 0L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(queueSize));
  return Schedulers.from(executor);
}
示例13
/**
 * Subscribes to the message batches and handles every message on a thread-pool scheduler.
 *
 * <p>The source is subscribed on a scheduler built from a new single-thread executor. Each batch
 * is logged, flattened into its messages, and every message is wrapped in a deferred
 * {@code Single} that maps it through {@code messageHandler::handleMessage} and is subscribed on
 * the scheduler from {@code threadPoolScheduler}.
 */
void start() {
  final Scheduler handlerScheduler = threadPoolScheduler(threads, threadPoolQueueSize);
  messageSource
      .getMessageBatches()
      .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
      .doOnNext(messageBatch -> logger.log(messageBatch.toString()))
      .flatMap(messageBatch -> Flowable.fromIterable(messageBatch.getMessages()))
      .flatMapSingle(
          message ->
              Single.defer(() -> Single.just(message).map(messageHandler::handleMessage))
                  .subscribeOn(handlerScheduler))
      .subscribeWith(new SimpleSubscriber<>(threads, 1));
}
示例14
/**
 * Creates a {@link Scheduler} backed by a new {@link ThreadPoolExecutor} that uses a
 * {@code WaitForCapacityPolicy} as its rejection handler.
 *
 * <p>The executor uses {@code poolSize} as both core and maximum pool size, a keep-alive of zero
 * seconds, and a {@link LinkedBlockingDeque} with capacity {@code queueSize} as its work queue.
 *
 * @param poolSize the core and maximum number of threads
 * @param queueSize the capacity of the work queue
 * @return a scheduler created with {@code Schedulers.from} on the new executor
 */
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
  final ThreadPoolExecutor executor =
      new ThreadPoolExecutor(
          poolSize,
          poolSize,
          0L,
          TimeUnit.SECONDS,
          new LinkedBlockingDeque<>(queueSize),
          new WaitForCapacityPolicy());
  return Schedulers.from(executor);
}
示例15
/**
 * Creates a {@link Scheduler} backed by a new {@link ThreadPoolExecutor}.
 *
 * <p>The executor uses {@code poolSize} as both core and maximum pool size, a keep-alive of zero
 * seconds, and a {@link LinkedBlockingDeque} with capacity {@code queueSize} as its work queue.
 * No rejection handler is passed, so the executor's default applies.
 *
 * @param poolSize the core and maximum number of threads
 * @param queueSize the capacity of the work queue
 * @return a scheduler created with {@code Schedulers.from} on the new executor
 */
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
  final ThreadPoolExecutor executor =
      new ThreadPoolExecutor(
          poolSize, poolSize, 0L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(queueSize));
  return Schedulers.from(executor);
}
示例16
public SchedulerWorkRunner(@NonNull Scheduler scheduler) {
this.worker = checkNotNull(scheduler).createWorker();
}
示例17
/**
 * Returns the {@link Scheduler} reported by {@code options().rxScheduler()}.
 *
 * <p>This default implementation only delegates to the options object.
 *
 * @return the scheduler obtained from {@code options()}
 */
@Nonnull
@CheckReturnValue
default Scheduler rxScheduler() {
return options().rxScheduler();
}
示例18
/**
 * Stores the scheduler to delegate to and the name of this idling resource.
 *
 * <p>No validation is performed here.
 *
 * @param delegate the scheduler to delegate to
 * @param name the name stored for this instance
 */
DelegatingIdlingResourceScheduler(final Scheduler delegate, final String name) {
  this.name = name;
  this.delegate = delegate;
}
示例19
/** Schedules immediate work on a fresh worker and asserts the busy state. */
@Test
public void scheduledWorkReportsBusy() {
  scheduler.createWorker().schedule(new CountingRunnable());
  assertBusy();
}
示例20
/** Schedules immediate work, disposes the returned handle right away, and asserts idle. */
@Test
public void scheduledWorkUnsubscribedReportsIdle() {
  scheduler.createWorker().schedule(new CountingRunnable()).dispose();
  assertIdle(1);
}
示例21
/** Schedules work with an explicit delay of zero seconds and asserts the busy state. */
@Test
public void scheduleWithZeroDelayReportsBusy() {
  scheduler.createWorker().schedule(new CountingRunnable(), 0, SECONDS);
  assertBusy();
}
示例22
/** Schedules work with a one-second delay and asserts the idle state. */
@Test
public void scheduleWithNonZeroDelayReportsIdle() {
  scheduler.createWorker().schedule(new CountingRunnable(), 1, SECONDS);
  assertIdle(0);
}
示例23
/** Schedules periodic work with zero initial delay and asserts the busy state. */
@Test
public void schedulePeriodicallyWithZeroDelayReportsBusy() {
  scheduler.createWorker().schedulePeriodically(new CountingRunnable(), 0, 1, SECONDS);
  assertBusy();
}
示例24
/** Schedules periodic work with a one-second initial delay and asserts the idle state. */
@Test
public void schedulePeriodicallyWithNonZeroDelayReportsIdle() {
  scheduler.createWorker().schedulePeriodically(new CountingRunnable(), 1, 1, SECONDS);
  assertIdle(0);
}
示例25
/** Asserts the busy state from inside a task while the delegate is running it. */
@Test
public void runningWorkReportsBusy() {
  scheduler.createWorker().schedule(() -> assertBusy());
  delegate.triggerActions();
}
示例26
/**
 * Schedules work, disposes the whole worker before any delegate actions are triggered in this
 * test, and asserts idle.
 */
@Test
public void unsubscribingScheduledWorksReportsIdle() {
  final Scheduler.Worker disposedWorker = scheduler.createWorker();
  disposedWorker.schedule(new CountingRunnable());
  disposedWorker.dispose();
  assertIdle(1);
}
示例27
/** Disposes the worker first, then schedules work on it, and asserts idle. */
@Test
public void scheduleWorkAfterUnsubscribedReportsIdle() {
  final Scheduler.Worker alreadyDisposed = scheduler.createWorker();
  alreadyDisposed.dispose();
  alreadyDisposed.schedule(new CountingRunnable());
  assertIdle(0);
}
示例28
/**
 * Runs work on a scheduler freshly wrapped via {@code Rx3Idler.wrap}; the test passes as long as
 * nothing throws.
 */
@Test
public void finishingWorkWithoutRegisteredCallbackDoesNotCrash() {
  final IdlingResourceScheduler freshlyWrapped = Rx3Idler.wrap(delegate, "Bob");
  freshlyWrapped.createWorker().schedule(new CountingRunnable());
  delegate.triggerActions();
}
示例29
/**
 * Stress-tests a {@link BehaviorRelay} for races between emitting a value and subscribing.
 *
 * <p>Each of the 50000 iterations creates a fresh relay, schedules a task that waits on a start
 * latch and then emits {@code 1}, subscribes an observer through {@code subscribeOn} and
 * {@code observeOn}, releases the latch, and waits up to five seconds for the observer to
 * receive a signal. A timeout fails the test; otherwise the received value must be {@code 1}.
 *
 * <p>NOTE(review): observers are never disposed and the relays never terminate, so the
 * schedulers' per-subscription workers are presumably retained across iterations; this may be
 * the cause of the OOM named in {@code @Ignore} -- confirm.
 */
@Test
@Ignore("OOMs")
public void testEmissionSubscriptionRace() throws Exception {
  Scheduler s = Schedulers.io();
  Scheduler.Worker worker = Schedulers.io().createWorker();
  try {
    for (int i = 0; i < 50000; i++) {
      // Progress marker every 1000 iterations.
      if (i % 1000 == 0) {
        System.out.println(i);
      }
      final BehaviorRelay<Object> rs = BehaviorRelay.create();
      final CountDownLatch finish = new CountDownLatch(1);
      final CountDownLatch start = new CountDownLatch(1);
      worker.schedule(new Runnable() {
        @Override
        public void run() {
          try {
            start.await();
          } catch (InterruptedException e1) {
            // Restore the interrupt flag instead of swallowing it; the emission below still
            // happens, as it did before.
            Thread.currentThread().interrupt();
          }
          rs.accept(1);
        }
      });
      final AtomicReference<Object> o = new AtomicReference<>();
      rs.subscribeOn(s).observeOn(Schedulers.io())
          .subscribe(new DefaultObserver<Object>() {
            @Override
            public void onComplete() {
              o.set(-1);
              finish.countDown();
            }

            @Override
            public void onError(Throwable e) {
              o.set(e);
              finish.countDown();
            }

            @Override
            public void onNext(Object t) {
              o.set(t);
              finish.countDown();
            }
          });
      // Let the emitter run now that the subscription has been requested.
      start.countDown();
      if (!finish.await(5, TimeUnit.SECONDS)) {
        System.out.println(o.get());
        System.out.println(rs.hasObservers());
        // Assert.fail always throws, so no break is needed after it.
        Assert.fail("Timeout @ " + i);
      } else {
        Assert.assertEquals(1, o.get());
      }
    }
  } finally {
    worker.dispose();
  }
}
示例30
/**
 * Stress-tests a {@link ReplayRelay} for races between emitting a value and subscribing.
 *
 * <p>Each of the 50000 iterations creates a fresh relay, schedules a task that waits on a start
 * latch and then emits {@code 1}, subscribes an observer through {@code subscribeOn} and
 * {@code observeOn}, releases the latch, and waits up to five seconds for the observer to
 * receive a signal. A timeout fails the test; otherwise the received value must be {@code 1}.
 *
 * <p>NOTE(review): observers are never disposed and the relays never terminate, so the
 * schedulers' per-subscription workers are presumably retained across iterations -- confirm
 * whether that is acceptable for this test.
 */
@Test
public void testReplayRelayEmissionSubscriptionRace() throws Exception {
  Scheduler s = Schedulers.io();
  Scheduler.Worker worker = Schedulers.io().createWorker();
  try {
    for (int i = 0; i < 50000; i++) {
      // Progress marker every 1000 iterations.
      if (i % 1000 == 0) {
        System.out.println(i);
      }
      final ReplayRelay<Object> rs = ReplayRelay.create();
      final CountDownLatch finish = new CountDownLatch(1);
      final CountDownLatch start = new CountDownLatch(1);
      worker.schedule(new Runnable() {
        @Override
        public void run() {
          try {
            start.await();
          } catch (InterruptedException e1) {
            // Restore the interrupt flag instead of swallowing it; the emission below still
            // happens, as it did before.
            Thread.currentThread().interrupt();
          }
          rs.accept(1);
        }
      });
      final AtomicReference<Object> o = new AtomicReference<>();
      rs.subscribeOn(s).observeOn(Schedulers.io())
          .subscribe(new DefaultObserver<Object>() {
            @Override
            public void onComplete() {
              o.set(-1);
              finish.countDown();
            }

            @Override
            public void onError(Throwable e) {
              o.set(e);
              finish.countDown();
            }

            @Override
            public void onNext(Object t) {
              o.set(t);
              finish.countDown();
            }
          });
      // Let the emitter run now that the subscription has been requested.
      start.countDown();
      if (!finish.await(5, TimeUnit.SECONDS)) {
        System.out.println(o.get());
        System.out.println(rs.hasObservers());
        // Assert.fail always throws, so no break is needed after it.
        Assert.fail("Timeout @ " + i);
      } else {
        Assert.assertEquals(1, o.get());
      }
    }
  } finally {
    worker.dispose();
  }
}