中文文档 :https://www.kancloud.cn/luponu/rxjava_zh/974452
Schedulers.computation()
用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.single() 该调度器的线程池只能同时执行一个线程
Schedulers.io()
用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;
对于普通的计算任务,请使用Schedulers.computation();
默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread() 为每个任务创建一个新线程
Schedulers.trampoline() 当其它排队的任务完成后,在当前线程排队开始执行。
AndroidSchedulers.mainThread() 主线程,UI线程,可以用于更新界面
static <T> Observable<T> create(ObservableOnSubscribe<T> source)
可以创建任意类型的参数
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
// 发射
Log.d(TAG, "上游 subscribe: 开始发射...");
emitter.onNext("下发数据");
emitter.onComplete(); // 发射完成
// 上游的最后log才会打印
Log.d(TAG, "上游 subscribe: 发射完成");
}
})
.subscribe(
// 下游 Observer 观察者
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "上游和下游订阅成功 onSubscribe 1");
}
@Override
public void onNext(String s) {
Log.d(TAG, "下游接收 onNext: " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
// 只有接收完成之后,上游的最后log才会打印
Log.d(TAG, "下游接收完成 onComplete");
}
});
2022-08-22 16:05:09.221 18440-18440/com.yoshin.tspsdk D/RxJavaActivity: 上游和下游订阅成功 onSubscribe 1
2022-08-22 16:05:09.221 18440-18440/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
2022-08-22 16:05:09.221 18440-18440/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: 下发数据
2022-08-22 16:05:09.221 18440-18440/com.yoshin.tspsdk D/RxJavaActivity: 下游接收完成 onComplete
2022-08-22 16:05:09.221 18440-18440/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
可以发射单一数据或者数集
Observable.just("one", "two", "three", "four", "five").subscribe(
// 下游 Observer 观察者
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "上游和下游订阅成功 onSubscribe 1");
}
@Override
public void onNext(String s) {
Log.d(TAG, "下游接收 onNext: " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
// 只有接收完成之后,上游的最后log才会打印
Log.d(TAG, "下游接收完成 onComplete");
}
});
2022-08-22 16:13:30.201 19470-19470/com.yoshin.tspsdk D/RxJavaActivity: 上游和下游订阅成功 onSubscribe 1
2022-08-22 16:13:30.201 19470-19470/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: one
2022-08-22 16:13:30.201 19470-19470/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: two
2022-08-22 16:13:30.201 19470-19470/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: three
2022-08-22 16:13:30.201 19470-19470/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: four
2022-08-22 16:13:30.201 19470-19470/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: five
2022-08-22 16:13:30.201 19470-19470/com.yoshin.tspsdk D/RxJavaActivity: 下游接收完成 onComplete

发射数集
跟 just 一样,just 内部调用 fromArray
直接发送 onComplete() 事件
Observable.empty().subscribe(
// 下游 Observer 观察者
new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "上游和下游订阅成功 onSubscribe 1");
}
@Override
public void onNext(Object s) {
Log.d(TAG, "下游接收 onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "下游接收异常 onError:"+e.getMessage());
}
@Override
public void onComplete() {
// 只有接收完成之后,上游的最后log才会打印
Log.d(TAG, "下游接收完成 onComplete");
}
});
2022-08-22 16:19:57.417 20127-20127/com.yoshin.tspsdk D/RxJavaActivity: 上游和下游订阅成功 onSubscribe 1
2022-08-22 16:19:57.417 20127-20127/com.yoshin.tspsdk D/RxJavaActivity: 下游接收完成 onComplete
不发送任何事件
发送 onError() 事件
发射指定范围的整数序列
Observable.range(1, 7).subscribe(
// 下游 Observer 观察者
new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "上游和下游订阅成功 onSubscribe 1");
}
@Override
public void onNext(Integer s) {
Log.d(TAG, "下游接收 onNext: " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
// 只有接收完成之后,上游的最后log才会打印
Log.d(TAG, "下游接收完成 onComplete");
}
});
2022-08-22 16:23:41.594 20571-20571/com.yoshin.tspsdk D/RxJavaActivity: 上游和下游订阅成功 onSubscribe 1
2022-08-22 16:23:41.594 20571-20571/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: 1
2022-08-22 16:23:41.594 20571-20571/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: 2
2022-08-22 16:23:41.594 20571-20571/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: 3
2022-08-22 16:23:41.594 20571-20571/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: 4
2022-08-22 16:23:41.594 20571-20571/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: 5
2022-08-22 16:23:41.594 20571-20571/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: 6
2022-08-22 16:23:41.594 20571-20571/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: 7
2022-08-22 16:23:41.594 20571-20571/com.yoshin.tspsdk D/RxJavaActivity: 下游接收完成 onComplete
确保Observable代码在被订阅后才执行,而不是创建后立即执行
Observable.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
return new Observable<String>() {
@Override
protected void subscribeActual(Observer<? super String> observer) {
observer.onNext("发射上游事件");
observer.onNext("确保Observable代码在被订阅后才执行,而不是创建后立即执行");
observer.onComplete();
}
};
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
多少秒后调用
Observable.timer(1000, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.i(TAG, "accept: " + aLong);
}
});
2022-08-25 15:35:02.126 13208-13208/com.yoshin.tspsdk I/RxJavaActivity: onCreate: ================
2022-08-25 15:35:03.132 13208-13431/com.yoshin.tspsdk I/RxJavaActivity: accept: 0

按照给定的时间间隔发射整数序列的Observable
Observable.interval(1000, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.i(TAG, "accept: " + aLong);
}
});
2022-08-25 15:40:30.019 13668-13858/com.yoshin.tspsdk I/RxJavaActivity: accept: 0
2022-08-25 15:40:31.020 13668-13858/com.yoshin.tspsdk I/RxJavaActivity: accept: 1
2022-08-25 15:40:32.019 13668-13858/com.yoshin.tspsdk I/RxJavaActivity: accept: 2
2022-08-25 15:40:33.019 13668-13858/com.yoshin.tspsdk I/RxJavaActivity: accept: 3
2022-08-25 15:40:34.019 13668-13858/com.yoshin.tspsdk I/RxJavaActivity: accept: 4
2022-08-25 15:40:35.019 13668-13858/com.yoshin.tspsdk I/RxJavaActivity: accept: 5
Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.i(TAG, "accept: " + aLong);
}
});
延迟0毫秒,从1开始发射,发射5次,时间间隔 1000毫秒
2022-08-25 15:45:09.665 14119-14274/com.yoshin.tspsdk I/RxJavaActivity: accept: 1
2022-08-25 15:45:10.666 14119-14274/com.yoshin.tspsdk I/RxJavaActivity: accept: 2
2022-08-25 15:45:11.666 14119-14274/com.yoshin.tspsdk I/RxJavaActivity: accept: 3
2022-08-25 15:45:12.666 14119-14274/com.yoshin.tspsdk I/RxJavaActivity: accept: 4
2022-08-25 15:45:13.666 14119-14274/com.yoshin.tspsdk I/RxJavaActivity: accept: 5
Observable
.create(new ObservableOnSubscribe<List<ApiData>>() {
@Override
public void subscribe(ObservableEmitter<List<ApiData>> emitter) throws Exception {
Log.d(TAG, "上游 subscribe: 开始发射...");
List<ApiData> apiDataList = new ArrayList<>();
apiDataList.add(new ApiData("小明", 8, 4));
apiDataList.add(new ApiData("小黑", 7, 3));
emitter.onNext(apiDataList);
emitter.onComplete(); // 发射完成
// 上游的最后log才会打印
Log.d(TAG, "上游 subscribe: 发射完成");
}
})
.map(new Function<List<ApiData>, List<Student>>() {
@Override
public List<Student> apply(List<ApiData> dataList) throws Exception {
Log.d(TAG, "中游");
if (dataList != null && dataList.size() > 0) {
List<Student> studentList = new ArrayList<>();
for (ApiData apiData : dataList) {
studentList.add(new Student(apiData.getName(), apiData.getAge()));
}
return studentList;
} else {
return null;
}
}
})
.subscribe(
// 下游 Observer 观察者
new Observer<List<Student>>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "上游和下游订阅成功 onSubscribe 1");
}
@Override
public void onNext(List<Student> list) {
Log.d(TAG, "下游接收 onNext: " + list);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
// 只有接收完成之后,上游的最后log才会打印
Log.d(TAG, "下游接收完成 onComplete");
}
});
2022-08-22 16:39:13.178 22145-22145/com.yoshin.tspsdk D/RxJavaActivity: 上游和下游订阅成功 onSubscribe 1
2022-08-22 16:39:13.178 22145-22145/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
2022-08-22 16:39:13.178 22145-22145/com.yoshin.tspsdk D/RxJavaActivity: 中游
2022-08-22 16:39:13.178 22145-22145/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: [com.yoshin.tspsdk.rx.bean.Student@6ceb616, com.yoshin.tspsdk.rx.bean.Student@c826b97]
2022-08-22 16:39:13.178 22145-22145/com.yoshin.tspsdk D/RxJavaActivity: 下游接收完成 onComplete
2022-08-22 16:39:13.178 22145-22145/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成

Observable
.create(new ObservableOnSubscribe<List<ApiData>>() {
@Override
public void subscribe(ObservableEmitter<List<ApiData>> emitter) throws Exception {
Log.d(TAG, "上游 subscribe: 开始发射...");
List<ApiData> apiDataList = new ArrayList<>();
apiDataList.add(new ApiData("小明", 8, 4));
apiDataList.add(new ApiData("小黑", 7, 3));
emitter.onNext(apiDataList);
emitter.onComplete(); // 发射完成
// 上游的最后log才会打印
Log.d(TAG, "上游 subscribe: 发射完成");
}
})
.flatMap(new Function<List<ApiData>, ObservableSource<List<Student>>>() {
@Override
public ObservableSource<List<Student>> apply(List<ApiData> dataList) throws Exception {
Log.d(TAG, "中游");
List<Student> studentList = new ArrayList<>();
if (dataList != null && dataList.size() > 0) {
for (ApiData apiData : dataList) {
studentList.add(new Student(apiData.getName(), apiData.getAge()));
}
}
return new Observable<List<Student>>() {
@Override
protected void subscribeActual(Observer<? super List<Student>> observer) {
Log.d(TAG, "中游 2: 开始发射...");
observer.onNext(studentList);
observer.onComplete();
Log.d(TAG, "中游 2: 发射完成");
}
};
}
})
.subscribe(
// 下游 Observer 观察者
new Observer<List<Student>>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "上游和下游订阅成功 onSubscribe 1");
}
@Override
public void onNext(List<Student> list) {
Log.d(TAG, "下游接收 onNext: " + list);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
// 只有接收完成之后,上游的最后log才会打印
Log.d(TAG, "下游接收完成 onComplete");
}
});
2022-08-22 16:44:40.169 22741-22741/com.yoshin.tspsdk D/RxJavaActivity: 上游和下游订阅成功 onSubscribe 1
2022-08-22 16:44:40.169 22741-22741/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
2022-08-22 16:44:40.169 22741-22741/com.yoshin.tspsdk D/RxJavaActivity: 中游
2022-08-22 16:44:40.169 22741-22741/com.yoshin.tspsdk D/RxJavaActivity: 中游 2: 开始发射...
2022-08-22 16:44:40.169 22741-22741/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: [com.yoshin.tspsdk.rx.bean.Student@e1a0184, com.yoshin.tspsdk.rx.bean.Student@da10d6d]
2022-08-22 16:44:40.169 22741-22741/com.yoshin.tspsdk D/RxJavaActivity: 中游 2: 发射完成
2022-08-22 16:44:40.170 22741-22741/com.yoshin.tspsdk D/RxJavaActivity: 下游接收完成 onComplete
2022-08-22 16:44:40.170 22741-22741/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
Observable
.create(new ObservableOnSubscribe<ApiData>() {
@Override
public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
Log.d(TAG, "上游 subscribe: 开始发射...");
emitter.onNext(new ApiData("小明", 8, 4));
emitter.onNext(new ApiData("小黑", 8, 4));
emitter.onNext(new ApiData("小绿", 8, 4));
emitter.onNext(new ApiData("小紫", 8, 4));
emitter.onComplete(); // 发射完成
// 上游的最后log才会打印
Log.d(TAG, "上游 subscribe: 发射完成");
}
})
.flatMap(new Function<ApiData, ObservableSource<Student>>() {
@Override
public ObservableSource<Student> apply(ApiData apiData) throws Exception {
Log.d(TAG, "中游");
int delay = 0;
if (apiData.getName().equals("小黑")) {
delay = 500;//延迟500ms发射
}
return Observable.just(new Student(apiData.getName(), apiData.getAge())).delay(delay, TimeUnit.MILLISECONDS);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
// 下游 Observer 观察者
new Observer<Student>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "上游和下游订阅成功 onSubscribe 1");
}
@Override
public void onNext(Student s) {
Log.d(TAG, "下游接收 onNext: " + new Gson().toJson(s));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
// 只有接收完成之后,上游的最后log才会打印
Log.d(TAG, "下游接收完成 onComplete");
}
});
2022-08-23 09:55:23.423 21847-21847/com.yoshin.tspsdk D/RxJavaActivity: 上游和下游订阅成功 onSubscribe 1
2022-08-23 09:55:23.423 21847-22069/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
2022-08-23 09:55:23.423 21847-22069/com.yoshin.tspsdk D/RxJavaActivity: 中游
2022-08-23 09:55:23.424 21847-22069/com.yoshin.tspsdk D/RxJavaActivity: 中游
2022-08-23 09:55:23.424 21847-22069/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
2022-08-23 09:55:23.437 21847-21847/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: {"age":8,"name":"小明"}
2022-08-23 09:55:23.438 21847-21847/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: {"age":8,"name":"小绿"}
2022-08-23 09:55:23.438 21847-21847/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: {"age":8,"name":"小紫"}
2022-08-23 09:55:23.928 21847-21847/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: {"age":8,"name":"小黑"}
2022-08-23 09:55:23.928 21847-21847/com.yoshin.tspsdk D/RxJavaActivity: 下游接收完成 onComplete
Observable
.create(new ObservableOnSubscribe<ApiData>() {
@Override
public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
Log.d(TAG, "上游 subscribe: 开始发射...");
emitter.onNext(new ApiData("小明", 8, 4));
emitter.onNext(new ApiData("小黑", 8, 4));
emitter.onNext(new ApiData("小绿", 8, 4));
emitter.onNext(new ApiData("小紫", 8, 4));
emitter.onComplete(); // 发射完成
// 上游的最后log才会打印
Log.d(TAG, "上游 subscribe: 发射完成");
}
})
.concatMap(new Function<ApiData, ObservableSource<Student>>() {
@Override
public ObservableSource<Student> apply(ApiData apiData) throws Exception {
Log.d(TAG, "中游");
int delay = 0;
if (apiData.getName().equals("小黑")) {
delay = 500;//延迟500ms发射
}
return Observable.just(new Student(apiData.getName(), apiData.getAge())).delay(delay, TimeUnit.MILLISECONDS);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
// 下游 Observer 观察者
new Observer<Student>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "上游和下游订阅成功 onSubscribe 1");
}
@Override
public void onNext(Student s) {
Log.d(TAG, "下游接收 onNext: " + new Gson().toJson(s));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
// 只有接收完成之后,上游的最后log才会打印
Log.d(TAG, "下游接收完成 onComplete");
}
});
2022-08-23 10:01:09.137 22528-22528/com.yoshin.tspsdk D/RxJavaActivity: 上游和下游订阅成功 onSubscribe 1
2022-08-23 10:01:09.138 22528-22722/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
2022-08-23 10:01:09.138 22528-22722/com.yoshin.tspsdk D/RxJavaActivity: 中游
2022-08-23 10:01:09.139 22528-22722/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
2022-08-23 10:01:09.139 22528-22757/com.yoshin.tspsdk D/RxJavaActivity: 中游
2022-08-23 10:01:09.160 22528-22528/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: {"age":8,"name":"小明"}
2022-08-23 10:01:09.639 22528-22758/com.yoshin.tspsdk D/RxJavaActivity: 中游
2022-08-23 10:01:09.641 22528-22528/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: {"age":8,"name":"小黑"}
2022-08-23 10:01:09.641 22528-22768/com.yoshin.tspsdk D/RxJavaActivity: 中游
2022-08-23 10:01:09.641 22528-22528/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: {"age":8,"name":"小绿"}
2022-08-23 10:01:09.643 22528-22528/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: {"age":8,"name":"小紫"}
2022-08-23 10:01:09.643 22528-22528/com.yoshin.tspsdk D/RxJavaActivity: 下游接收完成 onComplete
RxJava中的groupBy,是将一个Observable分拆为一些Observables集合,它们中的每一个发射原始Observable的一个 子序列,哪个数据项由哪一个Observable发射是由一个函数判定 的,这个函数给每一项指定一个Key,Key相同的数据会被同一个Observable发射。
Observable
.create(new ObservableOnSubscribe<ApiData>() {
@Override
public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
Log.d(TAG, "上游 subscribe: 开始发射...");
emitter.onNext(new ApiData("小明", 8, 4));
emitter.onNext(new ApiData("小黑", 8, 4));
emitter.onNext(new ApiData("小绿", 8, 4));
emitter.onNext(new ApiData("小紫", 8, 4));
emitter.onComplete(); // 发射完成
// 上游的最后log才会打印
Log.d(TAG, "上游 subscribe: 发射完成");
}
})
.groupBy(new Function<ApiData, String>() {
@Override
public String apply(ApiData apiData) throws Exception {
Log.d(TAG, "中游");
if (apiData.getName().equals("小黑")) {
return "1组";
}
return "2组";
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<GroupedObservable<String, ApiData>>() {
@Override
public void accept(GroupedObservable<String, ApiData> stringApiDataGroupedObservable) throws Exception {
Log.d(TAG, "下游");
//获取key
final String key = stringApiDataGroupedObservable.getKey();
//获取value
Disposable disposable = stringApiDataGroupedObservable
.subscribe(new Consumer<ApiData>() {
@Override
public void accept(ApiData apiData) throws Exception {
Log.i(TAG, "accept: key =="+key + " ,apiData=="+apiData );
}
});
}
});
2022-08-23 13:18:16.586 24062-11143/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
2022-08-23 13:18:16.586 24062-11143/com.yoshin.tspsdk D/RxJavaActivity: 中游
2022-08-23 13:18:16.587 24062-11143/com.yoshin.tspsdk D/RxJavaActivity: 中游
2022-08-23 13:18:16.587 24062-11143/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
2022-08-23 13:18:16.601 24062-24062/com.yoshin.tspsdk D/RxJavaActivity: 下游
2022-08-23 13:18:16.601 24062-24062/com.yoshin.tspsdk I/RxJavaActivity: accept: key ==2组 ,apiData==ApiData(name=小明, age=8, className=4)
2022-08-23 13:18:16.601 24062-24062/com.yoshin.tspsdk I/RxJavaActivity: accept: key ==2组 ,apiData==ApiData(name=小绿, age=8, className=4)
2022-08-23 13:18:16.601 24062-24062/com.yoshin.tspsdk I/RxJavaActivity: accept: key ==2组 ,apiData==ApiData(name=小紫, age=8, className=4)
2022-08-23 13:18:16.601 24062-24062/com.yoshin.tspsdk D/RxJavaActivity: 下游
2022-08-23 13:18:16.601 24062-24062/com.yoshin.tspsdk I/RxJavaActivity: accept: key ==1组 ,apiData==ApiData(name=小黑, age=8, className=4)
先截断数据(缓存成集合),缓存到指定条件(数量、时间)后再下发

Observable
.create(new ObservableOnSubscribe<ApiData>() {
@Override
public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
Log.d(TAG, "上游 subscribe: 开始发射...");
emitter.onNext(new ApiData("小明", 8, 4));
emitter.onNext(new ApiData("小黑", 8, 4));
emitter.onNext(new ApiData("小绿", 8, 4));
emitter.onNext(new ApiData("小紫", 8, 4));
emitter.onComplete(); // 发射完成
// 上游的最后log才会打印
Log.d(TAG, "上游 subscribe: 发射完成");
}
})
.buffer(2 )
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<ApiData>>() {
@Override
public void accept(List<ApiData> dataList) throws Exception {
Log.d(TAG, "下游");
Log.d(TAG, "accept: "+ new Gson().toJson(dataList));
}
});
2022-08-23 13:29:52.382 13320-13851/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
2022-08-23 13:29:52.382 13320-13851/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
2022-08-23 13:29:52.397 13320-13320/com.yoshin.tspsdk D/RxJavaActivity: 下游
2022-08-23 13:29:52.413 13320-13320/com.yoshin.tspsdk D/RxJavaActivity: accept: [{"age":8,"className":4,"name":"小明"},{"age":8,"className":4,"name":"小黑"}]
2022-08-23 13:29:52.413 13320-13320/com.yoshin.tspsdk D/RxJavaActivity: 下游
2022-08-23 13:29:52.414 13320-13320/com.yoshin.tspsdk D/RxJavaActivity: accept: [{"age":8,"className":4,"name":"小绿"},{"age":8,"className":4,"name":"小紫"}]
可以指缓存数量或者时间

在时间间隔内缓存结果,类似于buffer缓存一个list集合,区别在于window将这个结果集合封装成了observable
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "subscribe: ");
emitter.onNext(3);
emitter.onNext(33);
emitter.onNext(4);
emitter.onNext(6);
emitter.onNext(6);
emitter.onNext(6);
}
})
.window(2)
.subscribe(new Consumer<Observable<Integer>>() {
@Override
public void accept(Observable<Integer> integerObservable) throws Exception {
Log.d(TAG, "accept: integerObservable");
integerObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
}
});
2022-08-25 18:27:27.189 23100-23100/com.yoshin.tspsdk E/RxJavaActivity: subscribe:
2022-08-25 18:27:27.190 23100-23100/com.yoshin.tspsdk D/RxJavaActivity: accept: integerObservable
2022-08-25 18:27:27.190 23100-23100/com.yoshin.tspsdk D/RxJavaActivity: accept: 3
2022-08-25 18:27:27.190 23100-23100/com.yoshin.tspsdk D/RxJavaActivity: accept: 33
2022-08-25 18:27:27.190 23100-23100/com.yoshin.tspsdk D/RxJavaActivity: accept: integerObservable
2022-08-25 18:27:27.190 23100-23100/com.yoshin.tspsdk D/RxJavaActivity: accept: 4
2022-08-25 18:27:27.190 23100-23100/com.yoshin.tspsdk D/RxJavaActivity: accept: 6
2022-08-25 18:27:27.190 23100-23100/com.yoshin.tspsdk D/RxJavaActivity: accept: integerObservable
2022-08-25 18:27:27.190 23100-23100/com.yoshin.tspsdk D/RxJavaActivity: accept: 6
2022-08-25 18:27:27.191 23100-23100/com.yoshin.tspsdk D/RxJavaActivity: accept: 6
在发射之前强制将Observable发射的所有数据转换为指定类型。
需强调的一点是只能由子类对象转换为父类对象,否则会报错。
class Student extends Person{}
Observable.just(new Student("北京"),new Student("北京"))
.cast(Person.class)
.subscribe(new Consumer<Person>() {
@Override
public void accept(Person person) throws Exception {
Log.d(TAG, "accept: "+ person);
}
});
对原始Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。
它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "subscribe: ");
emitter.onNext(3);
emitter.onNext(33);
emitter.onNext(4);
emitter.onNext(6);
}
}).scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.d(TAG, "apply integer: " + integer);
Log.d(TAG, "apply integer2: " + integer2);
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
2022-08-25 17:24:25.738 18743-18743/com.yoshin.tspsdk E/RxJavaActivity: subscribe:
2022-08-25 17:24:25.738 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: accept: 3
2022-08-25 17:24:25.738 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: apply integer: 3
2022-08-25 17:24:25.738 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: apply integer2: 33
2022-08-25 17:24:25.739 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: accept: 36
2022-08-25 17:24:25.739 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: apply integer: 36
2022-08-25 17:24:25.739 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: apply integer2: 4
2022-08-25 17:24:25.739 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: accept: 40
2022-08-25 17:24:25.739 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: apply integer: 40
2022-08-25 17:24:25.739 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: apply integer2: 6
2022-08-25 17:24:25.739 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: accept: 46
scan会将每次计算的结构都发送到下游;
而reduce只会将最终结果发送到下游,complete为止不会收到第二次数据
Observable.just(1, 2, 3, 4)
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
2022-08-26 14:44:26.703 28320-28320/com.yoshin.tspsdk D/RxJavaActivity: accept: 10
true 继续下发,否则不下发
Observable
.create(new ObservableOnSubscribe<ApiData>() {
@Override
public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
Log.d(TAG, "上游 subscribe: 开始发射...");
emitter.onNext(new ApiData("小明", 8, 4));
emitter.onNext(new ApiData("小黑", 8, 4));
emitter.onNext(new ApiData("小绿", 8, 4));
emitter.onNext(new ApiData("小紫", 8, 4));
emitter.onComplete(); // 发射完成
// 上游的最后log才会打印
Log.d(TAG, "上游 subscribe: 发射完成");
}
})
.filter(new Predicate<ApiData>() {
@Override
public boolean test(ApiData apiData) throws Exception {
Log.d(TAG, "中游");
if (apiData.getName().equals("小黑")) return true;
else return false;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<ApiData>() {
@Override
public void accept(ApiData apiData) throws Exception {
Log.d(TAG, "下游");
Log.d(TAG, "accept: "+apiData);
}
});
2022-08-23 13:34:57.775 14674-14845/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
2022-08-23 13:34:57.775 14674-14845/com.yoshin.tspsdk D/RxJavaActivity: 中游
2022-08-23 13:34:57.775 14674-14845/com.yoshin.tspsdk D/RxJavaActivity: 中游
2022-08-23 13:34:57.775 14674-14845/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
2022-08-23 13:34:57.792 14674-14674/com.yoshin.tspsdk D/RxJavaActivity: 下游
2022-08-23 13:34:57.792 14674-14674/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小黑, age=8, className=4)
留下前多少个

Observable
.create(new ObservableOnSubscribe<ApiData>() {
@Override
public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
Log.d(TAG, "上游 subscribe: 开始发射...");
emitter.onNext(new ApiData("小明", 8, 4));
emitter.onNext(new ApiData("小黑", 8, 4));
emitter.onNext(new ApiData("小绿", 8, 4));
emitter.onNext(new ApiData("小紫", 8, 4));
emitter.onComplete(); // 发射完成
// 上游的最后log才会打印
Log.d(TAG, "上游 subscribe: 发射完成");
}
})
.take(2)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<ApiData>() {
@Override
public void accept(ApiData apiData) throws Exception {
Log.d(TAG, "下游");
Log.d(TAG, "accept: "+apiData);
}
});
2022-08-23 13:40:07.981 15687-15899/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
2022-08-23 13:40:07.982 15687-15899/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
2022-08-23 13:40:07.997 15687-15687/com.yoshin.tspsdk D/RxJavaActivity: 下游
2022-08-23 13:40:07.997 15687-15687/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小明, age=8, className=4)
2022-08-23 13:40:07.997 15687-15687/com.yoshin.tspsdk D/RxJavaActivity: 下游
2022-08-23 13:40:07.997 15687-15687/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小黑, age=8, className=4)
从后面数,留下多少个

Observable
.create(new ObservableOnSubscribe<ApiData>() {
@Override
public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
Log.d(TAG, "上游 subscribe: 开始发射...");
emitter.onNext(new ApiData("小明", 8, 4));
emitter.onNext(new ApiData("小黑", 8, 4));
emitter.onNext(new ApiData("小绿", 8, 4));
emitter.onNext(new ApiData("小紫", 8, 4));
emitter.onComplete(); // 发射完成
// 上游的最后log才会打印
Log.d(TAG, "上游 subscribe: 发射完成");
}
})
.takeLast(3)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<ApiData>() {
@Override
public void accept(ApiData apiData) throws Exception {
Log.d(TAG, "下游");
Log.d(TAG, "accept: "+apiData);
}
});
2022-08-23 13:41:16.199 16186-16309/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
2022-08-23 13:41:16.200 16186-16309/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
2022-08-23 13:41:16.236 16186-16186/com.yoshin.tspsdk D/RxJavaActivity: 下游
2022-08-23 13:41:16.236 16186-16186/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小黑, age=8, className=4)
2022-08-23 13:41:16.236 16186-16186/com.yoshin.tspsdk D/RxJavaActivity: 下游
2022-08-23 13:41:16.236 16186-16186/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小绿, age=8, className=4)
2022-08-23 13:41:16.236 16186-16186/com.yoshin.tspsdk D/RxJavaActivity: 下游
2022-08-23 13:41:16.236 16186-16186/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小紫, age=8, className=4)
一直take ,直到某个条件进行skip
一直take ,直到第二个observable发射才开始skip

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.i(TAG, "subscribe: 上游发射");
emitter.onNext(3);
emitter.onNext(33);
emitter.onNext(4);
emitter.onNext(6);
emitter.onNext(6);
emitter.onNext(16);
}
})
.skip(2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: "+integer);
}
});
2022-08-26 10:55:46.997 19746-19746/com.yoshin.tspsdk I/RxJavaActivity: subscribe: 上游发射
2022-08-26 10:55:46.997 19746-19746/com.yoshin.tspsdk D/RxJavaActivity: accept: 4
2022-08-26 10:55:46.997 19746-19746/com.yoshin.tspsdk D/RxJavaActivity: accept: 6
2022-08-26 10:55:46.997 19746-19746/com.yoshin.tspsdk D/RxJavaActivity: accept: 6
2022-08-26 10:55:46.997 19746-19746/com.yoshin.tspsdk D/RxJavaActivity: accept: 16
一直skip ,直到某个条件进行take
一直skip ,直到第二个observable发射才开始take
distinct : 过滤掉重复的元素
distinctUntilChanged: 过滤掉连续重复的元素,不连续重复的是不过滤

Observable
.create(new ObservableOnSubscribe<ApiData>() {
@Override
public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
Log.d(TAG, "上游 subscribe: 开始发射...");
emitter.onNext(new ApiData("小明", 8, 4));
emitter.onNext(new ApiData("小黑", 8, 4));
emitter.onNext(new ApiData("小绿", 8, 4));
ApiData apiData = new ApiData("小紫", 8, 4);
emitter.onNext(apiData);
emitter.onNext(apiData);
emitter.onComplete(); // 发射完成
// 上游的最后log才会打印
Log.d(TAG, "上游 subscribe: 发射完成");
}
})
.distinct()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<ApiData>() {
@Override
public void accept(ApiData apiData) throws Exception {
Log.d(TAG, "下游");
Log.d(TAG, "accept: "+apiData);
}
});
2022-08-23 13:51:00.570 18495-18807/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
2022-08-23 13:51:00.570 18495-18807/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
2022-08-23 13:51:00.585 18495-18495/com.yoshin.tspsdk D/RxJavaActivity: 下游
2022-08-23 13:51:00.585 18495-18495/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小明, age=8, className=4)
2022-08-23 13:51:00.585 18495-18495/com.yoshin.tspsdk D/RxJavaActivity: 下游
2022-08-23 13:51:00.585 18495-18495/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小黑, age=8, className=4)
2022-08-23 13:51:00.585 18495-18495/com.yoshin.tspsdk D/RxJavaActivity: 下游
2022-08-23 13:51:00.585 18495-18495/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小绿, age=8, className=4)
2022-08-23 13:51:00.585 18495-18495/com.yoshin.tspsdk D/RxJavaActivity: 下游
2022-08-23 13:51:00.585 18495-18495/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小紫, age=8, className=4)
ElementAt操作符获取原始Observable发射的数据序列指定索引位置的数据项,然后当做自己的唯一数据发射。给它传递一个基于0的索引值,它会发射原始Observable数据序列对应索引位置的值,如果你传递给elementAt的值为4,那么它会发射第5项的数据。

Observable
.create(new ObservableOnSubscribe<ApiData>() {
@Override
public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
Log.d(TAG, "上游 subscribe: 开始发射...");
emitter.onNext(new ApiData("小明", 8, 4));
emitter.onNext(new ApiData("小黑", 8, 4));
emitter.onNext(new ApiData("小绿", 8, 4));
ApiData apiData = new ApiData("小紫", 8, 4);
emitter.onNext(apiData);
emitter.onNext(apiData);
emitter.onComplete(); // 发射完成
// 上游的最后log才会打印
Log.d(TAG, "上游 subscribe: 发射完成");
}
})
.elementAt(1)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<ApiData>() {
@Override
public void accept(ApiData apiData) throws Exception {
Log.d(TAG, "下游");
Log.d(TAG, "accept: "+apiData);
}
});
2022-08-23 14:22:13.564 22396-22734/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
2022-08-23 14:22:13.564 22396-22734/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
2022-08-23 14:22:13.582 22396-22396/com.yoshin.tspsdk D/RxJavaActivity: 下游
2022-08-23 14:22:13.582 22396-22396/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小黑, age=8, className=4)
忽略源Publisher发出的所有项目,只调用onComplete或onError。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.i(TAG, "subscribe: 上游发射");
emitter.onNext(3);
...
}
})
.ignoreElements()
.subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onComplete() {
}
@Override
public void onError(Throwable e) {
}
});
指定时间内没有新事件产生才下发。
在输出了一个数据后的一段时间内,没有再次输出新的数据,则把这个数据真正的发送出去;假如在这段时间内有新的数据输出,则以这个数据作为将要发送的数据项,并且重置这个时间段,重新计时。
RxJava 学习进行中-Scan&Debounce&ThrottleWithTimeout
:https://www.jianshu.com/p/8426d4778546

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.i(TAG, "subscribe: 上游发射");
emitter.onNext(3);
emitter.onNext(33);
emitter.onNext(4);
emitter.onNext(6);
emitter.onNext(6);
emitter.onNext(16);
Log.i(TAG, "subscribe: 上游发射完成");
}
})
.debounce(1000, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: "+ integer);
}
});
2022-08-26 13:48:46.425 24094-24094/com.yoshin.tspsdk I/RxJavaActivity: subscribe: 上游发射
2022-08-26 13:48:46.427 24094-24094/com.yoshin.tspsdk I/RxJavaActivity: subscribe: 上游发射完成
2022-08-26 13:48:47.429 24094-24172/com.yoshin.tspsdk D/RxJavaActivity: accept: 16
.ofType(class) 指定某个类型的class,过滤属于这个类型的的结果,其它抛弃
Observable.just("小黑", 12, "小绿", 23).ofType(Integer.class).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: "+integer);
}
});
2022-08-26 13:52:38.477 24674-24674/com.yoshin.tspsdk D/RxJavaActivity: accept: 12
2022-08-26 13:52:38.477 24674-24674/com.yoshin.tspsdk D/RxJavaActivity: accept: 23

允许设置一个时间间隔,在这个时间间隔内发送第一个事件,而屏蔽其他事件。
一般用于点击事件,防抖动。
Observable.just(1, 2, 3, 4)
.throttleFirst(1000, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
2022-08-26 15:20:22.960 30337-30337/com.yoshin.tspsdk D/RxJavaActivity: accept: 1
throttleFirst :在某段时间内,只发送该段时间内第1次事件(假如一个按钮1秒内点了3次 ,第一次显示,后2次不显示)
throttleLast: 在某段时间内,只发送该段时间内最后1次事件(假如一个按钮1秒内点了3次 ,最后第一次显示,前两次不显示)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws InterruptedException {
emitter.onNext(1);//第一次不发送
Thread.sleep(400);
emitter.onNext(2);
Thread.sleep(400);
emitter.onNext(3);
Thread.sleep(900); // 1000 之内是 3
emitter.onNext(4);
Thread.sleep(400); // 1000 之内是 4
emitter.onNext(5); // 2100 时发射的5
Thread.sleep(700);
emitter.onNext(6); // 2800 时发射的6
Thread.sleep(900);//3000之内 ,所以没有5只有6
emitter.onNext(7); // 3700时出现 7
}
}).throttleLast(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
Log.d(TAG, "accept: " + integer);
}
});
throttleWithTimeout 是 debounce 实现
全部数据满足条件,则为true
Observable
.create(new ObservableOnSubscribe<ApiData>() {
@Override
public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
Log.d(TAG, "上游 subscribe: 开始发射...");
emitter.onNext(new ApiData("小明", 8, 4));
emitter.onNext(new ApiData("小黑", 8, 4));
emitter.onNext(new ApiData("小绿", 8, 4));
ApiData apiData = new ApiData("小紫", 8, 4);
emitter.onNext(apiData);
emitter.onNext(apiData);
emitter.onComplete(); // 发射完成
// 上游的最后log才会打印
Log.d(TAG, "上游 subscribe: 发射完成");
}
})
.all(new Predicate<ApiData>() {
@Override
public boolean test(ApiData apiData) throws Exception {
if (apiData.getName() .equals("小紫"))return true;
return false;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "下游");
Log.d(TAG, "accept: "+aBoolean);
}
});
2022-08-23 14:26:09.387 23450-23561/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
2022-08-23 14:26:09.387 23450-23561/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
2022-08-23 14:26:09.403 23450-23450/com.yoshin.tspsdk D/RxJavaActivity: 下游
2022-08-23 14:26:09.403 23450-23450/com.yoshin.tspsdk D/RxJavaActivity: accept: false
如果发射的数据包含指定值,那就返回true
contains 使用 any 操作符实现
ApiData apiData = new ApiData("小紫", 8, 4);
Observable
.create(new ObservableOnSubscribe<ApiData>() {
@Override
public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
Log.d(TAG, "上游 subscribe: 开始发射...");
emitter.onNext(new ApiData("小明", 8, 4));
emitter.onNext(new ApiData("小黑", 8, 4));
emitter.onNext(new ApiData("小绿", 8, 4));
emitter.onNext(apiData);
emitter.onNext(apiData);
emitter.onComplete(); // 发射完成
// 上游的最后log才会打印
Log.d(TAG, "上游 subscribe: 发射完成");
}
})
.contains(apiData)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "下游");
Log.d(TAG, "accept: "+aBoolean);
}
});
2022-08-23 14:32:17.451 24756-24890/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
2022-08-23 14:32:17.452 24756-24890/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
2022-08-23 14:32:17.488 24756-24756/com.yoshin.tspsdk D/RxJavaActivity: 下游
2022-08-23 14:32:17.488 24756-24756/com.yoshin.tspsdk D/RxJavaActivity: accept: true
任意一个满足条件,为true
Observable
.create(new ObservableOnSubscribe<ApiData>() {
@Override
public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
Log.d(TAG, "上游 subscribe: 开始发射...");
emitter.onNext(new ApiData("小明", 8, 4));
emitter.onNext(new ApiData("小黑", 8, 4));
emitter.onNext(new ApiData("小绿", 8, 4));
emitter.onNext(apiData);
emitter.onNext(apiData);
emitter.onComplete(); // 发射完成
// 上游的最后log才会打印
Log.d(TAG, "上游 subscribe: 发射完成");
}
})
.any(new Predicate<ApiData>() {
@Override
public boolean test(ApiData data) throws Exception {
if (data.equals(apiData))return true;
return false;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "下游");
Log.d(TAG, "accept: "+aBoolean);
}
});
2022-08-23 14:35:48.733 25286-25437/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
2022-08-23 14:35:48.733 25286-25437/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
2022-08-23 14:35:48.769 25286-25286/com.yoshin.tspsdk D/RxJavaActivity: 下游
2022-08-23 14:35:48.769 25286-25286/com.yoshin.tspsdk D/RxJavaActivity: accept: true
isEmpty:判断是否发射的数据为空,如果为空,返回true;如果不为空,返回false
Observable.just("")
.isEmpty()
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "accept: "+aBoolean);
}
});
Observable.empty()
.isEmpty()
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "accept: "+aBoolean);
}
});
2022-08-26 14:00:54.062 26190-26190/com.yoshin.tspsdk D/RxJavaActivity: accept: false
2022-08-26 14:00:54.062 26190-26190/com.yoshin.tspsdk D/RxJavaActivity: accept: true
如果被观察者发射的数据为空,那么就发射defaultIfEmpty中给的那个值
Observable.just("")
.defaultIfEmpty("我是后补数据")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: "+s);
}
});
Observable.empty()
.defaultIfEmpty("我是后补数据")
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.d(TAG, "accept: "+o);
}
});
2022-08-26 14:04:36.964 26946-26946/com.yoshin.tspsdk D/RxJavaActivity: accept:
2022-08-26 14:04:36.965 26946-26946/com.yoshin.tspsdk D/RxJavaActivity: accept: 我是后补数据
等同toSortedList()
Observable.fromArray(mTopFuncList.toArray())
.sorted((o1, o2) -> ((ArgParamBean) o1).getArgI1() - ((ArgParamBean) o2).getArgI1())
.subscribeOn(Schedulers.io())
.cast(ArgParamBean.class)
.toList()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(objects -> {
})
从小往大排序:
.sorted((o1, o2) -> ((ArgParamBean) o1).getArgI1() - ((ArgParamBean) o2).getArgI1())
在发射的数据头部添加数据

Observable
.create(new ObservableOnSubscribe<ApiData>() {
@Override
public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
Log.d(TAG, "上游 subscribe: 开始发射...");
emitter.onNext(new ApiData("小明", 8, 4));
emitter.onNext(new ApiData("小黑", 8, 4));
emitter.onNext(new ApiData("小绿", 8, 4));
ApiData apiData = new ApiData("小紫", 8, 4);
emitter.onNext(apiData);
emitter.onNext(apiData);
emitter.onComplete(); // 发射完成
// 上游的最后log才会打印
Log.d(TAG, "上游 subscribe: 发射完成");
}
})
.startWith(new Observable<ApiData>() {
@Override
protected void subscribeActual(Observer<? super ApiData> observer) {
Log.d(TAG, "中游 : 开始发射...");
observer.onNext(new ApiData("小白", 8, 4));
observer.onComplete();
Log.d(TAG, "中游 : 发射完成");
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<ApiData>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(ApiData apiData) {
Log.d(TAG, "下游 onNext ="+apiData);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "下游 onComplete");
}
});
2022-08-23 14:45:08.245 26288-26476/com.yoshin.tspsdk D/RxJavaActivity: 中游 : 开始发射...
2022-08-23 14:45:08.245 26288-26476/com.yoshin.tspsdk D/RxJavaActivity: 中游 : 发射完成
2022-08-23 14:45:08.245 26288-26476/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
2022-08-23 14:45:08.245 26288-26476/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
2022-08-23 14:45:08.261 26288-26288/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小白, age=8, className=4)
2022-08-23 14:45:08.261 26288-26288/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小明, age=8, className=4)
2022-08-23 14:45:08.261 26288-26288/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小黑, age=8, className=4)
2022-08-23 14:45:08.261 26288-26288/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小绿, age=8, className=4)
2022-08-23 14:45:08.261 26288-26288/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小紫, age=8, className=4)
2022-08-23 14:45:08.261 26288-26288/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小紫, age=8, className=4)
2022-08-23 14:45:08.261 26288-26288/com.yoshin.tspsdk D/RxJavaActivity: 下游 onComplete
在上面数据的尾部添加数据
Observable
.create(new ObservableOnSubscribe<ApiData>() {
@Override
public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
Log.d(TAG, "上游 subscribe: 开始发射...");
emitter.onNext(new ApiData("小明", 8, 4));
emitter.onNext(new ApiData("小黑", 8, 4));
emitter.onNext(new ApiData("小绿", 8, 4));
ApiData apiData = new ApiData("小紫", 8, 4);
emitter.onNext(apiData);
emitter.onNext(apiData);
emitter.onComplete(); // 发射完成
// 上游的最后log才会打印
Log.d(TAG, "上游 subscribe: 发射完成");
}
})
.concatWith(new Observable<ApiData>() {
@Override
protected void subscribeActual(Observer<? super ApiData> observer) {
Log.d(TAG, "中游 : 开始发射...");
observer.onNext(new ApiData("小白", 8, 4));
observer.onComplete();
Log.d(TAG, "中游 : 发射完成");
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<ApiData>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(ApiData apiData) {
Log.d(TAG, "下游 onNext ="+apiData);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "下游 onComplete");
}
});
2022-08-23 14:48:10.897 26795-27116/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
2022-08-23 14:48:10.898 26795-27116/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
2022-08-23 14:48:10.898 26795-27116/com.yoshin.tspsdk D/RxJavaActivity: 中游 : 开始发射...
2022-08-23 14:48:10.898 26795-27116/com.yoshin.tspsdk D/RxJavaActivity: 中游 : 发射完成
2022-08-23 14:48:10.914 26795-26795/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小明, age=8, className=4)
2022-08-23 14:48:10.914 26795-26795/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小黑, age=8, className=4)
2022-08-23 14:48:10.914 26795-26795/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小绿, age=8, className=4)
2022-08-23 14:48:10.914 26795-26795/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小紫, age=8, className=4)
2022-08-23 14:48:10.914 26795-26795/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小紫, age=8, className=4)
2022-08-23 14:48:10.914 26795-26795/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小白, age=8, className=4)
2022-08-23 14:48:10.914 26795-26795/com.yoshin.tspsdk D/RxJavaActivity: 下游 onComplete

concat(串行连接数据)
concatDelayError
concatEager(并行连接数据)
作用:组合多个被观察者一起发送数据,合并后 按发送顺序串行(后一个必须等前一个执行完之后)执行
二者区别:组合被观察者的数量,即concat()组合被观察者数量≤4个,而concatArray()则可>4个。
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.just(1).delay(5, TimeUnit.SECONDS));
list.add(Observable.just(2));
list.add(Observable.just(3));
list.add(Observable.just(4));
Observable.concat(list)
.subscribeOn(Schedulers.io())
.subscribe(i -> {
Log.d(TAG, "下游 i==" + new Gson().toJson(i));
});
2022-08-23 16:26:51.384 10589-10772/com.yoshin.tspsdk D/RxJavaActivity: 下游 list==1
2022-08-23 16:26:51.385 10589-10772/com.yoshin.tspsdk D/RxJavaActivity: 下游 list==2
2022-08-23 16:26:51.385 10589-10772/com.yoshin.tspsdk D/RxJavaActivity: 下游 list==3
2022-08-23 16:26:51.385 10589-10772/com.yoshin.tspsdk D/RxJavaActivity: 下游 list==4
当数据源发生错误时,concatDelayError允许继续连接其他数据源,并且在最后,进入onError
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: NullPointerException");
throw new NullPointerException();
}
}));
list.add(Observable.just(2).delay(1000, TimeUnit.MILLISECONDS));
list.add(Observable.just(3));
list.add(Observable.just(4));
Observable.concatDelayError(list).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
2022-08-23 16:34:24.012 12137-12137/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe:
2022-08-23 16:34:24.012 12137-12137/com.yoshin.tspsdk E/RxJavaActivity: accept: NullPointerException
2022-08-23 16:34:25.014 12137-12408/com.yoshin.tspsdk D/RxJavaActivity: onNext: 2
2022-08-23 16:34:25.014 12137-12408/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
2022-08-23 16:34:25.014 12137-12408/com.yoshin.tspsdk D/RxJavaActivity: onNext: 4
2022-08-23 16:34:25.014 12137-12408/com.yoshin.tspsdk D/RxJavaActivity: onError: throw with null exception
等待延迟结束在调用
Log.i(TAG, " =============== concatEager ============= ");
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.just(1).delay(5, TimeUnit.SECONDS));
list.add(Observable.just(2));
list.add(Observable.just(3));
list.add(Observable.just(4));
Observable.concatEager(list)
.subscribeOn(Schedulers.io())
.subscribe(i -> {
Log.d(TAG, "下游 i==" + new Gson().toJson(i));
});
2022-08-23 16:42:09.052 13375-13375/com.yoshin.tspsdk I/RxJavaActivity: =============== concatEager =============
2022-08-23 16:42:14.100 13375-13517/com.yoshin.tspsdk D/RxJavaActivity: 下游 i==1
2022-08-23 16:42:14.101 13375-13517/com.yoshin.tspsdk D/RxJavaActivity: 下游 i==2
2022-08-23 16:42:14.101 13375-13517/com.yoshin.tspsdk D/RxJavaActivity: 下游 i==3
2022-08-23 16:42:14.101 13375-13517/com.yoshin.tspsdk D/RxJavaActivity: 下游 i==4
Observable.concatArrayEagerDelayError(
Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: NullPointerException");
throw new NullPointerException();
}
}),
Observable.just(2),
Observable.just(3),
Observable.just(4))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
2022-08-23 16:46:48.429 14213-14213/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe:
2022-08-23 16:46:48.430 14213-14213/com.yoshin.tspsdk E/RxJavaActivity: accept: NullPointerException
2022-08-23 16:46:48.430 14213-14213/com.yoshin.tspsdk D/RxJavaActivity: onNext: 2
2022-08-23 16:46:48.430 14213-14213/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
2022-08-23 16:46:48.430 14213-14213/com.yoshin.tspsdk D/RxJavaActivity: onNext: 4
2022-08-23 16:46:48.431 14213-14213/com.yoshin.tspsdk D/RxJavaActivity: onError: throw with null exception

作用:组合多个被观察者一起发送数据,合并后 按时间线并行执行
二者区别:组合被观察者的数量,即merge()组合被观察者数量≤4个,而mergeArray()则可>4个。
concatEager 和 merge 的区别是,concatEager 会等待Observable结果,结果同时发射;merge 是Observable同时调用
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.just(1).delay(5, TimeUnit.SECONDS));
list.add(Observable.just(2));
list.add(Observable.just(3));
list.add(Observable.just(4));
Observable.concatEager(list)
.subscribeOn(Schedulers.io())
.subscribe(i -> {
Log.d(TAG, "下游 i==" + new Gson().toJson(i));
});
2022-08-23 17:06:32.161 15467-17514/com.yoshin.tspsdk D/RxJavaActivity: 下游 i==2
2022-08-23 17:06:32.162 15467-17514/com.yoshin.tspsdk D/RxJavaActivity: 下游 i==3
2022-08-23 17:06:32.162 15467-17514/com.yoshin.tspsdk D/RxJavaActivity: 下游 i==4
2022-08-23 17:06:37.140 15467-17515/com.yoshin.tspsdk D/RxJavaActivity: 下游 i==1
Observable.mergeArrayDelayError(
Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: NullPointerException");
throw new NullPointerException();
}
}).delay(1000, TimeUnit.MILLISECONDS),
Observable.just(2).delay(1000, TimeUnit.MILLISECONDS),
Observable.just(3),
Observable.just(4))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
2022-08-23 17:39:13.906 22123-22123/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe:
2022-08-23 17:39:13.907 22123-22123/com.yoshin.tspsdk E/RxJavaActivity: accept: NullPointerException
2022-08-23 17:39:13.908 22123-22123/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
2022-08-23 17:39:13.908 22123-22123/com.yoshin.tspsdk D/RxJavaActivity: onNext: 4
2022-08-23 17:39:14.909 22123-22306/com.yoshin.tspsdk D/RxJavaActivity: onNext: 2
2022-08-23 17:39:14.909 22123-22306/com.yoshin.tspsdk D/RxJavaActivity: onError: throw with null exception

zip是在其中一个Observable发射数据项后,组合所有Observable最早一个未被组合的数据项,也就是说,组合后的Observable发射的第n个数据项,必然是每个源由Observable各自发射的第n个数据项构成的
Observable.zip(
Observable.just("小黑", "小红", "小x").delay(1000, TimeUnit.MILLISECONDS),
Observable.just(2,12).delay(1000, TimeUnit.MILLISECONDS),
new BiFunction<String, Integer, String>() {
@Override
public String apply(String s, Integer integer) throws Exception {
Log.d(TAG, "apply: ");
return s + integer;
}
}
)
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
2022-08-23 18:12:24.014 28797-28797/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe:
2022-08-23 18:12:25.016 28797-28967/com.yoshin.tspsdk D/RxJavaActivity: apply:
2022-08-23 18:12:25.016 28797-28967/com.yoshin.tspsdk D/RxJavaActivity: onNext: 小黑2
2022-08-23 18:12:25.016 28797-28967/com.yoshin.tspsdk D/RxJavaActivity: apply:
2022-08-23 18:12:25.016 28797-28967/com.yoshin.tspsdk D/RxJavaActivity: onNext: 小红12
2022-08-23 18:12:25.016 28797-28967/com.yoshin.tspsdk D/RxJavaActivity: onComplete:

combineLatest则是在其中一个Observable发射数据项后,组合所有Observable所发射的最后一个数据项(前提是所有的Observable都至少发射过一个数据项)
合并处理数据
Observable.combineLatest(
Observable.just("小黑", "小红", "小x").delay(1000, TimeUnit.MILLISECONDS),
Observable.just(2,12),
new BiFunction<String, Integer, String>() {
@Override
public String apply(String s, Integer integer) throws Exception {
Log.d(TAG, "apply: ");
return s + integer;
}
}
)
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
2022-08-24 10:55:36.735 14510-14510/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe:
2022-08-24 10:55:37.738 14510-14667/com.yoshin.tspsdk D/RxJavaActivity: apply:
2022-08-24 10:55:37.739 14510-14667/com.yoshin.tspsdk D/RxJavaActivity: onNext: 小黑12
2022-08-24 10:55:37.744 14510-14667/com.yoshin.tspsdk D/RxJavaActivity: apply:
2022-08-24 10:55:37.745 14510-14667/com.yoshin.tspsdk D/RxJavaActivity: onNext: 小红12
2022-08-24 10:55:37.749 14510-14667/com.yoshin.tspsdk D/RxJavaActivity: apply:
2022-08-24 10:55:37.750 14510-14667/com.yoshin.tspsdk D/RxJavaActivity: onNext: 小x12
2022-08-24 10:55:37.751 14510-14667/com.yoshin.tspsdk D/RxJavaActivity: onComplete:
合并下发数据并且 延迟处理错误
Observable<Integer> observable2;
Observable<String> observable3;
Observable.combineLatestDelayError(
new ObservableSource[]{
Observable.just(2, 3),
observable2= Observable.just(4, 3),
observable3 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.e(TAG, "subscribe: ");
emitter.onNext("特别黑");
emitter.onError(new Throwable("这是个Throwable"));
}
})
},
new Function<Object[], String>() {
@Override
public String apply(Object[] objects) throws Exception {
Log.d(TAG, "apply: " + new Gson().toJson(objects));
return String.valueOf(objects[0]) +objects[1] + objects[2];
}
}
)
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
2022-08-24 15:14:04.732 23925-23925/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe:
2022-08-24 15:14:04.733 23925-23925/com.yoshin.tspsdk E/RxJavaActivity: subscribe:
2022-08-24 15:14:04.745 23925-23925/com.yoshin.tspsdk D/RxJavaActivity: apply: [3,3,"特别黑"]
2022-08-24 15:14:04.745 23925-23925/com.yoshin.tspsdk D/RxJavaActivity: onNext: 33特别黑
2022-08-24 15:14:04.745 23925-23925/com.yoshin.tspsdk D/RxJavaActivity: onError: 这是个Throwable
步骤二:如果 observable2 与 observable3 互换位置:
2022-08-24 16:20:16.394 29025-29025/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe:
2022-08-24 16:20:16.394 29025-29025/com.yoshin.tspsdk E/RxJavaActivity: subscribe:
2022-08-24 16:20:16.396 29025-29025/com.yoshin.tspsdk D/RxJavaActivity: apply: [3,"特别黑",4]
2022-08-24 16:20:16.396 29025-29025/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3特别黑4
2022-08-24 16:20:16.397 29025-29025/com.yoshin.tspsdk D/RxJavaActivity: apply: [3,"特别黑",3]
2022-08-24 16:20:16.397 29025-29025/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3特别黑3
2022-08-24 16:20:16.397 29025-29025/com.yoshin.tspsdk D/RxJavaActivity: onError: 这是个Throwable
步骤二:修改 observable3
observable3 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.e(TAG, "subscribe: ");
emitter.onNext("特别黑");
emitter.onError(new Throwable("这是个Throwable"));
emitter.onNext("特别白");
}
})
2022-08-24 16:23:29.470 29787-29787/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe:
2022-08-24 16:23:29.471 29787-29787/com.yoshin.tspsdk E/RxJavaActivity: subscribe:
2022-08-24 16:23:29.482 29787-29787/com.yoshin.tspsdk D/RxJavaActivity: apply: [3,"特别黑",4]
2022-08-24 16:23:29.483 29787-29787/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3特别黑4
2022-08-24 16:23:29.484 29787-29787/com.yoshin.tspsdk D/RxJavaActivity: apply: [3,"特别黑",3]
2022-08-24 16:23:29.484 29787-29787/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3特别黑3
2022-08-24 16:23:29.484 29787-29787/com.yoshin.tspsdk D/RxJavaActivity: onError: 这是个Throwable
给定多个Observable,只让第一个发射数据的Observable发射全部数据
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}));
list.add(Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(10);
emitter.onNext(20);
emitter.onNext(30);
}
}).delay(300, TimeUnit.MILLISECONDS));
Observable.amb(list)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
2022-08-25 14:40:28.394 10057-10057/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe:
2022-08-25 14:40:28.395 10057-10057/com.yoshin.tspsdk D/RxJavaActivity: onNext: 1
2022-08-25 14:40:28.395 10057-10057/com.yoshin.tspsdk D/RxJavaActivity: onNext: 2
2022-08-25 14:40:28.395 10057-10057/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
Observable<Integer> observable1 = Observable.timer(4, TimeUnit.SECONDS)
.concatMap(aLong -> Observable.just(1, 2, 3, 4, 5));
Observable<Integer> observable2 = Observable.timer(3, TimeUnit.SECONDS)
.flatMap(aLong -> Observable.just(6, 7, 8, 9, 10));
Observable<Integer> observable3 = Observable.timer(2, TimeUnit.SECONDS)
.flatMap(aLong -> Observable.just(11, 12, 13, 14, 15));
Observable.ambArray(observable1, observable2, observable3)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
2022-08-25 14:46:13.749 10655-10899/com.yoshin.tspsdk D/RxJavaActivity: onNext: 11
2022-08-25 14:46:13.749 10655-10899/com.yoshin.tspsdk D/RxJavaActivity: onNext: 12
2022-08-25 14:46:13.749 10655-10899/com.yoshin.tspsdk D/RxJavaActivity: onNext: 13
2022-08-25 14:46:13.749 10655-10899/com.yoshin.tspsdk D/RxJavaActivity: onNext: 14
2022-08-25 14:46:13.749 10655-10899/com.yoshin.tspsdk D/RxJavaActivity: onNext: 15
2022-08-25 14:46:13.750 10655-10899/com.yoshin.tspsdk D/RxJavaActivity: onComplete:
收集发射的数据到一个数据结构里,然后将这个结构作为一个整体发射出去。
Observable.just(1, 2, 3, 4)
.collect(
new Callable<List<Integer>>() {
@Override
public List<Integer> call() throws Exception {
return new ArrayList<>(); // 这个地方的数据结构可变,跟业务有关
}
}, new BiConsumer<List<Integer>, Integer>() {
@Override
public void accept(List<Integer> integerList, Integer integer2) throws Exception {
integerList.add(integer2);
}
})
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integerList) throws Exception {
Log.d(TAG, "accept: " + new Gson().toJson(integerList));
}
});
2022-08-26 14:52:12.435 28902-28902/com.yoshin.tspsdk D/RxJavaActivity: accept: [1,2,3,4]
Observable.just(1, 2, 3, 4)
.count()
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "accept: "+aLong);
}
});
2022-08-26 15:05:18.635 29618-29618/com.yoshin.tspsdk D/RxJavaActivity: accept: 4
让Observable遇到错误(Throwable、Error、Exception)时发射一个特殊的项并且正常终止。
Observable.merge(
Observable.just(2).delay(1000, TimeUnit.MILLISECONDS),
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "subscribe: ");
emitter.onNext(3);
// throw new NullPointerException("异常");//拦截
emitter.onError(new Throwable("这是Throwable")); // 拦截
// emitter.onError(new Exception("测试错误")); //拦截
// emitter.onError(new Error("测试错误")); // 拦截
}
}),
Observable.just(4))
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
return -1;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
2022-08-24 10:35:27.221 12231-12231/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe:
2022-08-24 10:35:27.222 12231-12231/com.yoshin.tspsdk E/RxJavaActivity: subscribe:
2022-08-24 10:35:27.222 12231-12231/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
2022-08-24 10:35:27.223 12231-12231/com.yoshin.tspsdk D/RxJavaActivity: onNext: -1
2022-08-24 10:35:27.223 12231-12231/com.yoshin.tspsdk D/RxJavaActivity: onComplete:
emitter.onError(new Throwable()); 或者 emitter.onError(new Error());
throw new NullPointerException(“异常”); 或者 emitter.onError(new Exception());
所有异常都拦截
让Observable在遇到错误(Throwable、Error、Exception)时开始发射第二个Observable的数据序列。
Observable.merge(
Observable.just(2).delay(1000, TimeUnit.MILLISECONDS),
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "subscribe: ");
emitter.onNext(3);
// throw new NullPointerException("异常");//拦截
emitter.onError(new Throwable("这是Throwable")); // 拦截
// emitter.onError(new Exception("测试错误")); //拦截
// emitter.onError(new Error("测试错误")); // 拦截
}
}),
Observable.just(4))
.onErrorResumeNext(new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
Log.e(TAG, "subscribeActual: ");
observer.onNext(110);
observer.onNext(120);
observer.onComplete();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
2022-08-24 10:31:15.082 11854-11854/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe:
2022-08-24 10:31:15.084 11854-11854/com.yoshin.tspsdk E/RxJavaActivity: subscribe:
2022-08-24 10:31:15.084 11854-11854/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
2022-08-24 10:31:15.084 11854-11854/com.yoshin.tspsdk E/RxJavaActivity: subscribeActual:
2022-08-24 10:31:15.084 11854-11854/com.yoshin.tspsdk D/RxJavaActivity: onNext: 110
2022-08-24 10:31:15.084 11854-11854/com.yoshin.tspsdk D/RxJavaActivity: onNext: 120
2022-08-24 10:31:15.084 11854-11854/com.yoshin.tspsdk D/RxJavaActivity: onComplete:
emitter.onError(new Throwable()); 或者 emitter.onError(new Error());
throw new NullPointerException(“异常”); 或者 emitter.onError(new Exception());
所有异常都拦截
和onErrorResumeNext类似,
onExceptionResumeNext方法返回一个镜像原有Observable行为的新Observable,
也使用一个备用的Observable,
不同的是,
如果onError收到的Throwable不是一个Exception,
它会将错误传递给观察者的onError方法,不会使用备用的Observable。
Observable.merge(
Observable.just(2).delay(1000, TimeUnit.MILLISECONDS),
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "subscribe: ");
emitter.onNext(3);
// throw new NullPointerException("异常");//拦截
// emitter.onError(new Throwable()); // 不拦截
// emitter.onError(new Exception("测试错误")); //拦截
emitter.onError(new Error("测试错误")); // 不拦截
}
}),
Observable.just(4))
.onExceptionResumeNext(new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
Log.e(TAG, "subscribeActual: ");
observer.onNext(110);
observer.onNext(120);
observer.onComplete();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
当 emitter.onError(new Throwable()); 或者 emitter.onError(new Error()); ,不拦截
2022-08-24 09:52:24.703 8455-8455/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe:
2022-08-24 09:52:24.704 8455-8455/com.yoshin.tspsdk E/RxJavaActivity: subscribe:
2022-08-24 09:52:24.705 8455-8455/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
2022-08-24 09:52:24.705 8455-8455/com.yoshin.tspsdk D/RxJavaActivity: onError: 测试错误
当 throw new NullPointerException(“异常”); 或者 emitter.onError(new Exception()); ,拦截
2022-08-24 09:54:23.602 8902-8902/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe:
2022-08-24 09:54:23.604 8902-8902/com.yoshin.tspsdk E/RxJavaActivity: subscribe:
2022-08-24 09:54:23.604 8902-8902/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
2022-08-24 09:54:23.605 8902-8902/com.yoshin.tspsdk E/RxJavaActivity: subscribeActual:
2022-08-24 09:54:23.605 8902-8902/com.yoshin.tspsdk D/RxJavaActivity: onNext: 110
2022-08-24 09:54:23.605 8902-8902/com.yoshin.tspsdk D/RxJavaActivity: onNext: 120
2022-08-24 09:54:23.605 8902-8902/com.yoshin.tspsdk D/RxJavaActivity: onComplete:
Observable.merge(
Observable.just(2).delay(1000, TimeUnit.MILLISECONDS),
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "subscribe: ");
emitter.onNext(3);
// throw new NullPointerException("异常");//拦截
// emitter.onError(new Throwable("这是Throwable")); // 拦截
// emitter.onError(new Exception("这是Exception")); //拦截
emitter.onError(new Error("这是Error")); // 拦截
}
}),
Observable.just(4))
.retry(2, new Predicate<Throwable>() {
@Override
public boolean test(Throwable throwable) throws Exception {
Log.e(TAG, "test: " + throwable.getMessage());
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
2022-08-24 10:43:58.735 14008-14008/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe:
2022-08-24 10:43:58.737 14008-14008/com.yoshin.tspsdk E/RxJavaActivity: subscribe:
2022-08-24 10:43:58.737 14008-14008/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
2022-08-24 10:43:58.737 14008-14008/com.yoshin.tspsdk E/RxJavaActivity: test: 这是Error
2022-08-24 10:43:58.738 14008-14008/com.yoshin.tspsdk E/RxJavaActivity: subscribe:
2022-08-24 10:43:58.738 14008-14008/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
2022-08-24 10:43:58.738 14008-14008/com.yoshin.tspsdk E/RxJavaActivity: test: 这是Error
2022-08-24 10:43:58.739 14008-14008/com.yoshin.tspsdk E/RxJavaActivity: subscribe:
2022-08-24 10:43:58.739 14008-14008/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
2022-08-24 10:43:58.739 14008-14008/com.yoshin.tspsdk D/RxJavaActivity: onError: 这是Error
emitter.onError(new Throwable()); 或者 emitter.onError(new Error());
throw new NullPointerException(“异常”); 或者 emitter.onError(new Exception());
所有异常都拦截

如果repeat不带参数默认无限循环
Observable.just("小黑", "真帅")
.repeat(2)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
2022-08-25 15:10:09.666 11747-11747/com.yoshin.tspsdk D/RxJavaActivity: accept: 小黑
2022-08-25 15:10:09.666 11747-11747/com.yoshin.tspsdk D/RxJavaActivity: accept: 真帅
2022-08-25 15:10:09.666 11747-11747/com.yoshin.tspsdk D/RxJavaActivity: accept: 小黑
2022-08-25 15:10:09.666 11747-11747/com.yoshin.tspsdk D/RxJavaActivity: accept: 真帅
设置下次执行的时间,这种方式只能执行两次
Observable.just("小黑", "真帅")
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
return Observable.timer(1000, TimeUnit.MILLISECONDS);
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
2022-08-25 15:16:34.581 12559-12559/com.yoshin.tspsdk D/RxJavaActivity: accept: 小黑
2022-08-25 15:16:34.581 12559-12559/com.yoshin.tspsdk D/RxJavaActivity: accept: 真帅
2022-08-25 15:16:35.583 12559-12721/com.yoshin.tspsdk D/RxJavaActivity: accept: 小黑
2022-08-25 15:16:35.584 12559-12721/com.yoshin.tspsdk D/RxJavaActivity: accept: 真帅
Observable.just("小黑", "真帅")
.repeatUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
// true时则立即中断执行
return false;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});

Observable.just(4, 3).delay(1000, TimeUnit.MILLISECONDS)
do系列操作符相当于给Observable执行周期的关键节点添加回调。
当Observable执行到这个阶段的时候,这些回调就会【先被触发】。
当Observable每发送一个数据时,doOnNext会被首先调用,然后再onNext。
若发射中途出现异常doOnError会被调用,然后onError。
若数据正常发送完毕doOnCompleted会被触发,然后执行onCompleted。
当订阅或者解除订阅doOnSubscribe,doOnUnsubscribe会被执行。
(1)doOnSubscribe:在被观察者和观察者产生关联的时候被调用,disposable可以立即取消订阅;
(2)doOnLifecycle:可以在订阅之后设置是否取消订阅;
(3)doNext和doAfterNext:可以接收到被观察者发射过来的数据;
(4)doOnEach:当onNext、onError、onComplete被触发是被调用;
(5)doOnComplete:当触发onComplete时被调用;
(6)doOnError:当触发onError时被调用;
(7)doFinally:当触发onError或onComplete时被调用;
(8)doOnDispose:当取消订阅时被调用;
(9)doAfterTerminate:订阅终止时被调用;
Observable.just(1, 2, 3)
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, "accept: doOnSubscribe");
}
})
.doOnDispose(new Action() {//当取消订阅时被调用;
@Override
public void run() throws Exception {
Log.d(TAG, "run: doOnDispose");
}
})
.doOnTerminate(new Action() {//订阅终止时被调用;
@Override
public void run() throws Exception {
Log.d(TAG, "run: doOnTerminate");
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "run: doAfterTerminate");
}
})
.doOnLifecycle(
new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, "accept: doOnLifecycle");
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "run: doOnLifecycle");
}
})
.doOnNext(new Consumer<Integer>() {//可以接收到被观察者发射过来的数据之前;
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: doOnNext ==" + integer);
}
})
.doAfterNext(new Consumer<Integer>() {//可以接收到被观察者发射过来的数据之后;
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: doAfterNext ==" + integer);
}
})
.doOnComplete(new Action() {//当触发onComplete时被调用;
@Override
public void run() throws Exception {
Log.d(TAG, "run: doOnComplete");
}
})
.doOnError(new Consumer<Throwable>() {
//当触发onError时被调用;
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "run: doOnError");
}
})
.doFinally(new Action() {
//当触发onError或onComplete时被调用;
@Override
public void run() throws Exception {
Log.d(TAG, "run: doFinally");
}
})
.doOnEach(new Consumer<Notification<Integer>>() {
//当onNext、onError、onComplete被触发时被调用;
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
Log.d(TAG, "run: doOnEach ==" + integerNotification.getValue());
}
})
//**************************************************************
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
2022-08-26 15:55:18.976 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: accept: doOnSubscribe
2022-08-26 15:55:18.976 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: accept: doOnLifecycle
2022-08-26 15:55:18.976 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe:
2022-08-26 15:55:18.976 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: accept: doOnNext ==1
2022-08-26 15:55:18.976 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: run: doOnEach ==1
2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: onNext: 1
2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: accept: doAfterNext ==1
2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: accept: doOnNext ==2
2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: run: doOnEach ==2
2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: onNext: 2
2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: accept: doAfterNext ==2
2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: accept: doOnNext ==3
2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: run: doOnEach ==3
2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: accept: doAfterNext ==3
2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: run: doOnTerminate
2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: run: doOnComplete
2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: run: doOnEach ==null
2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: onComplete:
2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: run: doFinally
2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: run: doAfterTerminate
转:RxJava : do操作符
https://blog.csdn.net/sinat_31057219/article/details/101374255
如果从其前任开始的指定超时时间内未发出下一项,则生成的 Observable 将终止并通知观察者 TimeoutException。

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws InterruptedException {
emitter.onNext(1);
Thread.sleep(400);
emitter.onNext(2);
Thread.sleep(400);
emitter.onNext(3);
Thread.sleep(900);
emitter.onNext(4);
Thread.sleep(1100);
emitter.onNext(66);
}
})
.observeOn(Schedulers.io())
.timeout(1000, TimeUnit.MILLISECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
2022-08-26 16:07:42.458 3341-3341/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe:
2022-08-26 16:07:42.459 3341-3553/com.yoshin.tspsdk D/RxJavaActivity: onNext: 1
2022-08-26 16:07:42.860 3341-3553/com.yoshin.tspsdk D/RxJavaActivity: onNext: 2
2022-08-26 16:07:43.261 3341-3553/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
2022-08-26 16:07:44.161 3341-3553/com.yoshin.tspsdk D/RxJavaActivity: onNext: 4
2022-08-26 16:07:45.166 3341-3563/com.yoshin.tspsdk D/RxJavaActivity: onError: The source did not signal an event for 1000 milliseconds and has been terminated.
中文文档 :https://www.kancloud.cn/luponu/rxjava_zh/974452
http://reactivex.io/RxJava/2.x/javadoc/
rxjava:错误处理操作符(2): onErrorReturn 、 onErrorResumeNext 、onExceptionResumeNext https://blog.csdn.net/sinat_31057219/article/details/101304867
RxJava2操作符总结 – 你想要的都在这里了:https://www.jianshu.com/p/02c83de487e3
RxJava 过滤操作符 throttleFirst 与 throttleLast 以及 sample:https://blog.csdn.net/qq_33210042/article/details/103352275