Java源码示例:io.reactivex.functions.Function3
示例1
/**
 * Creates a state machine over {@code source}.
 *
 * <p>{@code initialState}, {@code transition} and {@code backpressureStrategy} are
 * null-checked via {@code Preconditions}; {@code requestBatchSize} must be positive.
 * {@code source}, {@code completionAction} and {@code errorAction} are stored without
 * validation here.
 *
 * @param source               upstream flowable
 * @param initialState         factory for the starting state
 * @param transition           given the current state, an upstream item and an emitter,
 *                             returns the next state
 * @param completionAction     action taking the current state and an emitter
 * @param errorAction          action taking the current state, an error and an emitter
 * @param backpressureStrategy backpressure strategy to store
 * @param requestBatchSize     request batch size; must be greater than zero
 */
public FlowableStateMachine(Flowable<In> source, //
        Callable<? extends State> initialState, //
        Function3<? super State, ? super In, ? super Emitter<Out>, ? extends State> transition, //
        BiConsumer<? super State, ? super Emitter<Out>> completionAction, //
        Consumer3<? super State, ? super Throwable, ? super Emitter<Out>> errorAction, //
        BackpressureStrategy backpressureStrategy, //
        int requestBatchSize) {
    Preconditions.checkNotNull(initialState);
    Preconditions.checkNotNull(transition);
    Preconditions.checkNotNull(backpressureStrategy);
    // The message names the parameter that is actually being validated.
    Preconditions.checkArgument(requestBatchSize > 0,
            "requestBatchSize must be greater than zero");
    this.source = source;
    this.initialState = initialState;
    this.transition = transition;
    this.completionAction = completionAction;
    this.errorAction = errorAction;
    this.backpressureStrategy = backpressureStrategy;
    this.requestBatchSize = requestBatchSize;
}
示例2
/**
 * Stores the state-machine collaborators and the downstream subscriber.
 *
 * <p>No argument is validated here. The {@code count} field starts out equal to
 * {@code requestBatchSize}.
 *
 * @param initialState         factory for the starting state
 * @param transition           given the current state, an upstream item and an emitter,
 *                             returns the next state
 * @param completionAction     action taking the current state and an emitter
 * @param errorAction          action taking the current state, an error and an emitter
 * @param backpressureStrategy backpressure strategy to store
 * @param requestBatchSize     request batch size; also the initial value of {@code count}
 * @param child                downstream subscriber
 */
StateMachineSubscriber(
        Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super Emitter<Out>, ? extends State> transition,
        BiConsumer<? super State, ? super Emitter<Out>> completionAction,
        Consumer3<? super State, ? super Throwable, ? super Emitter<Out>> errorAction,
        BackpressureStrategy backpressureStrategy,
        int requestBatchSize,
        Subscriber<? super Out> child) {
    // Downstream and flow-control settings.
    this.child = child;
    this.backpressureStrategy = backpressureStrategy;
    this.requestBatchSize = requestBatchSize;
    this.count = requestBatchSize;

    // State-machine callbacks.
    this.initialState = initialState;
    this.transition = transition;
    this.completionAction = completionAction;
    this.errorAction = errorAction;
}
示例3
/**
 * Connects through a {@code WsProvider} and prints the values returned by the
 * {@code chain}, {@code name} and {@code version} system RPC functions.
 *
 * @param args command-line arguments, forwarded to {@code initEndPoint}
 */
public static void main(String[] args) {
    initEndPoint(args);

    // Turns the three RPC answers into a {chain, name, version} string triple.
    final Function3<Object, Object, Object, String[]> toInfo =
            new Function3<Object, Object, Object, String[]>() {
                @Override
                public String[] apply(Object chain, Object name, Object version) throws Exception {
                    return new String[]{chain.toString(), name.toString(), version.toString()};
                }
            };

    WsProvider provider = new WsProvider(endPoint);
    Observable<ApiRx> api = ApiRx.create(provider);
    api.flatMap(apiRx -> (Observable<String[]>) Observable.zip(
            apiRx.rpc().system().function("chain").invoke(),
            apiRx.rpc().system().function("name").invoke(),
            apiRx.rpc().system().function("version").invoke(),
            toInfo
    )).subscribe(result -> {
        final String[] info = (String[]) result;
        final String message = "You are connected to chain [" + info[0] + "] using [" + info[1] + "] v[" + info[2] + "]";
        System.out.println(message);
    });
}
示例4
/**
 * Connects through a {@code WsProvider} and prints the values returned by the
 * {@code chain}, {@code name} and {@code version} system RPC functions.
 *
 * @param args command-line arguments, forwarded to {@code initEndPoint}
 */
public static void main(String[] args) {
    initEndPoint(args);

    // Collects the three RPC answers as strings, in call order.
    final Function3<Object, Object, Object, String[]> collect =
            new Function3<Object, Object, Object, String[]>() {
                @Override
                public String[] apply(Object first, Object second, Object third) throws Exception {
                    return new String[]{first.toString(), second.toString(), third.toString()};
                }
            };

    WsProvider provider = new WsProvider(endPoint);
    Observable<ApiRx> api = ApiRx.create(provider);
    api.flatMap(apiRx -> (Observable<String[]>) Observable.zip(
            apiRx.rpc().system().function("chain").invoke(),
            apiRx.rpc().system().function("name").invoke(),
            apiRx.rpc().system().function("version").invoke(),
            collect
    )).subscribe((String[] result) -> {
        final String chain = result[0];
        final String node = result[1];
        final String version = result[2];
        System.out.println("You are connected to chain [" + chain + "] using [" + node + "] v[" + version + "]");
    });
}
示例5
/**
 * Fetches the home page banner, pinned articles and article list, combines them with
 * {@code function3}, and delivers the result to {@code rxObserver}.
 *
 * <p>The combined stream is subscribed on {@code Schedulers.io()} and observed on
 * {@code AndroidSchedulers.mainThread()}.
 *
 * @param page       page number passed to the article-list request
 * @param function3  combiner turning the three responses into a {@link HomeData}
 * @param rxObserver observer subscribed to the combined result
 */
@Override
public void getHomeData(int page, Function3<BaseBean<List<Banner>>, BaseBean<List<Article>>, BaseBean<PageListData<Article>>, HomeData> function3, DisposableObserver<HomeData> rxObserver) {
    Observable<BaseBean<List<Banner>>> banners = getBannerObservable();
    Observable<BaseBean<List<Article>>> pinnedArticles = getHomeTopObservable();
    Observable<BaseBean<PageListData<Article>>> articlePage = getHomeListObservable(page);

    Observable<HomeData> combined = Observable.zip(banners, pinnedArticles, articlePage, function3);
    combined
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(rxObserver);
}
示例6
/**
 * Zips a detail source with two comment-list sources into a single {@link DetailBean}.
 *
 * @param detailSingle       source of the detail value
 * @param bestCommentsSingle source of the list passed as the bean's second constructor argument
 * @param commentsSingle     source of the list passed as the bean's third constructor argument
 * @param <T>                type of the detail value
 * @return a {@code Single} emitting the assembled {@link DetailBean}
 */
public static <T> Single<DetailBean<T>> toCommentDetail(Single<T> detailSingle,
        Single<List<CommentBean>> bestCommentsSingle,
        Single<List<CommentBean>> commentsSingle) {
    final Function3<T, List<CommentBean>, List<CommentBean>, DetailBean<T>> assemble =
            new Function3<T, List<CommentBean>, List<CommentBean>, DetailBean<T>>() {
                @Override
                public DetailBean<T> apply(T detail, List<CommentBean> bestComments,
                        List<CommentBean> comments) throws Exception {
                    return new DetailBean<T>(detail, bestComments, comments);
                }
            };
    return Single.zip(detailSingle, bestCommentsSingle, commentsSingle, assemble);
}
示例7
/**
 * Creates the transformer; use the static factory to obtain instances.
 *
 * <p>All reference arguments are null-checked via {@code Preconditions} and
 * {@code requestBatchSize} must be positive.
 *
 * @param initialState         factory for the starting state
 * @param transition           given the current state, an upstream item and an emitter,
 *                             returns the next state
 * @param completion           predicate taking the current state and an emitter
 * @param backpressureStrategy backpressure strategy to store
 * @param requestBatchSize     request batch size; must be greater than zero
 */
private TransformerStateMachine(Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
        BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
        BackpressureStrategy backpressureStrategy, int requestBatchSize) {
    Preconditions.checkNotNull(initialState);
    Preconditions.checkNotNull(transition);
    Preconditions.checkNotNull(completion);
    Preconditions.checkNotNull(backpressureStrategy);
    // The message names the parameter that is actually being validated.
    Preconditions.checkArgument(requestBatchSize > 0, "requestBatchSize must be greater than zero");
    this.initialState = initialState;
    this.transition = transition;
    this.completion = completion;
    this.backpressureStrategy = backpressureStrategy;
    this.requestBatchSize = requestBatchSize;
}
示例8
/**
 * Static factory for {@code TransformerStateMachine}; forwards every argument to the
 * private constructor unchanged.
 *
 * @param initialState         factory for the starting state
 * @param transition           given the current state, an upstream item and an emitter,
 *                             returns the next state
 * @param completion           predicate taking the current state and an emitter
 * @param backpressureStrategy backpressure strategy to store
 * @param requestBatchSize     request batch size
 * @return the new transformer
 */
public static <State, In, Out> FlowableTransformer<In, Out> create(Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
        BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
        BackpressureStrategy backpressureStrategy, int requestBatchSize) {
    final FlowableTransformer<In, Out> machine = new TransformerStateMachine<State, In, Out>(
            initialState,
            transition,
            completion,
            backpressureStrategy,
            requestBatchSize);
    return machine;
}
示例9
/**
 * Builds the function that maps each upstream notification to a flowable of
 * downstream notifications, updating {@code state} as it goes.
 *
 * <ul>
 * <li>onNext: runs {@code transition} and stores the returned state; then completes the
 * emitter, or, if the emitter was cancelled in the meantime, emits the special
 * "unsubscribed" marker notification instead.</li>
 * <li>onComplete: runs {@code completion}; when it returns true and the emitter is not
 * cancelled, completes the wrapped emitter.</li>
 * <li>onError: forwards the error to the wrapped emitter unless cancelled.</li>
 * </ul>
 *
 * @param transition           state transition invoked for each item
 * @param completion           predicate invoked on upstream completion
 * @param state                mutable holder of the current state
 * @param backpressureStrategy strategy passed to {@code Flowable.create}
 * @return the mapping function
 */
private static <State, Out, In> Function<Notification<In>, Flowable<Notification<Out>>> execute(
        final Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
        final BiPredicate<? super State, ? super FlowableEmitter<Out>> completion, final Mutable<State> state,
        final BackpressureStrategy backpressureStrategy) {
    return new Function<Notification<In>, Flowable<Notification<Out>>>() {
        @Override
        public Flowable<Notification<Out>> apply(final Notification<In> in) {
            final FlowableOnSubscribe<Notification<Out>> onSubscribe =
                    new FlowableOnSubscribe<Notification<Out>>() {
                        @Override
                        public void subscribe(FlowableEmitter<Notification<Out>> emitter) throws Exception {
                            final FlowableEmitter<Out> wrapped = wrap(emitter);

                            if (in.isOnNext()) {
                                state.value = transition.apply(state.value, in.getValue(), wrapped);
                                if (emitter.isCancelled()) {
                                    // Special emission signalling that the transition
                                    // called unsubscribe; it is filtered out later.
                                    emitter.onNext(UnsubscribedNotificationHolder.<Out>unsubscribedNotification());
                                } else {
                                    emitter.onComplete();
                                }
                                return;
                            }

                            if (in.isOnComplete()) {
                                // The predicate must run before the cancellation check.
                                if (completion.test(state.value, wrapped) && !emitter.isCancelled()) {
                                    wrapped.onComplete();
                                }
                                return;
                            }

                            // Remaining case: an error notification.
                            if (!emitter.isCancelled()) {
                                wrapped.onError(in.getError());
                            }
                        }
                    };
            return Flowable.create(onSubscribe, backpressureStrategy);
        }
    };
}
示例10
/**
 * Convenience entry point that delegates to {@code TransformerStateMachine.create}
 * with the same arguments.
 *
 * @param initialState         factory for the starting state
 * @param transition           given the current state, an upstream item and an emitter,
 *                             returns the next state
 * @param completion           predicate taking the current state and an emitter
 * @param backpressureStrategy backpressure strategy to pass on
 * @param requestBatchSize     request batch size
 * @return the transformer produced by {@code TransformerStateMachine.create}
 */
public static <State, In, Out> FlowableTransformer<In, Out> stateMachine(Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
        BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
        BackpressureStrategy backpressureStrategy, int requestBatchSize) {
    final FlowableTransformer<In, Out> machine = TransformerStateMachine.create(
            initialState,
            transition,
            completion,
            backpressureStrategy,
            requestBatchSize);
    return machine;
}
示例11
/**
 * Demonstrates using the zip operator to wait for several network requests to finish
 * before refreshing the UI. The three results come from three different endpoints and
 * are shown together in a toast.
 *
 * @param view the view that triggered the request (unused)
 */
public void onZipRequest(View view) {
    // NOTE(review): the API key below is hard-coded in source; consider moving it to configuration.
    Observable<ResultBean> mobileObservable = EasyHttp.get("http://apis.juhe.cn/mobile/get")
            .params("phone", "18688994275")
            .params("dtype", "json")
            .params("key", "5682c1f44a7f486e40f9720d6c97ffe4")
            .execute(new CallClazzProxy<TestApiResult1<ResultBean>, ResultBean>(ResultBean.class) {
            });
    Observable<Content> searchObservable = EasyHttp.get("/ajax.php")
            .baseUrl("http://fy.iciba.com")
            .params("a", "fy")
            .params("f", "auto")
            .params("t", "auto")
            .params("w", "hello world")
            // Uses a proxy to describe the response type.
            .execute(new CallClazzProxy<TestApiResult6<Content>, Content>(Content.class) {
            });
    Observable<List<SectionItem>> listObservable = EasyHttp.get("http://news-at.zhihu.com/api/3/sections")
            .execute(new CallClazzProxy<TestApiResult5<List<SectionItem>>, List<SectionItem>>(new TypeToken<List<SectionItem>>() {
            }.getType()) {
            });
    // The last type argument of Function3 is List<Object> here: the three results are put
    // into one collection and returned in a single emission. Any other result type can be
    // used instead. If all three endpoints returned TestBean, a concrete List<TestBean>
    // could be used rather than List<Object>.
    Observable.zip(mobileObservable, searchObservable, listObservable, new Function3<ResultBean, Content, List<SectionItem>, List<Object>>() {
        @Override
        public List<Object> apply(@NonNull ResultBean resultbean, @NonNull Content content, @NonNull List<SectionItem> sectionItems) throws Exception {
            // Buffer the three received values and hand them to the subscriber at once.
            // Parameterised (not raw) so the return needs no unchecked conversion.
            List<Object> list = new ArrayList<Object>(3);
            list.add(resultbean);
            list.add(content);
            list.add(sectionItems);
            return list;
        }
    }).subscribe(new BaseSubscriber<List<Object>>() {
        @Override
        public void onError(ApiException e) {
            showToast(e.getMessage());
        }

        @Override
        public void onNext(@NonNull List<Object> objects) {
            showToast(objects.toString());
        }
    });
}
示例12
/**
 * Returns the combiner that removes hidden and filtered posts from a catalog.
 *
 * <p>A post is dropped when its number equals the {@code threadId} of any hidden thread,
 * or when any filter (treated as a case-insensitive regex) is found in the post's
 * comment, subject or name. The catalog is modified in place via {@code setPosts} and
 * returned; a {@code null} catalog is returned unchanged.
 *
 * <p>Each filter's pattern is compiled at most once per invocation, on first use, rather
 * than once per post.
 *
 * @return the combiner function
 */
private Function3<ChanCatalog, List<HiddenThread>, List<Filter>, ChanCatalog> hideThreads() {
    return (chanCatalog, hiddenThreads, filters) -> {
        if (chanCatalog == null) {
            return chanCatalog;
        }

        final boolean hasFilters = filters != null && filters.size() > 0;
        // One slot per filter, filled lazily so an invalid regex still fails only when
        // that filter is actually reached.
        final Pattern[] compiled = new Pattern[hasFilters ? filters.size() : 0];

        List<ChanPost> visiblePosts = new ArrayList<>();
        for (ChanPost post : chanCatalog.getPosts()) {
            boolean hidden = false;
            for (HiddenThread hiddenThread : hiddenThreads) {
                if (hiddenThread.threadId == post.getNo()) {
                    hidden = true;
                    break;
                }
            }

            if (!hidden && hasFilters) {
                int index = 0;
                for (Filter filter : filters) {
                    if (compiled[index] == null) {
                        compiled[index] = Pattern.compile(filter.filter, Pattern.CASE_INSENSITIVE);
                    }
                    if (matchesAnyField(compiled[index], post)) {
                        hidden = true;
                        break;
                    }
                    index++;
                }
            }

            if (!hidden) {
                visiblePosts.add(post);
            }
        }
        chanCatalog.setPosts(visiblePosts);
        return chanCatalog;
    };
}

/** Reports whether the pattern is found in the post's comment, subject or name; null fields are skipped. */
private boolean matchesAnyField(Pattern pattern, ChanPost post) {
    if (post.getCom() != null && pattern.matcher(post.getCom()).find()) {
        return true;
    }
    if (post.getSub() != null && pattern.matcher(post.getSub()).find()) {
        return true;
    }
    return post.getName() != null && pattern.matcher(post.getName()).find();
}
示例13
/**
 * Fetches the home page banner, pinned articles and article list.
 *
 * @param page       page number
 * @param function3  combiner that turns the banner response, the pinned-article response
 *                   and the paged article response into a single {@link HomeData}
 * @param rxObserver observer of the resulting {@link HomeData}; scheduling and delivery
 *                   are determined by the implementation
 */
void getHomeData(int page, Function3<BaseBean<List<Banner>>, BaseBean<List<Article>>, BaseBean<PageListData<Article>>, HomeData> function3, DisposableObserver<HomeData> rxObserver);