• 每天记录学习的新知识:RxJava2 操作符


    RxJava2 操作符

    中文文档 :https://www.kancloud.cn/luponu/rxjava_zh/974452

    零、基础类

    1、基础类

    • Flowable: 多个流,响应式流和背压
    • Observable: 多个流,无背压 (被观察者)
    • Single: 只有一个元素或者错误的流
    • Completable: 没有任何元素,只有一个完成和错误信号的流
    • Maybe: 没有任何元素或者只有一个元素或者只有一个错误的流

    2、调度器种类

    • Schedulers.computation()
      用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量

    • Schedulers.from(executor) 使用指定的Executor作为调度器

    • Schedulers.single() 该调度器的线程池只能同时执行一个线程

    • Schedulers.io()
      用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;
      对于普通的计算任务,请使用Schedulers.computation();
      默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器

    • Schedulers.newThread() 为每个任务创建一个新线程

    • Schedulers.trampoline() 当其它排队的任务完成后,在当前线程排队开始执行。

    • AndroidSchedulers.mainThread() 主线程,UI线程,可以用于更新界面

    一、创建型操作符

    1、create

    static <T> Observable<T> create(ObservableOnSubscribe<T> source) 
    
    • 1

    可以创建任意类型的参数

            Observable
                    .create(new ObservableOnSubscribe<String>() {
                        @Override
                        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                            // 发射
                            Log.d(TAG, "上游 subscribe: 开始发射...");
                            emitter.onNext("下发数据");
                            emitter.onComplete(); // 发射完成
                            // 上游的最后log才会打印
                            Log.d(TAG, "上游 subscribe: 发射完成");
                        }
                    })
                    .subscribe(
                            // 下游 Observer 观察者
                            new Observer<String>() {
                                @Override
                                public void onSubscribe(Disposable d) {
                                    Log.d(TAG, "上游和下游订阅成功 onSubscribe 1");
                                }
    
                                @Override
                                public void onNext(String s) {
                                    Log.d(TAG, "下游接收 onNext: " + s);
                                }
    
                                @Override
                                public void onError(Throwable e) {
    
                                }
    
                                @Override
                                public void onComplete() {
                                    // 只有接收完成之后,上游的最后log才会打印
                                    Log.d(TAG, "下游接收完成 onComplete");
                                }
                            });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    2022-08-22 16:05:09.221 18440-18440/com.yoshin.tspsdk D/RxJavaActivity: 上游和下游订阅成功 onSubscribe 1
    2022-08-22 16:05:09.221 18440-18440/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
    2022-08-22 16:05:09.221 18440-18440/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: 下发数据
    2022-08-22 16:05:09.221 18440-18440/com.yoshin.tspsdk D/RxJavaActivity: 下游接收完成 onComplete
    2022-08-22 16:05:09.221 18440-18440/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2、just

    可以发射单一数据或者数集

            Observable.just("one", "two", "three", "four", "five").subscribe(
                    // 下游 Observer 观察者
                    new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "上游和下游订阅成功 onSubscribe 1");
                        }
    
                        @Override
                        public void onNext(String s) {
                            Log.d(TAG, "下游接收 onNext: " + s);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
                            // 只有接收完成之后,上游的最后log才会打印
                            Log.d(TAG, "下游接收完成 onComplete");
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    2022-08-22 16:13:30.201 19470-19470/com.yoshin.tspsdk D/RxJavaActivity: 上游和下游订阅成功 onSubscribe 1
    2022-08-22 16:13:30.201 19470-19470/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: one
    2022-08-22 16:13:30.201 19470-19470/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: two
    2022-08-22 16:13:30.201 19470-19470/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: three
    2022-08-22 16:13:30.201 19470-19470/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: four
    2022-08-22 16:13:30.201 19470-19470/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: five
    2022-08-22 16:13:30.201 19470-19470/com.yoshin.tspsdk D/RxJavaActivity: 下游接收完成 onComplete
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3、fromArray

    在这里插入图片描述

    发射数集

    跟 just 一样,just 内部调用 fromArray

    4、empty

    4.1、empty

    直接发送 onComplete() 事件

            Observable.empty().subscribe(
                    // 下游 Observer 观察者
                    new Observer<Object>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "上游和下游订阅成功 onSubscribe 1");
                        }
    
                        @Override
                        public void onNext(Object s) {
                            Log.d(TAG, "下游接收 onNext: " + s);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "下游接收异常 onError:"+e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            // 只有接收完成之后,上游的最后log才会打印
                            Log.d(TAG, "下游接收完成 onComplete");
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    2022-08-22 16:19:57.417 20127-20127/com.yoshin.tspsdk D/RxJavaActivity: 上游和下游订阅成功 onSubscribe 1
    2022-08-22 16:19:57.417 20127-20127/com.yoshin.tspsdk D/RxJavaActivity: 下游接收完成 onComplete
    
    • 1
    • 2

    4.2、never

    不发送任何事件

    4.3、error

    发送 onError() 事件

    5、range

    发射指定范围的整数序列

            Observable.range(1, 7).subscribe(
                    // 下游 Observer 观察者
                    new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "上游和下游订阅成功 onSubscribe 1");
                        }
    
                        @Override
                        public void onNext(Integer s) {
                            Log.d(TAG, "下游接收 onNext: " + s);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
                            // 只有接收完成之后,上游的最后log才会打印
                            Log.d(TAG, "下游接收完成 onComplete");
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    2022-08-22 16:23:41.594 20571-20571/com.yoshin.tspsdk D/RxJavaActivity: 上游和下游订阅成功 onSubscribe 1
    2022-08-22 16:23:41.594 20571-20571/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: 1
    2022-08-22 16:23:41.594 20571-20571/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: 2
    2022-08-22 16:23:41.594 20571-20571/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: 3
    2022-08-22 16:23:41.594 20571-20571/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: 4
    2022-08-22 16:23:41.594 20571-20571/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: 5
    2022-08-22 16:23:41.594 20571-20571/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: 6
    2022-08-22 16:23:41.594 20571-20571/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: 7
    2022-08-22 16:23:41.594 20571-20571/com.yoshin.tspsdk D/RxJavaActivity: 下游接收完成 onComplete
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    6、defer (订阅后执行)

    确保Observable代码在被订阅后才执行,而不是创建后立即执行

            Observable.defer(new Callable<ObservableSource<String>>() {
                @Override
                public ObservableSource<String> call() throws Exception {
                    return new Observable<String>() {
                        @Override
                        protected void subscribeActual(Observer<? super String> observer) {
                            observer.onNext("发射上游事件");
                            observer.onNext("确保Observable代码在被订阅后才执行,而不是创建后立即执行");
                            observer.onComplete();
                        }
                    };
                }
            }).subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe: ");
                }
    
                @Override
                public void onNext(String s) {
                    Log.d(TAG, "onNext: " + s);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: ");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete: ");
                }
            });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    7、timer (延时)

    多少秒后调用

            Observable.timer(1000, TimeUnit.MILLISECONDS)
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long aLong) throws Exception {
                            Log.i(TAG, "accept: " + aLong);
                        }
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    2022-08-25 15:35:02.126 13208-13208/com.yoshin.tspsdk I/RxJavaActivity: onCreate: ================
    2022-08-25 15:35:03.132 13208-13431/com.yoshin.tspsdk I/RxJavaActivity: accept: 0
    
    • 1
    • 2

    8、interval

    在这里插入图片描述

    8.1、interval(轮询)

    按照给定的时间间隔发射整数序列的Observable

            Observable.interval(1000, TimeUnit.MILLISECONDS)
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long aLong) throws Exception {
                            Log.i(TAG, "accept: " + aLong);
                        }
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    2022-08-25 15:40:30.019 13668-13858/com.yoshin.tspsdk I/RxJavaActivity: accept: 0
    2022-08-25 15:40:31.020 13668-13858/com.yoshin.tspsdk I/RxJavaActivity: accept: 1
    2022-08-25 15:40:32.019 13668-13858/com.yoshin.tspsdk I/RxJavaActivity: accept: 2
    2022-08-25 15:40:33.019 13668-13858/com.yoshin.tspsdk I/RxJavaActivity: accept: 3
    2022-08-25 15:40:34.019 13668-13858/com.yoshin.tspsdk I/RxJavaActivity: accept: 4
    2022-08-25 15:40:35.019 13668-13858/com.yoshin.tspsdk I/RxJavaActivity: accept: 5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    8.2、intervalRange(轮询次数)

            Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long aLong) throws Exception {
                            Log.i(TAG, "accept: " + aLong);
                        }
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    延迟0毫秒,从1开始发射,发射5次,时间间隔 1000毫秒

    2022-08-25 15:45:09.665 14119-14274/com.yoshin.tspsdk I/RxJavaActivity: accept: 1
    2022-08-25 15:45:10.666 14119-14274/com.yoshin.tspsdk I/RxJavaActivity: accept: 2
    2022-08-25 15:45:11.666 14119-14274/com.yoshin.tspsdk I/RxJavaActivity: accept: 3
    2022-08-25 15:45:12.666 14119-14274/com.yoshin.tspsdk I/RxJavaActivity: accept: 4
    2022-08-25 15:45:13.666 14119-14274/com.yoshin.tspsdk I/RxJavaActivity: accept: 5
    
    • 1
    • 2
    • 3
    • 4
    • 5

    二、变换型操作符

    1、map

            Observable
                    .create(new ObservableOnSubscribe<List<ApiData>>() {
                        @Override
                        public void subscribe(ObservableEmitter<List<ApiData>> emitter) throws Exception {
                            Log.d(TAG, "上游 subscribe: 开始发射...");
                            List<ApiData> apiDataList = new ArrayList<>();
                            apiDataList.add(new ApiData("小明", 8, 4));
                            apiDataList.add(new ApiData("小黑", 7, 3));
                            emitter.onNext(apiDataList);
                            emitter.onComplete(); // 发射完成
                            // 上游的最后log才会打印
                            Log.d(TAG, "上游 subscribe: 发射完成");
                        }
                    })
                    .map(new Function<List<ApiData>, List<Student>>() {
                        @Override
                        public List<Student> apply(List<ApiData> dataList) throws Exception {
                            Log.d(TAG, "中游");
                            if (dataList != null && dataList.size() > 0) {
                                List<Student> studentList = new ArrayList<>();
                                for (ApiData apiData : dataList) {
                                    studentList.add(new Student(apiData.getName(), apiData.getAge()));
                                }
                                return studentList;
                            } else {
                                return null;
                            }
                        }
                    })
                    .subscribe(
                            // 下游 Observer 观察者
                            new Observer<List<Student>>() {
                                @Override
                                public void onSubscribe(Disposable d) {
                                    Log.d(TAG, "上游和下游订阅成功 onSubscribe 1");
                                }
    
                                @Override
                                public void onNext(List<Student> list) {
                                    Log.d(TAG, "下游接收 onNext: " + list);
                                }
    
                                @Override
                                public void onError(Throwable e) {
    
                                }
    
                                @Override
                                public void onComplete() {
                                    // 只有接收完成之后,上游的最后log才会打印
                                    Log.d(TAG, "下游接收完成 onComplete");
                                }
                            });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    2022-08-22 16:39:13.178 22145-22145/com.yoshin.tspsdk D/RxJavaActivity: 上游和下游订阅成功 onSubscribe 1
    2022-08-22 16:39:13.178 22145-22145/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
    2022-08-22 16:39:13.178 22145-22145/com.yoshin.tspsdk D/RxJavaActivity: 中游
    2022-08-22 16:39:13.178 22145-22145/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: [com.yoshin.tspsdk.rx.bean.Student@6ceb616, com.yoshin.tspsdk.rx.bean.Student@c826b97]
    2022-08-22 16:39:13.178 22145-22145/com.yoshin.tspsdk D/RxJavaActivity: 下游接收完成 onComplete
    2022-08-22 16:39:13.178 22145-22145/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2、flatMap (无序)

    在这里插入图片描述

    • 例 1
            Observable
                    .create(new ObservableOnSubscribe<List<ApiData>>() {
                        @Override
                        public void subscribe(ObservableEmitter<List<ApiData>> emitter) throws Exception {
                            Log.d(TAG, "上游 subscribe: 开始发射...");
                            List<ApiData> apiDataList = new ArrayList<>();
                            apiDataList.add(new ApiData("小明", 8, 4));
                            apiDataList.add(new ApiData("小黑", 7, 3));
                            emitter.onNext(apiDataList);
                            emitter.onComplete(); // 发射完成
                            // 上游的最后log才会打印
                            Log.d(TAG, "上游 subscribe: 发射完成");
                        }
                    })
                    .flatMap(new Function<List<ApiData>, ObservableSource<List<Student>>>() {
                        @Override
                        public ObservableSource<List<Student>> apply(List<ApiData> dataList) throws Exception {
                            Log.d(TAG, "中游");
                            List<Student> studentList = new ArrayList<>();
                            if (dataList != null && dataList.size() > 0) {
                                for (ApiData apiData : dataList) {
                                    studentList.add(new Student(apiData.getName(), apiData.getAge()));
                                }
                            }
                            return new Observable<List<Student>>() {
                                @Override
                                protected void subscribeActual(Observer<? super List<Student>> observer) {
                                    Log.d(TAG, "中游 2: 开始发射...");
                                    observer.onNext(studentList);
                                    observer.onComplete();
                                    Log.d(TAG, "中游 2: 发射完成");
                                }
                            };
                        }
                    })
                    .subscribe(
                            // 下游 Observer 观察者
                            new Observer<List<Student>>() {
                                @Override
                                public void onSubscribe(Disposable d) {
                                    Log.d(TAG, "上游和下游订阅成功 onSubscribe 1");
                                }
    
                                @Override
                                public void onNext(List<Student> list) {
                                    Log.d(TAG, "下游接收 onNext: " + list);
                                }
    
                                @Override
                                public void onError(Throwable e) {
    
                                }
    
                                @Override
                                public void onComplete() {
                                    // 只有接收完成之后,上游的最后log才会打印
                                    Log.d(TAG, "下游接收完成 onComplete");
                                }
                            });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    2022-08-22 16:44:40.169 22741-22741/com.yoshin.tspsdk D/RxJavaActivity: 上游和下游订阅成功 onSubscribe 1
    2022-08-22 16:44:40.169 22741-22741/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
    2022-08-22 16:44:40.169 22741-22741/com.yoshin.tspsdk D/RxJavaActivity: 中游
    2022-08-22 16:44:40.169 22741-22741/com.yoshin.tspsdk D/RxJavaActivity: 中游 2: 开始发射...
    2022-08-22 16:44:40.169 22741-22741/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: [com.yoshin.tspsdk.rx.bean.Student@e1a0184, com.yoshin.tspsdk.rx.bean.Student@da10d6d]
    2022-08-22 16:44:40.169 22741-22741/com.yoshin.tspsdk D/RxJavaActivity: 中游 2: 发射完成
    2022-08-22 16:44:40.170 22741-22741/com.yoshin.tspsdk D/RxJavaActivity: 下游接收完成 onComplete
    2022-08-22 16:44:40.170 22741-22741/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 例 2 - flatMap 是无序的
            Observable
                    .create(new ObservableOnSubscribe<ApiData>() {
                        @Override
                        public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
                            Log.d(TAG, "上游 subscribe: 开始发射...");
                            emitter.onNext(new ApiData("小明", 8, 4));
                            emitter.onNext(new ApiData("小黑", 8, 4));
                            emitter.onNext(new ApiData("小绿", 8, 4));
                            emitter.onNext(new ApiData("小紫", 8, 4));
                            emitter.onComplete(); // 发射完成
                            // 上游的最后log才会打印
                            Log.d(TAG, "上游 subscribe: 发射完成");
                        }
                    })
                    .flatMap(new Function<ApiData, ObservableSource<Student>>() {
                        @Override
                        public ObservableSource<Student> apply(ApiData apiData) throws Exception {
                            Log.d(TAG, "中游");
                            int delay = 0;
                            if (apiData.getName().equals("小黑")) {
                                delay = 500;//延迟500ms发射
                            }
                            return Observable.just(new Student(apiData.getName(), apiData.getAge())).delay(delay, TimeUnit.MILLISECONDS);
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(
                            // 下游 Observer 观察者
                            new Observer<Student>() {
                                @Override
                                public void onSubscribe(Disposable d) {
                                    Log.d(TAG, "上游和下游订阅成功 onSubscribe 1");
                                }
    
                                @Override
                                public void onNext(Student s) {
                                    Log.d(TAG, "下游接收 onNext: " + new Gson().toJson(s));
                                }
    
                                @Override
                                public void onError(Throwable e) {
    
                                }
    
                                @Override
                                public void onComplete() {
                                    // 只有接收完成之后,上游的最后log才会打印
                                    Log.d(TAG, "下游接收完成 onComplete");
                                }
                            });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    2022-08-23 09:55:23.423 21847-21847/com.yoshin.tspsdk D/RxJavaActivity: 上游和下游订阅成功 onSubscribe 1
    2022-08-23 09:55:23.423 21847-22069/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
    2022-08-23 09:55:23.423 21847-22069/com.yoshin.tspsdk D/RxJavaActivity: 中游
    2022-08-23 09:55:23.424 21847-22069/com.yoshin.tspsdk D/RxJavaActivity: 中游
    2022-08-23 09:55:23.424 21847-22069/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
    2022-08-23 09:55:23.437 21847-21847/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: {"age":8,"name":"小明"}
    2022-08-23 09:55:23.438 21847-21847/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: {"age":8,"name":"小绿"}
    2022-08-23 09:55:23.438 21847-21847/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: {"age":8,"name":"小紫"}
    2022-08-23 09:55:23.928 21847-21847/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: {"age":8,"name":"小黑"}
    2022-08-23 09:55:23.928 21847-21847/com.yoshin.tspsdk D/RxJavaActivity: 下游接收完成 onComplete
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    3、concatMap (有序)

            Observable
                    .create(new ObservableOnSubscribe<ApiData>() {
                        @Override
                        public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
                            Log.d(TAG, "上游 subscribe: 开始发射...");
                            emitter.onNext(new ApiData("小明", 8, 4));
                            emitter.onNext(new ApiData("小黑", 8, 4));
                            emitter.onNext(new ApiData("小绿", 8, 4));
                            emitter.onNext(new ApiData("小紫", 8, 4));
                            emitter.onComplete(); // 发射完成
                            // 上游的最后log才会打印
                            Log.d(TAG, "上游 subscribe: 发射完成");
                        }
                    })
                    .concatMap(new Function<ApiData, ObservableSource<Student>>() {
                        @Override
                        public ObservableSource<Student> apply(ApiData apiData) throws Exception {
                            Log.d(TAG, "中游");
                            int delay = 0;
                            if (apiData.getName().equals("小黑")) {
                                delay = 500;//延迟500ms发射
                            }
                            return Observable.just(new Student(apiData.getName(), apiData.getAge())).delay(delay, TimeUnit.MILLISECONDS);
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(
                            // 下游 Observer 观察者
                            new Observer<Student>() {
                                @Override
                                public void onSubscribe(Disposable d) {
                                    Log.d(TAG, "上游和下游订阅成功 onSubscribe 1");
                                }
    
                                @Override
                                public void onNext(Student s) {
                                    Log.d(TAG, "下游接收 onNext: " + new Gson().toJson(s));
                                }
    
                                @Override
                                public void onError(Throwable e) {
    
                                }
    
                                @Override
                                public void onComplete() {
                                    // 只有接收完成之后,上游的最后log才会打印
                                    Log.d(TAG, "下游接收完成 onComplete");
                                }
                            });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    2022-08-23 10:01:09.137 22528-22528/com.yoshin.tspsdk D/RxJavaActivity: 上游和下游订阅成功 onSubscribe 1
    2022-08-23 10:01:09.138 22528-22722/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
    2022-08-23 10:01:09.138 22528-22722/com.yoshin.tspsdk D/RxJavaActivity: 中游
    2022-08-23 10:01:09.139 22528-22722/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
    2022-08-23 10:01:09.139 22528-22757/com.yoshin.tspsdk D/RxJavaActivity: 中游
    2022-08-23 10:01:09.160 22528-22528/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: {"age":8,"name":"小明"}
    2022-08-23 10:01:09.639 22528-22758/com.yoshin.tspsdk D/RxJavaActivity: 中游
    2022-08-23 10:01:09.641 22528-22528/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: {"age":8,"name":"小黑"}
    2022-08-23 10:01:09.641 22528-22768/com.yoshin.tspsdk D/RxJavaActivity: 中游
    2022-08-23 10:01:09.641 22528-22528/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: {"age":8,"name":"小绿"}
    2022-08-23 10:01:09.643 22528-22528/com.yoshin.tspsdk D/RxJavaActivity: 下游接收 onNext: {"age":8,"name":"小紫"}
    2022-08-23 10:01:09.643 22528-22528/com.yoshin.tspsdk D/RxJavaActivity: 下游接收完成 onComplete
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    4、groupBy (拆分)

    RxJava中的groupBy,是将一个Observable分拆为一些Observables集合,它们中的每一个发射原始Observable的一个 子序列,哪个数据项由哪一个Observable发射是由一个函数判定 的,这个函数给每一项指定一个Key,Key相同的数据会被同一个Observable发射。

            Observable
                    .create(new ObservableOnSubscribe<ApiData>() {
                        @Override
                        public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
                            Log.d(TAG, "上游 subscribe: 开始发射...");
                            emitter.onNext(new ApiData("小明", 8, 4));
                            emitter.onNext(new ApiData("小黑", 8, 4));
                            emitter.onNext(new ApiData("小绿", 8, 4));
                            emitter.onNext(new ApiData("小紫", 8, 4));
                            emitter.onComplete(); // 发射完成
                            // 上游的最后log才会打印
                            Log.d(TAG, "上游 subscribe: 发射完成");
                        }
                    })
                    .groupBy(new Function<ApiData, String>() {
                        @Override
                        public String apply(ApiData apiData) throws Exception {
                            Log.d(TAG, "中游");
                            if (apiData.getName().equals("小黑")) {
                                return  "1组";
                            }
                            return "2组";
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<GroupedObservable<String, ApiData>>() {
                        @Override
                        public void accept(GroupedObservable<String, ApiData> stringApiDataGroupedObservable) throws Exception {
                            Log.d(TAG, "下游");
                            //获取key
                            final String key = stringApiDataGroupedObservable.getKey();
                            //获取value
                            Disposable disposable = stringApiDataGroupedObservable
                                    .subscribe(new Consumer<ApiData>() {
                                        @Override
                                        public void accept(ApiData apiData) throws Exception {
                                            Log.i(TAG, "accept: key =="+key + " ,apiData=="+apiData );
                                        }
                                    });
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    2022-08-23 13:18:16.586 24062-11143/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
    2022-08-23 13:18:16.586 24062-11143/com.yoshin.tspsdk D/RxJavaActivity: 中游
    2022-08-23 13:18:16.587 24062-11143/com.yoshin.tspsdk D/RxJavaActivity: 中游
    2022-08-23 13:18:16.587 24062-11143/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
    2022-08-23 13:18:16.601 24062-24062/com.yoshin.tspsdk D/RxJavaActivity: 下游
    2022-08-23 13:18:16.601 24062-24062/com.yoshin.tspsdk I/RxJavaActivity: accept: key ==2,apiData==ApiData(name=小明, age=8, className=4)
    2022-08-23 13:18:16.601 24062-24062/com.yoshin.tspsdk I/RxJavaActivity: accept: key ==2,apiData==ApiData(name=小绿, age=8, className=4)
    2022-08-23 13:18:16.601 24062-24062/com.yoshin.tspsdk I/RxJavaActivity: accept: key ==2,apiData==ApiData(name=小紫, age=8, className=4)
    2022-08-23 13:18:16.601 24062-24062/com.yoshin.tspsdk D/RxJavaActivity: 下游
    2022-08-23 13:18:16.601 24062-24062/com.yoshin.tspsdk I/RxJavaActivity: accept: key ==1,apiData==ApiData(name=小黑, age=8, className=4)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    5、buffer (缓存)

    先截断数据(缓存成集合),缓存到指定条件(数量、时间)后再下发
    在这里插入图片描述

            Observable
                    .create(new ObservableOnSubscribe<ApiData>() {
                        @Override
                        public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
                            Log.d(TAG, "上游 subscribe: 开始发射...");
                            emitter.onNext(new ApiData("小明", 8, 4));
                            emitter.onNext(new ApiData("小黑", 8, 4));
                            emitter.onNext(new ApiData("小绿", 8, 4));
                            emitter.onNext(new ApiData("小紫", 8, 4));
                            emitter.onComplete(); // 发射完成
                            // 上游的最后log才会打印
                            Log.d(TAG, "上游 subscribe: 发射完成");
                        }
                    })
                    .buffer(2 )
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<List<ApiData>>() {
                        @Override
                        public void accept(List<ApiData> dataList) throws Exception {
                            Log.d(TAG, "下游");
                            Log.d(TAG, "accept: "+ new Gson().toJson(dataList));
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    2022-08-23 13:29:52.382 13320-13851/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
    2022-08-23 13:29:52.382 13320-13851/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
    2022-08-23 13:29:52.397 13320-13320/com.yoshin.tspsdk D/RxJavaActivity: 下游
    2022-08-23 13:29:52.413 13320-13320/com.yoshin.tspsdk D/RxJavaActivity: accept: [{"age":8,"className":4,"name":"小明"},{"age":8,"className":4,"name":"小黑"}]
    2022-08-23 13:29:52.413 13320-13320/com.yoshin.tspsdk D/RxJavaActivity: 下游
    2022-08-23 13:29:52.414 13320-13320/com.yoshin.tspsdk D/RxJavaActivity: accept: [{"age":8,"className":4,"name":"小绿"},{"age":8,"className":4,"name":"小紫"}]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    6、window (缓存)

    可以指缓存数量或者时间
    在这里插入图片描述

    在时间间隔内缓存结果,类似于buffer缓存一个list集合,区别在于window将这个结果集合封装成了observable

            Observable.create(new ObservableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                            Log.e(TAG, "subscribe: ");
                            emitter.onNext(3);
                            emitter.onNext(33);
                            emitter.onNext(4);
                            emitter.onNext(6);
                            emitter.onNext(6);
                            emitter.onNext(6);
                        }
                    })
                    .window(2)
                    .subscribe(new Consumer<Observable<Integer>>() {
                        @Override
                        public void accept(Observable<Integer> integerObservable) throws Exception {
                            Log.d(TAG, "accept: integerObservable");
                            integerObservable.subscribe(new Consumer<Integer>() {
                                @Override
                                public void accept(Integer integer) throws Exception {
                                    Log.d(TAG, "accept: " + integer);
                                }
                            });
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    2022-08-25 18:27:27.189 23100-23100/com.yoshin.tspsdk E/RxJavaActivity: subscribe: 
    2022-08-25 18:27:27.190 23100-23100/com.yoshin.tspsdk D/RxJavaActivity: accept: integerObservable
    2022-08-25 18:27:27.190 23100-23100/com.yoshin.tspsdk D/RxJavaActivity: accept: 3
    2022-08-25 18:27:27.190 23100-23100/com.yoshin.tspsdk D/RxJavaActivity: accept: 33
    2022-08-25 18:27:27.190 23100-23100/com.yoshin.tspsdk D/RxJavaActivity: accept: integerObservable
    2022-08-25 18:27:27.190 23100-23100/com.yoshin.tspsdk D/RxJavaActivity: accept: 4
    2022-08-25 18:27:27.190 23100-23100/com.yoshin.tspsdk D/RxJavaActivity: accept: 6
    2022-08-25 18:27:27.190 23100-23100/com.yoshin.tspsdk D/RxJavaActivity: accept: integerObservable
    2022-08-25 18:27:27.190 23100-23100/com.yoshin.tspsdk D/RxJavaActivity: accept: 6
    2022-08-25 18:27:27.191 23100-23100/com.yoshin.tspsdk D/RxJavaActivity: accept: 6
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    7、cast (强制类型转换)

    在发射之前强制将Observable发射的所有数据转换为指定类型。
    需强调的一点是只能由子类对象转换为父类对象,否则会报错。

    		class Student extends Person{}
    
            Observable.just(new Student("北京"),new Student("北京"))
                    .cast(Person.class)
                    .subscribe(new Consumer<Person>() {
                        @Override
                        public void accept(Person person) throws Exception {
                            Log.d(TAG, "accept: "+ person);
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    8、scan (累*加减乘除)

    对原始Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。
    它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。

    在这里插入图片描述

            Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.e(TAG, "subscribe: ");
                    emitter.onNext(3);
                    emitter.onNext(33);
                    emitter.onNext(4);
                    emitter.onNext(6);
                }
            }).scan(new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer integer, Integer integer2) throws Exception {
                    Log.d(TAG, "apply integer: " + integer);
                    Log.d(TAG, "apply integer2: " + integer2);
                    return integer + integer2;
                }
            }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "accept: " + integer);
                }
            });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    2022-08-25 17:24:25.738 18743-18743/com.yoshin.tspsdk E/RxJavaActivity: subscribe: 
    2022-08-25 17:24:25.738 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: accept: 3
    2022-08-25 17:24:25.738 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: apply integer: 3
    2022-08-25 17:24:25.738 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: apply integer2: 33
    2022-08-25 17:24:25.739 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: accept: 36
    2022-08-25 17:24:25.739 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: apply integer: 36
    2022-08-25 17:24:25.739 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: apply integer2: 4
    2022-08-25 17:24:25.739 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: accept: 40
    2022-08-25 17:24:25.739 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: apply integer: 40
    2022-08-25 17:24:25.739 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: apply integer2: 6
    2022-08-25 17:24:25.739 18743-18743/com.yoshin.tspsdk D/RxJavaActivity: accept: 46
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    9、reduce (累*加减乘除)

    scan会将每次计算的结构都发送到下游;
    而reduce只会将最终结果发送到下游,complete为止不会收到第二次数据

            Observable.just(1, 2, 3, 4)
                    .reduce(new BiFunction<Integer, Integer, Integer>() {
                        @Override
                        public Integer apply(Integer integer, Integer integer2) throws Exception {
                            return integer + integer2;
                        }
                    }).subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "accept: " + integer);
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    2022-08-26 14:44:26.703 28320-28320/com.yoshin.tspsdk D/RxJavaActivity: accept: 10
    
    • 1

    三、过滤型操作符

    1、filter (满足条件下发)

    true 继续下发,否则不下发

            Observable
                    .create(new ObservableOnSubscribe<ApiData>() {
                        @Override
                        public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
                            Log.d(TAG, "上游 subscribe: 开始发射...");
                            emitter.onNext(new ApiData("小明", 8, 4));
                            emitter.onNext(new ApiData("小黑", 8, 4));
                            emitter.onNext(new ApiData("小绿", 8, 4));
                            emitter.onNext(new ApiData("小紫", 8, 4));
                            emitter.onComplete(); // 发射完成
                            // 上游的最后log才会打印
                            Log.d(TAG, "上游 subscribe: 发射完成");
                        }
                    })
                    .filter(new Predicate<ApiData>() {
                        @Override
                        public boolean test(ApiData apiData) throws Exception {
                            Log.d(TAG, "中游");
                            if (apiData.getName().equals("小黑")) return true;
                            else return false;
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<ApiData>() {
                        @Override
                        public void accept(ApiData apiData) throws Exception {
                            Log.d(TAG, "下游");
                            Log.d(TAG, "accept: "+apiData);
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    2022-08-23 13:34:57.775 14674-14845/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
    2022-08-23 13:34:57.775 14674-14845/com.yoshin.tspsdk D/RxJavaActivity: 中游
    2022-08-23 13:34:57.775 14674-14845/com.yoshin.tspsdk D/RxJavaActivity: 中游
    2022-08-23 13:34:57.775 14674-14845/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
    2022-08-23 13:34:57.792 14674-14674/com.yoshin.tspsdk D/RxJavaActivity: 下游
    2022-08-23 13:34:57.792 14674-14674/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小黑, age=8, className=4)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2、take (截留)

    2.1、take

    留下前多少个
    在这里插入图片描述

            Observable
                    .create(new ObservableOnSubscribe<ApiData>() {
                        @Override
                        public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
                            Log.d(TAG, "上游 subscribe: 开始发射...");
                            emitter.onNext(new ApiData("小明", 8, 4));
                            emitter.onNext(new ApiData("小黑", 8, 4));
                            emitter.onNext(new ApiData("小绿", 8, 4));
                            emitter.onNext(new ApiData("小紫", 8, 4));
                            emitter.onComplete(); // 发射完成
                            // 上游的最后log才会打印
                            Log.d(TAG, "上游 subscribe: 发射完成");
                        }
                    })
                    .take(2)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<ApiData>() {
                        @Override
                        public void accept(ApiData apiData) throws Exception {
                            Log.d(TAG, "下游");
                            Log.d(TAG, "accept: "+apiData);
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    2022-08-23 13:40:07.981 15687-15899/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
    2022-08-23 13:40:07.982 15687-15899/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
    2022-08-23 13:40:07.997 15687-15687/com.yoshin.tspsdk D/RxJavaActivity: 下游
    2022-08-23 13:40:07.997 15687-15687/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小明, age=8, className=4)
    2022-08-23 13:40:07.997 15687-15687/com.yoshin.tspsdk D/RxJavaActivity: 下游
    2022-08-23 13:40:07.997 15687-15687/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小黑, age=8, className=4)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.2、takeLast (反向截留)

    从后面数,留下多少个

    在这里插入图片描述

            Observable
                    .create(new ObservableOnSubscribe<ApiData>() {
                        @Override
                        public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
                            Log.d(TAG, "上游 subscribe: 开始发射...");
                            emitter.onNext(new ApiData("小明", 8, 4));
                            emitter.onNext(new ApiData("小黑", 8, 4));
                            emitter.onNext(new ApiData("小绿", 8, 4));
                            emitter.onNext(new ApiData("小紫", 8, 4));
                            emitter.onComplete(); // 发射完成
                            // 上游的最后log才会打印
                            Log.d(TAG, "上游 subscribe: 发射完成");
                        }
                    })
                    .takeLast(3)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<ApiData>() {
                        @Override
                        public void accept(ApiData apiData) throws Exception {
                            Log.d(TAG, "下游");
                            Log.d(TAG, "accept: "+apiData);
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    2022-08-23 13:41:16.199 16186-16309/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
    2022-08-23 13:41:16.200 16186-16309/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
    2022-08-23 13:41:16.236 16186-16186/com.yoshin.tspsdk D/RxJavaActivity: 下游
    2022-08-23 13:41:16.236 16186-16186/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小黑, age=8, className=4)
    2022-08-23 13:41:16.236 16186-16186/com.yoshin.tspsdk D/RxJavaActivity: 下游
    2022-08-23 13:41:16.236 16186-16186/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小绿, age=8, className=4)
    2022-08-23 13:41:16.236 16186-16186/com.yoshin.tspsdk D/RxJavaActivity: 下游
    2022-08-23 13:41:16.236 16186-16186/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小紫, age=8, className=4)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2.3、takeWhile

    一直take ,直到某个条件进行skip

    2.4、takeUtil

    一直take ,直到第二个observable发射才开始skip

    3、skip (跳过)

    在这里插入图片描述

    3.1、skip

            Observable.create(new ObservableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                            Log.i(TAG, "subscribe: 上游发射");
                            emitter.onNext(3);
                            emitter.onNext(33);
                            emitter.onNext(4);
                            emitter.onNext(6);
                            emitter.onNext(6);
                            emitter.onNext(16);
                        }
                    })
                    .skip(2)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "accept: "+integer);
                        }
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    2022-08-26 10:55:46.997 19746-19746/com.yoshin.tspsdk I/RxJavaActivity: subscribe: 上游发射
    2022-08-26 10:55:46.997 19746-19746/com.yoshin.tspsdk D/RxJavaActivity: accept: 4
    2022-08-26 10:55:46.997 19746-19746/com.yoshin.tspsdk D/RxJavaActivity: accept: 6
    2022-08-26 10:55:46.997 19746-19746/com.yoshin.tspsdk D/RxJavaActivity: accept: 6
    2022-08-26 10:55:46.997 19746-19746/com.yoshin.tspsdk D/RxJavaActivity: accept: 16
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.2、skipLast

    3.3、skipWhile

    一直skip ,直到某个条件进行take

    2.4、skipUtil

    一直skip ,直到第二个observable发射才开始take

    4、distinct (重复元素)

    distinct : 过滤掉重复的元素

    distinctUntilChanged: 过滤掉连续重复的元素,不连续重复的是不过滤

    在这里插入图片描述

            Observable
                    .create(new ObservableOnSubscribe<ApiData>() {
                        @Override
                        public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
                            Log.d(TAG, "上游 subscribe: 开始发射...");
                            emitter.onNext(new ApiData("小明", 8, 4));
                            emitter.onNext(new ApiData("小黑", 8, 4));
                            emitter.onNext(new ApiData("小绿", 8, 4));
                            ApiData apiData = new ApiData("小紫", 8, 4);
                            emitter.onNext(apiData);
                            emitter.onNext(apiData);
                            emitter.onComplete(); // 发射完成
                            // 上游的最后log才会打印
                            Log.d(TAG, "上游 subscribe: 发射完成");
                        }
                    })
                    .distinct()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<ApiData>() {
                        @Override
                        public void accept(ApiData apiData) throws Exception {
                            Log.d(TAG, "下游");
                            Log.d(TAG, "accept: "+apiData);
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    2022-08-23 13:51:00.570 18495-18807/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
    2022-08-23 13:51:00.570 18495-18807/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
    2022-08-23 13:51:00.585 18495-18495/com.yoshin.tspsdk D/RxJavaActivity: 下游
    2022-08-23 13:51:00.585 18495-18495/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小明, age=8, className=4)
    2022-08-23 13:51:00.585 18495-18495/com.yoshin.tspsdk D/RxJavaActivity: 下游
    2022-08-23 13:51:00.585 18495-18495/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小黑, age=8, className=4)
    2022-08-23 13:51:00.585 18495-18495/com.yoshin.tspsdk D/RxJavaActivity: 下游
    2022-08-23 13:51:00.585 18495-18495/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小绿, age=8, className=4)
    2022-08-23 13:51:00.585 18495-18495/com.yoshin.tspsdk D/RxJavaActivity: 下游
    2022-08-23 13:51:00.585 18495-18495/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小紫, age=8, className=4)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    5、elementAt (指定下发数据位置)

    ElementAt操作符获取原始Observable发射的数据序列指定索引位置的数据项,然后当做自己的唯一数据发射。给它传递一个基于0的索引值,它会发射原始Observable数据序列对应索引位置的值,如果你传递给elementAt的值为4,那么它会发射第5项的数据。

    在这里插入图片描述

            Observable
                    .create(new ObservableOnSubscribe<ApiData>() {
                        @Override
                        public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
                            Log.d(TAG, "上游 subscribe: 开始发射...");
                            emitter.onNext(new ApiData("小明", 8, 4));
                            emitter.onNext(new ApiData("小黑", 8, 4));
                            emitter.onNext(new ApiData("小绿", 8, 4));
                            ApiData apiData = new ApiData("小紫", 8, 4);
                            emitter.onNext(apiData);
                            emitter.onNext(apiData);
                            emitter.onComplete(); // 发射完成
                            // 上游的最后log才会打印
                            Log.d(TAG, "上游 subscribe: 发射完成");
                        }
                    })
                    .elementAt(1)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<ApiData>() {
                        @Override
                        public void accept(ApiData apiData) throws Exception {
                            Log.d(TAG, "下游");
                            Log.d(TAG, "accept: "+apiData);
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    2022-08-23 14:22:13.564 22396-22734/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
    2022-08-23 14:22:13.564 22396-22734/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
    2022-08-23 14:22:13.582 22396-22396/com.yoshin.tspsdk D/RxJavaActivity: 下游
    2022-08-23 14:22:13.582 22396-22396/com.yoshin.tspsdk D/RxJavaActivity: accept: ApiData(name=小黑, age=8, className=4)
    
    • 1
    • 2
    • 3
    • 4

    6、ignoreElements (忽略所有事件)

    忽略源Publisher发出的所有项目,只调用onComplete或onError。

            Observable.create(new ObservableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                            Log.i(TAG, "subscribe: 上游发射");
                            emitter.onNext(3);
    						...
                        }
                    })
                    .ignoreElements()
                    .subscribe(new CompletableObserver() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    6、debounce(防抖动)

    指定时间内没有新事件产生才下发。

    在输出了一个数据后的一段时间内,没有再次输出新的数据,则把这个数据真正的发送出去;假如在这段时间内有新的数据输出,则以这个数据作为将要发送的数据项,并且重置这个时间段,重新计时。

    RxJava 学习进行中-Scan&Debounce&ThrottleWithTimeout
    :https://www.jianshu.com/p/8426d4778546

    在这里插入图片描述

            Observable.create(new ObservableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                            Log.i(TAG, "subscribe: 上游发射");
                            emitter.onNext(3);
                            emitter.onNext(33);
                            emitter.onNext(4);
                            emitter.onNext(6);
                            emitter.onNext(6);
                            emitter.onNext(16);
                            Log.i(TAG, "subscribe: 上游发射完成");
                        }
                    })
                    .debounce(1000, TimeUnit.MILLISECONDS)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "accept: "+ integer);
                        }
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    2022-08-26 13:48:46.425 24094-24094/com.yoshin.tspsdk I/RxJavaActivity: subscribe: 上游发射
    2022-08-26 13:48:46.427 24094-24094/com.yoshin.tspsdk I/RxJavaActivity: subscribe: 上游发射完成
    2022-08-26 13:48:47.429 24094-24172/com.yoshin.tspsdk D/RxJavaActivity: accept: 16
    
    • 1
    • 2
    • 3

    7、ofType (过滤)

    .ofType(class) 指定某个类型的class,过滤属于这个类型的的结果,其它抛弃

            Observable.just("小黑", 12, "小绿", 23).ofType(Integer.class).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "accept: "+integer);
                }
            });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    2022-08-26 13:52:38.477 24674-24674/com.yoshin.tspsdk D/RxJavaActivity: accept: 12
    2022-08-26 13:52:38.477 24674-24674/com.yoshin.tspsdk D/RxJavaActivity: accept: 23
    
    • 1
    • 2

    8、throttleFirst (过滤)

    在这里插入图片描述

    8.1、throttleFirst

    允许设置一个时间间隔,在这个时间间隔内发送第一个事件,而屏蔽其他事件。
    一般用于点击事件,防抖动。

            Observable.just(1, 2, 3, 4)
                    .throttleFirst(1000, TimeUnit.MILLISECONDS)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "accept: " + integer);
                        }
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    2022-08-26 15:20:22.960 30337-30337/com.yoshin.tspsdk D/RxJavaActivity: accept: 1
    
    • 1

    throttleFirst :在某段时间内,只发送该段时间内第1次事件(假如一个按钮1秒内点了3次 ,第一次显示,后2次不显示)

    8.2、throttleLast

    throttleLast: 在某段时间内,只发送该段时间内最后1次事件(假如一个按钮1秒内点了3次 ,最后第一次显示,前两次不显示)

            Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws InterruptedException {
                    emitter.onNext(1);//第一次不发送
                    Thread.sleep(400);
                    emitter.onNext(2);
                    Thread.sleep(400);
                    emitter.onNext(3);
                    
                    Thread.sleep(900); // 1000 之内是 3
                    emitter.onNext(4);
    
                    Thread.sleep(400); // 1000 之内是 4
                    emitter.onNext(5); // 2100 时发射的5
                    Thread.sleep(700);
                    emitter.onNext(6); // 2800 时发射的6
                    
                    Thread.sleep(900);//3000之内 ,所以没有5只有6
                    emitter.onNext(7); // 3700时出现 7
                }
            }).throttleLast(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) {
                    Log.d(TAG, "accept: " + integer);
                }
            });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    8.3、throttleWithTimeout

    throttleWithTimeout 是 debounce 实现

    四、条件型操作符

    1、all (全满足条件)

    全部数据满足条件,则为true

            Observable
                    .create(new ObservableOnSubscribe<ApiData>() {
                        @Override
                        public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
                            Log.d(TAG, "上游 subscribe: 开始发射...");
                            emitter.onNext(new ApiData("小明", 8, 4));
                            emitter.onNext(new ApiData("小黑", 8, 4));
                            emitter.onNext(new ApiData("小绿", 8, 4));
                            ApiData apiData = new ApiData("小紫", 8, 4);
                            emitter.onNext(apiData);
                            emitter.onNext(apiData);
                            emitter.onComplete(); // 发射完成
                            // 上游的最后log才会打印
                            Log.d(TAG, "上游 subscribe: 发射完成");
                        }
                    })
                    .all(new Predicate<ApiData>() {
                        @Override
                        public boolean test(ApiData apiData) throws Exception {
                            if (apiData.getName() .equals("小紫"))return true;
                            return false;
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Boolean>() {
                        @Override
                        public void accept(Boolean aBoolean) throws Exception {
                            Log.d(TAG, "下游");
                            Log.d(TAG, "accept: "+aBoolean);
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    2022-08-23 14:26:09.387 23450-23561/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
    2022-08-23 14:26:09.387 23450-23561/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
    2022-08-23 14:26:09.403 23450-23450/com.yoshin.tspsdk D/RxJavaActivity: 下游
    2022-08-23 14:26:09.403 23450-23450/com.yoshin.tspsdk D/RxJavaActivity: accept: false
    
    • 1
    • 2
    • 3
    • 4

    2、contains (包含某条件)

    如果发射的数据包含指定值,那就返回true

    contains 使用 any 操作符实现

            ApiData apiData = new ApiData("小紫", 8, 4);
            Observable
                    .create(new ObservableOnSubscribe<ApiData>() {
                        @Override
                        public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
                            Log.d(TAG, "上游 subscribe: 开始发射...");
                            emitter.onNext(new ApiData("小明", 8, 4));
                            emitter.onNext(new ApiData("小黑", 8, 4));
                            emitter.onNext(new ApiData("小绿", 8, 4));
    
                            emitter.onNext(apiData);
                            emitter.onNext(apiData);
                            emitter.onComplete(); // 发射完成
                            // 上游的最后log才会打印
                            Log.d(TAG, "上游 subscribe: 发射完成");
                        }
                    })
                    .contains(apiData)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Boolean>() {
                        @Override
                        public void accept(Boolean aBoolean) throws Exception {
                            Log.d(TAG, "下游");
                            Log.d(TAG, "accept: "+aBoolean);
                        }
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    2022-08-23 14:32:17.451 24756-24890/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
    2022-08-23 14:32:17.452 24756-24890/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
    2022-08-23 14:32:17.488 24756-24756/com.yoshin.tspsdk D/RxJavaActivity: 下游
    2022-08-23 14:32:17.488 24756-24756/com.yoshin.tspsdk D/RxJavaActivity: accept: true
    
    • 1
    • 2
    • 3
    • 4

    3、any (定义条件)

    任意一个满足条件,为true

            Observable
                    .create(new ObservableOnSubscribe<ApiData>() {
                        @Override
                        public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
                            Log.d(TAG, "上游 subscribe: 开始发射...");
                            emitter.onNext(new ApiData("小明", 8, 4));
                            emitter.onNext(new ApiData("小黑", 8, 4));
                            emitter.onNext(new ApiData("小绿", 8, 4));
    
                            emitter.onNext(apiData);
                            emitter.onNext(apiData);
                            emitter.onComplete(); // 发射完成
                            // 上游的最后log才会打印
                            Log.d(TAG, "上游 subscribe: 发射完成");
                        }
                    })
                    .any(new Predicate<ApiData>() {
                        @Override
                        public boolean test(ApiData data) throws Exception {
                            if (data.equals(apiData))return true;
                            return false;
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Boolean>() {
                        @Override
                        public void accept(Boolean aBoolean) throws Exception {
                            Log.d(TAG, "下游");
                            Log.d(TAG, "accept: "+aBoolean);
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    2022-08-23 14:35:48.733 25286-25437/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
    2022-08-23 14:35:48.733 25286-25437/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
    2022-08-23 14:35:48.769 25286-25286/com.yoshin.tspsdk D/RxJavaActivity: 下游
    2022-08-23 14:35:48.769 25286-25286/com.yoshin.tspsdk D/RxJavaActivity: accept: true
    
    • 1
    • 2
    • 3
    • 4

    4、isEmpty

    isEmpty:判断是否发射的数据为空,如果为空,返回true;如果不为空,返回false

            Observable.just("")
                    .isEmpty()
                    .subscribe(new Consumer<Boolean>() {
                        @Override
                        public void accept(Boolean aBoolean) throws Exception {
                            Log.d(TAG, "accept: "+aBoolean);
                        }
                    });
    
            Observable.empty()
                    .isEmpty()
                    .subscribe(new Consumer<Boolean>() {
                        @Override
                        public void accept(Boolean aBoolean) throws Exception {
                            Log.d(TAG, "accept: "+aBoolean);
                        }
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    2022-08-26 14:00:54.062 26190-26190/com.yoshin.tspsdk D/RxJavaActivity: accept: false
    2022-08-26 14:00:54.062 26190-26190/com.yoshin.tspsdk D/RxJavaActivity: accept: true
    
    • 1
    • 2

    5、defaultIfEmpty

    如果被观察者发射的数据为空,那么就发射defaultIfEmpty中给的那个值

            Observable.just("")
                    .defaultIfEmpty("我是后补数据")
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            Log.d(TAG, "accept: "+s);
                        }
                    });
    
            Observable.empty()
                    .defaultIfEmpty("我是后补数据")
                    .subscribe(new Consumer<Object>() {
                        @Override
                        public void accept(Object o) throws Exception {
                            Log.d(TAG, "accept: "+o);
                        }
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    2022-08-26 14:04:36.964 26946-26946/com.yoshin.tspsdk D/RxJavaActivity: accept: 
    2022-08-26 14:04:36.965 26946-26946/com.yoshin.tspsdk D/RxJavaActivity: accept: 我是后补数据
    
    • 1
    • 2

    6、sorted (排序)

    等同toSortedList()

    Observable.fromArray(mTopFuncList.toArray())
                        .sorted((o1, o2) -> ((ArgParamBean) o1).getArgI1() - ((ArgParamBean) o2).getArgI1())
                        .subscribeOn(Schedulers.io())
                        .cast(ArgParamBean.class)
                        .toList()
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(objects -> {
                          
                        })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    从小往大排序:

                        .sorted((o1, o2) -> ((ArgParamBean) o1).getArgI1() - ((ArgParamBean) o2).getArgI1())
    
    • 1

    五、组合型操作符

    1、startWith ( 添加头部数据)

    在发射的数据头部添加数据
    在这里插入图片描述

            Observable
                    .create(new ObservableOnSubscribe<ApiData>() {
                        @Override
                        public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
                            Log.d(TAG, "上游 subscribe: 开始发射...");
                            emitter.onNext(new ApiData("小明", 8, 4));
                            emitter.onNext(new ApiData("小黑", 8, 4));
                            emitter.onNext(new ApiData("小绿", 8, 4));
                            ApiData apiData = new ApiData("小紫", 8, 4);
                            emitter.onNext(apiData);
                            emitter.onNext(apiData);
                            emitter.onComplete(); // 发射完成
                            // 上游的最后log才会打印
                            Log.d(TAG, "上游 subscribe: 发射完成");
                        }
                    })
                    .startWith(new Observable<ApiData>() {
                        @Override
                        protected void subscribeActual(Observer<? super ApiData> observer) {
                            Log.d(TAG, "中游 : 开始发射...");
                            observer.onNext(new ApiData("小白", 8, 4));
                            observer.onComplete();
                            Log.d(TAG, "中游 : 发射完成");
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<ApiData>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(ApiData apiData) {
                            Log.d(TAG, "下游 onNext ="+apiData);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "下游 onComplete");
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    2022-08-23 14:45:08.245 26288-26476/com.yoshin.tspsdk D/RxJavaActivity: 中游 : 开始发射...
    2022-08-23 14:45:08.245 26288-26476/com.yoshin.tspsdk D/RxJavaActivity: 中游 : 发射完成
    2022-08-23 14:45:08.245 26288-26476/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
    2022-08-23 14:45:08.245 26288-26476/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
    2022-08-23 14:45:08.261 26288-26288/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小白, age=8, className=4)
    2022-08-23 14:45:08.261 26288-26288/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小明, age=8, className=4)
    2022-08-23 14:45:08.261 26288-26288/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小黑, age=8, className=4)
    2022-08-23 14:45:08.261 26288-26288/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小绿, age=8, className=4)
    2022-08-23 14:45:08.261 26288-26288/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小紫, age=8, className=4)
    2022-08-23 14:45:08.261 26288-26288/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小紫, age=8, className=4)
    2022-08-23 14:45:08.261 26288-26288/com.yoshin.tspsdk D/RxJavaActivity: 下游 onComplete
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    2、concatWith (添加尾部数据)

    在上面数据的尾部添加数据

            Observable
                    .create(new ObservableOnSubscribe<ApiData>() {
                        @Override
                        public void subscribe(ObservableEmitter<ApiData> emitter) throws Exception {
                            Log.d(TAG, "上游 subscribe: 开始发射...");
                            emitter.onNext(new ApiData("小明", 8, 4));
                            emitter.onNext(new ApiData("小黑", 8, 4));
                            emitter.onNext(new ApiData("小绿", 8, 4));
                            ApiData apiData = new ApiData("小紫", 8, 4);
                            emitter.onNext(apiData);
                            emitter.onNext(apiData);
                            emitter.onComplete(); // 发射完成
                            // 上游的最后log才会打印
                            Log.d(TAG, "上游 subscribe: 发射完成");
                        }
                    })
                    .concatWith(new Observable<ApiData>() {
                        @Override
                        protected void subscribeActual(Observer<? super ApiData> observer) {
                            Log.d(TAG, "中游 : 开始发射...");
                            observer.onNext(new ApiData("小白", 8, 4));
                            observer.onComplete();
                            Log.d(TAG, "中游 : 发射完成");
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<ApiData>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(ApiData apiData) {
                            Log.d(TAG, "下游 onNext ="+apiData);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "下游 onComplete");
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    2022-08-23 14:48:10.897 26795-27116/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 开始发射...
    2022-08-23 14:48:10.898 26795-27116/com.yoshin.tspsdk D/RxJavaActivity: 上游 subscribe: 发射完成
    2022-08-23 14:48:10.898 26795-27116/com.yoshin.tspsdk D/RxJavaActivity: 中游 : 开始发射...
    2022-08-23 14:48:10.898 26795-27116/com.yoshin.tspsdk D/RxJavaActivity: 中游 : 发射完成
    2022-08-23 14:48:10.914 26795-26795/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小明, age=8, className=4)
    2022-08-23 14:48:10.914 26795-26795/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小黑, age=8, className=4)
    2022-08-23 14:48:10.914 26795-26795/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小绿, age=8, className=4)
    2022-08-23 14:48:10.914 26795-26795/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小紫, age=8, className=4)
    2022-08-23 14:48:10.914 26795-26795/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小紫, age=8, className=4)
    2022-08-23 14:48:10.914 26795-26795/com.yoshin.tspsdk D/RxJavaActivity: 下游 onNext =ApiData(name=小白, age=8, className=4)
    2022-08-23 14:48:10.914 26795-26795/com.yoshin.tspsdk D/RxJavaActivity: 下游 onComplete
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3、concat

    在这里插入图片描述
    concat(串行连接数据)
    concatDelayError
    concatEager(并行连接数据)

    作用:组合多个被观察者一起发送数据,合并后 按发送顺序串行(后一个必须等前一个执行完之后)执行
    二者区别:组合被观察者的数量,即concat()组合被观察者数量≤4个,而concatArray()则可>4个。

    3.1、concat(串行连接数据)

            List<Observable<Integer>> list = new ArrayList<>();
            list.add(Observable.just(1).delay(5, TimeUnit.SECONDS));
            list.add(Observable.just(2));
            list.add(Observable.just(3));
            list.add(Observable.just(4));
            Observable.concat(list)
                    .subscribeOn(Schedulers.io())
                    .subscribe(i -> {
                Log.d(TAG, "下游 i==" + new Gson().toJson(i));
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    2022-08-23 16:26:51.384 10589-10772/com.yoshin.tspsdk D/RxJavaActivity: 下游 list==1
    2022-08-23 16:26:51.385 10589-10772/com.yoshin.tspsdk D/RxJavaActivity: 下游 list==2
    2022-08-23 16:26:51.385 10589-10772/com.yoshin.tspsdk D/RxJavaActivity: 下游 list==3
    2022-08-23 16:26:51.385 10589-10772/com.yoshin.tspsdk D/RxJavaActivity: 下游 list==4
    
    • 1
    • 2
    • 3
    • 4

    3.2、concatDelayError(串行连接数据+错误处理)

    当数据源发生错误时,concatDelayError允许继续连接其他数据源,并且在最后,进入onError

            List<Observable<Integer>> list = new ArrayList<>();
            list.add(Observable.just(1).doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e(TAG, "accept: NullPointerException");
                    throw new NullPointerException();
                }
            }));
            list.add(Observable.just(2).delay(1000, TimeUnit.MILLISECONDS));
            list.add(Observable.just(3));
            list.add(Observable.just(4));
            Observable.concatDelayError(list).subscribe(new Observer<Integer>() {
    
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe: ");
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext: " + integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: " + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete: ");
                }
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    2022-08-23 16:34:24.012 12137-12137/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe: 
    2022-08-23 16:34:24.012 12137-12137/com.yoshin.tspsdk E/RxJavaActivity: accept: NullPointerException
    2022-08-23 16:34:25.014 12137-12408/com.yoshin.tspsdk D/RxJavaActivity: onNext: 2
    2022-08-23 16:34:25.014 12137-12408/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
    2022-08-23 16:34:25.014 12137-12408/com.yoshin.tspsdk D/RxJavaActivity: onNext: 4
    2022-08-23 16:34:25.014 12137-12408/com.yoshin.tspsdk D/RxJavaActivity: onError: throw with null exception
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3.3、concatEager(并行连接数据)

    等待延迟结束在调用

            Log.i(TAG, " ===============  concatEager  ============= ");
    
            List<Observable<Integer>> list = new ArrayList<>();
            list.add(Observable.just(1).delay(5, TimeUnit.SECONDS));
            list.add(Observable.just(2));
            list.add(Observable.just(3));
            list.add(Observable.just(4));
            Observable.concatEager(list)
                    .subscribeOn(Schedulers.io())
                    .subscribe(i -> {
                Log.d(TAG, "下游 i==" + new Gson().toJson(i));
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    2022-08-23 16:42:09.052 13375-13375/com.yoshin.tspsdk I/RxJavaActivity:  ===============  concatEager  ============= 
    2022-08-23 16:42:14.100 13375-13517/com.yoshin.tspsdk D/RxJavaActivity: 下游 i==1
    2022-08-23 16:42:14.101 13375-13517/com.yoshin.tspsdk D/RxJavaActivity: 下游 i==2
    2022-08-23 16:42:14.101 13375-13517/com.yoshin.tspsdk D/RxJavaActivity: 下游 i==3
    2022-08-23 16:42:14.101 13375-13517/com.yoshin.tspsdk D/RxJavaActivity: 下游 i==4
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.4、concatArrayEagerDelayError(并行连接数据+错误处理)

            Observable.concatArrayEagerDelayError(
                            Observable.just(1).doOnNext(new Consumer<Integer>() {
                                @Override
                                public void accept(Integer integer) throws Exception {
                                    Log.e(TAG, "accept: NullPointerException");
                                    throw new NullPointerException();
                                }
                            }),
                            Observable.just(2),
                            Observable.just(3),
                            Observable.just(4))
                    .subscribe(new Observer<Integer>() {
    
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "onSubscribe: ");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "onNext: " + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError: " + e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete: ");
                        }
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    2022-08-23 16:46:48.429 14213-14213/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe: 
    2022-08-23 16:46:48.430 14213-14213/com.yoshin.tspsdk E/RxJavaActivity: accept: NullPointerException
    2022-08-23 16:46:48.430 14213-14213/com.yoshin.tspsdk D/RxJavaActivity: onNext: 2
    2022-08-23 16:46:48.430 14213-14213/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
    2022-08-23 16:46:48.430 14213-14213/com.yoshin.tspsdk D/RxJavaActivity: onNext: 4
    2022-08-23 16:46:48.431 14213-14213/com.yoshin.tspsdk D/RxJavaActivity: onError: throw with null exception
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    4、merge (并行发射)

    在这里插入图片描述
    作用:组合多个被观察者一起发送数据,合并后 按时间线并行执行
    二者区别:组合被观察者的数量,即merge()组合被观察者数量≤4个,而mergeArray()则可>4个。

    concatEager 和 merge 的区别是,concatEager 会等待Observable结果,结果同时发射;merge 是Observable同时调用

    4.1、merge

            List<Observable<Integer>> list = new ArrayList<>();
            list.add(Observable.just(1).delay(5, TimeUnit.SECONDS));
            list.add(Observable.just(2));
            list.add(Observable.just(3));
            list.add(Observable.just(4));
            Observable.concatEager(list)
                    .subscribeOn(Schedulers.io())
                    .subscribe(i -> {
                Log.d(TAG, "下游 i==" + new Gson().toJson(i));
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    2022-08-23 17:06:32.161 15467-17514/com.yoshin.tspsdk D/RxJavaActivity: 下游 i==2
    2022-08-23 17:06:32.162 15467-17514/com.yoshin.tspsdk D/RxJavaActivity: 下游 i==3
    2022-08-23 17:06:32.162 15467-17514/com.yoshin.tspsdk D/RxJavaActivity: 下游 i==4
    2022-08-23 17:06:37.140 15467-17515/com.yoshin.tspsdk D/RxJavaActivity: 下游 i==1
    
    • 1
    • 2
    • 3
    • 4

    4.2、mergeArrayDelayError

            Observable.mergeArrayDelayError(
                            Observable.just(1).doOnNext(new Consumer<Integer>() {
                                @Override
                                public void accept(Integer integer) throws Exception {
                                    Log.e(TAG, "accept: NullPointerException");
                                    throw new NullPointerException();
                                }
                            }).delay(1000, TimeUnit.MILLISECONDS),
                            Observable.just(2).delay(1000, TimeUnit.MILLISECONDS),
                            Observable.just(3),
                            Observable.just(4))
                    .subscribe(new Observer<Integer>() {
    
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "onSubscribe: ");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "onNext: " + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError: " + e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete: ");
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    2022-08-23 17:39:13.906 22123-22123/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe: 
    2022-08-23 17:39:13.907 22123-22123/com.yoshin.tspsdk E/RxJavaActivity: accept: NullPointerException
    2022-08-23 17:39:13.908 22123-22123/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
    2022-08-23 17:39:13.908 22123-22123/com.yoshin.tspsdk D/RxJavaActivity: onNext: 4
    2022-08-23 17:39:14.909 22123-22306/com.yoshin.tspsdk D/RxJavaActivity: onNext: 2
    2022-08-23 17:39:14.909 22123-22306/com.yoshin.tspsdk D/RxJavaActivity: onError: throw with null exception
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    5、zip (合并)

    在这里插入图片描述
    zip是在其中一个Observable发射数据项后,组合所有Observable最早一个未被组合的数据项,也就是说,组合后的Observable发射的第n个数据项,必然是每个源由Observable各自发射的第n个数据项构成的

            Observable.zip(
                            Observable.just("小黑", "小红", "小x").delay(1000, TimeUnit.MILLISECONDS),
                            Observable.just(2,12).delay(1000, TimeUnit.MILLISECONDS),
                            new BiFunction<String, Integer, String>() {
                                @Override
                                public String apply(String s, Integer integer) throws Exception {
                                    Log.d(TAG, "apply: ");
                                    return s + integer;
                                }
                            }
                    )
                    .subscribe(new Observer<String>() {
    
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "onSubscribe: ");
                        }
    
                        @Override
                        public void onNext(String s) {
                            Log.d(TAG, "onNext: " + s);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError: " + e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete: ");
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    2022-08-23 18:12:24.014 28797-28797/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe: 
    2022-08-23 18:12:25.016 28797-28967/com.yoshin.tspsdk D/RxJavaActivity: apply: 
    2022-08-23 18:12:25.016 28797-28967/com.yoshin.tspsdk D/RxJavaActivity: onNext: 小黑2
    2022-08-23 18:12:25.016 28797-28967/com.yoshin.tspsdk D/RxJavaActivity: apply: 
    2022-08-23 18:12:25.016 28797-28967/com.yoshin.tspsdk D/RxJavaActivity: onNext: 小红12
    2022-08-23 18:12:25.016 28797-28967/com.yoshin.tspsdk D/RxJavaActivity: onComplete: 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    6、combineLatest (合并)

    在这里插入图片描述

    combineLatest则是在其中一个Observable发射数据项后,组合所有Observable所发射的最后一个数据项(前提是所有的Observable都至少发射过一个数据项)

    6.1、combineLatest

    合并处理数据

                    Observable.combineLatest(
                            Observable.just("小黑", "小红", "小x").delay(1000, TimeUnit.MILLISECONDS),
                            Observable.just(2,12),
                            new BiFunction<String, Integer, String>() {
                                @Override
                                public String apply(String s, Integer integer) throws Exception {
                                    Log.d(TAG, "apply: ");
                                    return s + integer;
                                }
                            }
                    )
                    .subscribe(new Observer<String>() {
    
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "onSubscribe: ");
                        }
    
                        @Override
                        public void onNext(String s) {
                            Log.d(TAG, "onNext: " + s);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError: " + e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete: ");
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    2022-08-24 10:55:36.735 14510-14510/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe: 
    2022-08-24 10:55:37.738 14510-14667/com.yoshin.tspsdk D/RxJavaActivity: apply: 
    2022-08-24 10:55:37.739 14510-14667/com.yoshin.tspsdk D/RxJavaActivity: onNext: 小黑12
    2022-08-24 10:55:37.744 14510-14667/com.yoshin.tspsdk D/RxJavaActivity: apply: 
    2022-08-24 10:55:37.745 14510-14667/com.yoshin.tspsdk D/RxJavaActivity: onNext: 小红12
    2022-08-24 10:55:37.749 14510-14667/com.yoshin.tspsdk D/RxJavaActivity: apply: 
    2022-08-24 10:55:37.750 14510-14667/com.yoshin.tspsdk D/RxJavaActivity: onNext: 小x12
    2022-08-24 10:55:37.751 14510-14667/com.yoshin.tspsdk D/RxJavaActivity: onComplete: 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    6.2、combineLatestDelayError

    合并下发数据并且 延迟处理错误

       Observable<Integer> observable2;
            Observable<String> observable3;
            
            Observable.combineLatestDelayError(
                            new ObservableSource[]{
                                    Observable.just(2, 3),
                                    observable2=  Observable.just(4, 3),
                                    observable3 =  Observable.create(new ObservableOnSubscribe<String>() {
                                        @Override
                                        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                                            Log.e(TAG, "subscribe: ");
                                            emitter.onNext("特别黑");
                                            emitter.onError(new Throwable("这是个Throwable"));
                                        }
                                    })
                            },
                            new Function<Object[], String>() {
                                @Override
                                public String apply(Object[] objects) throws Exception {
                                    Log.d(TAG, "apply: " + new Gson().toJson(objects));
                                    return String.valueOf(objects[0])  +objects[1] + objects[2];
                                }
                            }
                    )
                    .subscribe(new Observer<String>() {
    
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "onSubscribe: ");
                        }
    
                        @Override
                        public void onNext(String s) {
                            Log.d(TAG, "onNext: " + s);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError: " + e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete: ");
                        }
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    2022-08-24 15:14:04.732 23925-23925/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe: 
    2022-08-24 15:14:04.733 23925-23925/com.yoshin.tspsdk E/RxJavaActivity: subscribe: 
    2022-08-24 15:14:04.745 23925-23925/com.yoshin.tspsdk D/RxJavaActivity: apply: [3,3,"特别黑"]
    2022-08-24 15:14:04.745 23925-23925/com.yoshin.tspsdk D/RxJavaActivity: onNext: 33特别黑
    2022-08-24 15:14:04.745 23925-23925/com.yoshin.tspsdk D/RxJavaActivity: onError: 这是个Throwable
    
    • 1
    • 2
    • 3
    • 4
    • 5

    步骤二:如果 observable2 与 observable3 互换位置:

    2022-08-24 16:20:16.394 29025-29025/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe: 
    2022-08-24 16:20:16.394 29025-29025/com.yoshin.tspsdk E/RxJavaActivity: subscribe: 
    2022-08-24 16:20:16.396 29025-29025/com.yoshin.tspsdk D/RxJavaActivity: apply: [3,"特别黑",4]
    2022-08-24 16:20:16.396 29025-29025/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3特别黑4
    2022-08-24 16:20:16.397 29025-29025/com.yoshin.tspsdk D/RxJavaActivity: apply: [3,"特别黑",3]
    2022-08-24 16:20:16.397 29025-29025/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3特别黑3
    2022-08-24 16:20:16.397 29025-29025/com.yoshin.tspsdk D/RxJavaActivity: onError: 这是个Throwable
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    步骤二:修改 observable3

                                    observable3 =  Observable.create(new ObservableOnSubscribe<String>() {
                                        @Override
                                        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                                            Log.e(TAG, "subscribe: ");
                                            emitter.onNext("特别黑");
                                            emitter.onError(new Throwable("这是个Throwable"));
                                            emitter.onNext("特别白");
                                        }
                                    })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    2022-08-24 16:23:29.470 29787-29787/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe: 
    2022-08-24 16:23:29.471 29787-29787/com.yoshin.tspsdk E/RxJavaActivity: subscribe: 
    2022-08-24 16:23:29.482 29787-29787/com.yoshin.tspsdk D/RxJavaActivity: apply: [3,"特别黑",4]
    2022-08-24 16:23:29.483 29787-29787/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3特别黑4
    2022-08-24 16:23:29.484 29787-29787/com.yoshin.tspsdk D/RxJavaActivity: apply: [3,"特别黑",3]
    2022-08-24 16:23:29.484 29787-29787/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3特别黑3
    2022-08-24 16:23:29.484 29787-29787/com.yoshin.tspsdk D/RxJavaActivity: onError: 这是个Throwable
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    7、amb (胜者为王)

    给定多个Observable,只让第一个发射数据的Observable发射全部数据

    7.1、amb

      List<Observable<Integer>> list = new ArrayList<>();
            list.add(Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                }
            }));
            list.add(Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(10);
                    emitter.onNext(20);
                    emitter.onNext(30);
                }
            }).delay(300, TimeUnit.MILLISECONDS));
            Observable.amb(list)
                    .subscribe(new Observer<Integer>() {
    
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "onSubscribe: ");
                        }
    
                        @Override
                        public void onNext(Integer s) {
                            Log.d(TAG, "onNext: " + s);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError: " + e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete: ");
                        }
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    2022-08-25 14:40:28.394 10057-10057/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe: 
    2022-08-25 14:40:28.395 10057-10057/com.yoshin.tspsdk D/RxJavaActivity: onNext: 1
    2022-08-25 14:40:28.395 10057-10057/com.yoshin.tspsdk D/RxJavaActivity: onNext: 2
    2022-08-25 14:40:28.395 10057-10057/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
    
    • 1
    • 2
    • 3
    • 4

    7.2、ambArray

            Observable<Integer> observable1 = Observable.timer(4, TimeUnit.SECONDS)
                    .concatMap(aLong -> Observable.just(1, 2, 3, 4, 5));
    
            Observable<Integer> observable2 = Observable.timer(3, TimeUnit.SECONDS)
                    .flatMap(aLong -> Observable.just(6, 7, 8, 9, 10));
    
            Observable<Integer> observable3 = Observable.timer(2, TimeUnit.SECONDS)
                    .flatMap(aLong -> Observable.just(11, 12, 13, 14, 15));
    
            Observable.ambArray(observable1, observable2, observable3)
                    .subscribe(new Observer<Integer>() {
    
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "onSubscribe: ");
                        }
    
                        @Override
                        public void onNext(Integer s) {
                            Log.d(TAG, "onNext: " + s);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError: " + e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete: ");
                        }
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    2022-08-25 14:46:13.749 10655-10899/com.yoshin.tspsdk D/RxJavaActivity: onNext: 11
    2022-08-25 14:46:13.749 10655-10899/com.yoshin.tspsdk D/RxJavaActivity: onNext: 12
    2022-08-25 14:46:13.749 10655-10899/com.yoshin.tspsdk D/RxJavaActivity: onNext: 13
    2022-08-25 14:46:13.749 10655-10899/com.yoshin.tspsdk D/RxJavaActivity: onNext: 14
    2022-08-25 14:46:13.749 10655-10899/com.yoshin.tspsdk D/RxJavaActivity: onNext: 15
    2022-08-25 14:46:13.750 10655-10899/com.yoshin.tspsdk D/RxJavaActivity: onComplete: 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    8、collect (包装成数据结构)

    收集发射的数据到一个数据结构里,然后将这个结构作为一个整体发射出去。

            Observable.just(1, 2, 3, 4)
                    .collect(
                            new Callable<List<Integer>>() {
                                @Override
                                public List<Integer> call() throws Exception {
                                    return new ArrayList<>(); // 这个地方的数据结构可变,跟业务有关
                                }
                            }, new BiConsumer<List<Integer>, Integer>() {
                                @Override
                                public void accept(List<Integer> integerList, Integer integer2) throws Exception {
                                    integerList.add(integer2);
                                }
                            })
                    .subscribe(new Consumer<List<Integer>>() {
                        @Override
                        public void accept(List<Integer> integerList) throws Exception {
                            Log.d(TAG, "accept: " + new Gson().toJson(integerList));
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    2022-08-26 14:52:12.435 28902-28902/com.yoshin.tspsdk D/RxJavaActivity: accept: [1,2,3,4]
    
    • 1

    9、count (统计)

            Observable.just(1, 2, 3, 4)
                    .count()
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long aLong) throws Exception {
                            Log.d(TAG, "accept: "+aLong);
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    2022-08-26 15:05:18.635 29618-29618/com.yoshin.tspsdk D/RxJavaActivity: accept: 4
    
    • 1

    六、异常处理操作符

    1、onErrorReturn (拦截异常及错误并返回定义信息)

    让Observable遇到错误(Throwable、Error、Exception)时发射一个特殊的项并且正常终止。

            Observable.merge(
                            Observable.just(2).delay(1000, TimeUnit.MILLISECONDS),
                            Observable.create(new ObservableOnSubscribe<Integer>() {
                                @Override
                                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                                    Log.e(TAG, "subscribe: ");
                                    emitter.onNext(3);
    //                                throw new NullPointerException("异常");//拦截
                                    emitter.onError(new Throwable("这是Throwable")); // 拦截
    //                                emitter.onError(new Exception("测试错误")); //拦截
    //                                emitter.onError(new Error("测试错误")); // 拦截
                                }
                            }),
                            Observable.just(4))
                    .onErrorReturn(new Function<Throwable, Integer>() {
                        @Override
                        public Integer apply(Throwable throwable) throws Exception {
                            return -1;
                        }
                    })
                    .subscribe(new Observer<Integer>() {
    
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "onSubscribe: ");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "onNext: " + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError: " + e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete: ");
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    2022-08-24 10:35:27.221 12231-12231/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe: 
    2022-08-24 10:35:27.222 12231-12231/com.yoshin.tspsdk E/RxJavaActivity: subscribe: 
    2022-08-24 10:35:27.222 12231-12231/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
    2022-08-24 10:35:27.223 12231-12231/com.yoshin.tspsdk D/RxJavaActivity: onNext: -1
    2022-08-24 10:35:27.223 12231-12231/com.yoshin.tspsdk D/RxJavaActivity: onComplete: 
    
    • 1
    • 2
    • 3
    • 4
    • 5

    emitter.onError(new Throwable()); 或者 emitter.onError(new Error());
    throw new NullPointerException(“异常”); 或者 emitter.onError(new Exception());
    所有异常都拦截

    2、onErrorResumeNext (拦截异常及错误)

    让Observable在遇到错误(Throwable、Error、Exception)时开始发射第二个Observable的数据序列。

            Observable.merge(
                            Observable.just(2).delay(1000, TimeUnit.MILLISECONDS),
                            Observable.create(new ObservableOnSubscribe<Integer>() {
                                @Override
                                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                                    Log.e(TAG, "subscribe: ");
                                    emitter.onNext(3);
    //                                throw new NullPointerException("异常");//拦截
                                    emitter.onError(new Throwable("这是Throwable")); // 拦截
    //                                emitter.onError(new Exception("测试错误")); //拦截
    //                                emitter.onError(new Error("测试错误")); // 拦截
                                }
                            }),
                            Observable.just(4))
                    .onErrorResumeNext(new Observable<Integer>() {
                        @Override
                        protected void subscribeActual(Observer<? super Integer> observer) {
                            Log.e(TAG, "subscribeActual: ");
                            observer.onNext(110);
                            observer.onNext(120);
                            observer.onComplete();
                        }
                    })
                    .subscribe(new Observer<Integer>() {
    
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "onSubscribe: ");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "onNext: " + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError: " + e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete: ");
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    2022-08-24 10:31:15.082 11854-11854/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe: 
    2022-08-24 10:31:15.084 11854-11854/com.yoshin.tspsdk E/RxJavaActivity: subscribe: 
    2022-08-24 10:31:15.084 11854-11854/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
    2022-08-24 10:31:15.084 11854-11854/com.yoshin.tspsdk E/RxJavaActivity: subscribeActual: 
    2022-08-24 10:31:15.084 11854-11854/com.yoshin.tspsdk D/RxJavaActivity: onNext: 110
    2022-08-24 10:31:15.084 11854-11854/com.yoshin.tspsdk D/RxJavaActivity: onNext: 120
    2022-08-24 10:31:15.084 11854-11854/com.yoshin.tspsdk D/RxJavaActivity: onComplete: 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    emitter.onError(new Throwable()); 或者 emitter.onError(new Error());
    throw new NullPointerException(“异常”); 或者 emitter.onError(new Exception());
    所有异常都拦截

    3、onExceptionResumeNext (拦截异常)

    和onErrorResumeNext类似,
    onExceptionResumeNext方法返回一个镜像原有Observable行为的新Observable,
    也使用一个备用的Observable,

    不同的是,
    如果onError收到的Throwable不是一个Exception,
    它会将错误传递给观察者的onError方法,不会使用备用的Observable。

            Observable.merge(
                            Observable.just(2).delay(1000, TimeUnit.MILLISECONDS),
                            Observable.create(new ObservableOnSubscribe<Integer>() {
                                @Override
                                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                                    Log.e(TAG, "subscribe: ");
                                    emitter.onNext(3);
    //                                throw new NullPointerException("异常");//拦截
    //                                emitter.onError(new Throwable()); // 不拦截
    //                                emitter.onError(new Exception("测试错误")); //拦截
                                    emitter.onError(new Error("测试错误")); // 不拦截
                                }
                            }),
                            Observable.just(4))
                    .onExceptionResumeNext(new Observable<Integer>() {
                        @Override
                        protected void subscribeActual(Observer<? super Integer> observer) {
                            Log.e(TAG, "subscribeActual: ");
                            observer.onNext(110);
                            observer.onNext(120);
                            observer.onComplete();
                        }
                    })
                    .subscribe(new Observer<Integer>() {
    
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "onSubscribe: ");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "onNext: " + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError: " + e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete: ");
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    emitter.onError(new Throwable()); 或者 emitter.onError(new Error()); ,不拦截

    2022-08-24 09:52:24.703 8455-8455/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe: 
    2022-08-24 09:52:24.704 8455-8455/com.yoshin.tspsdk E/RxJavaActivity: subscribe: 
    2022-08-24 09:52:24.705 8455-8455/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
    2022-08-24 09:52:24.705 8455-8455/com.yoshin.tspsdk D/RxJavaActivity: onError: 测试错误
    
    • 1
    • 2
    • 3
    • 4

    throw new NullPointerException(“异常”); 或者 emitter.onError(new Exception()); ,拦截

    2022-08-24 09:54:23.602 8902-8902/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe: 
    2022-08-24 09:54:23.604 8902-8902/com.yoshin.tspsdk E/RxJavaActivity: subscribe: 
    2022-08-24 09:54:23.604 8902-8902/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
    2022-08-24 09:54:23.605 8902-8902/com.yoshin.tspsdk E/RxJavaActivity: subscribeActual: 
    2022-08-24 09:54:23.605 8902-8902/com.yoshin.tspsdk D/RxJavaActivity: onNext: 110
    2022-08-24 09:54:23.605 8902-8902/com.yoshin.tspsdk D/RxJavaActivity: onNext: 120
    2022-08-24 09:54:23.605 8902-8902/com.yoshin.tspsdk D/RxJavaActivity: onComplete: 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    4、retry (异常重试)

            Observable.merge(
                            Observable.just(2).delay(1000, TimeUnit.MILLISECONDS),
                            Observable.create(new ObservableOnSubscribe<Integer>() {
                                @Override
                                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                                    Log.e(TAG, "subscribe: ");
                                    emitter.onNext(3);
    //                                throw new NullPointerException("异常");//拦截
    //                                emitter.onError(new Throwable("这是Throwable")); // 拦截
    //                                emitter.onError(new Exception("这是Exception")); //拦截
                                    emitter.onError(new Error("这是Error")); // 拦截
                                }
                            }),
                            Observable.just(4))
                    .retry(2, new Predicate<Throwable>() {
                        @Override
                        public boolean test(Throwable throwable) throws Exception {
                            Log.e(TAG, "test: " + throwable.getMessage());
                            return true;
                        }
                    })
                    .subscribe(new Observer<Integer>() {
    
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "onSubscribe: ");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "onNext: " + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError: " + e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete: ");
                        }
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    2022-08-24 10:43:58.735 14008-14008/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe: 
    2022-08-24 10:43:58.737 14008-14008/com.yoshin.tspsdk E/RxJavaActivity: subscribe: 
    2022-08-24 10:43:58.737 14008-14008/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
    2022-08-24 10:43:58.737 14008-14008/com.yoshin.tspsdk E/RxJavaActivity: test: 这是Error
    2022-08-24 10:43:58.738 14008-14008/com.yoshin.tspsdk E/RxJavaActivity: subscribe: 
    2022-08-24 10:43:58.738 14008-14008/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
    2022-08-24 10:43:58.738 14008-14008/com.yoshin.tspsdk E/RxJavaActivity: test: 这是Error
    2022-08-24 10:43:58.739 14008-14008/com.yoshin.tspsdk E/RxJavaActivity: subscribe: 
    2022-08-24 10:43:58.739 14008-14008/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
    2022-08-24 10:43:58.739 14008-14008/com.yoshin.tspsdk D/RxJavaActivity: onError: 这是Error
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    emitter.onError(new Throwable()); 或者 emitter.onError(new Error());
    throw new NullPointerException(“异常”); 或者 emitter.onError(new Exception());
    所有异常都拦截

    5、repeat (条件重试)

    在这里插入图片描述

    5.1、repeat (指定次数)

    如果repeat不带参数默认无限循环

            Observable.just("小黑", "真帅")
                    .repeat(2)
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            Log.d(TAG, "accept: " + s);
                        }
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    2022-08-25 15:10:09.666 11747-11747/com.yoshin.tspsdk D/RxJavaActivity: accept: 小黑
    2022-08-25 15:10:09.666 11747-11747/com.yoshin.tspsdk D/RxJavaActivity: accept: 真帅
    2022-08-25 15:10:09.666 11747-11747/com.yoshin.tspsdk D/RxJavaActivity: accept: 小黑
    2022-08-25 15:10:09.666 11747-11747/com.yoshin.tspsdk D/RxJavaActivity: accept: 真帅
    
    • 1
    • 2
    • 3
    • 4

    5.2、repeatWhen (指定时间)

    设置下次执行的时间,这种方式只能执行两次

            Observable.just("小黑", "真帅")
                    .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
                            return Observable.timer(1000, TimeUnit.MILLISECONDS);
                        }
                    })
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            Log.d(TAG, "accept: " + s);
                        }
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    2022-08-25 15:16:34.581 12559-12559/com.yoshin.tspsdk D/RxJavaActivity: accept: 小黑
    2022-08-25 15:16:34.581 12559-12559/com.yoshin.tspsdk D/RxJavaActivity: accept: 真帅
    2022-08-25 15:16:35.583 12559-12721/com.yoshin.tspsdk D/RxJavaActivity: accept: 小黑
    2022-08-25 15:16:35.584 12559-12721/com.yoshin.tspsdk D/RxJavaActivity: accept: 真帅
    
    • 1
    • 2
    • 3
    • 4

    5.2、repeatUntil(指定条件)

            Observable.just("小黑", "真帅")
                    .repeatUntil(new BooleanSupplier() {
                        @Override
                        public boolean getAsBoolean() throws Exception {
                            // true时则立即中断执行 
                            return false;
                        }
                    })
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            Log.d(TAG, "accept: " + s);
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    七、辅助操作符

    1、delay (延时)

    在这里插入图片描述

     Observable.just(4, 3).delay(1000, TimeUnit.MILLISECONDS)
    
    • 1

    2、do

    do系列操作符相当于给Observable执行周期的关键节点添加回调。
    当Observable执行到这个阶段的时候,这些回调就会【先被触发】。
    当Observable每发送一个数据时,doOnNext会被首先调用,然后再onNext。
    若发射中途出现异常doOnError会被调用,然后onError。
    若数据正常发送完毕doOnCompleted会被触发,然后执行onCompleted。
    当订阅或者解除订阅doOnSubscribe,doOnUnsubscribe会被执行。

    (1)doOnSubscribe:在被观察者和观察者产生关联的时候被调用,disposable可以立即取消订阅;
    (2)doOnLifecycle:可以在订阅之后设置是否取消订阅;
    (3)doNext和doAfterNext:可以接收到被观察者发射过来的数据;
    (4)doOnEach:当onNext、onError、onComplete被触发是被调用;
    (5)doOnComplete:当触发onComplete时被调用;
    (6)doOnError:当触发onError时被调用;
    (7)doFinally:当触发onError或onComplete时被调用;
    (8)doOnDispose:当取消订阅时被调用;
    (9)doAfterTerminate:订阅终止时被调用;

            Observable.just(1, 2, 3)
                    .doOnSubscribe(new Consumer<Disposable>() {
                        @Override
                        public void accept(Disposable disposable) throws Exception {
                            Log.d(TAG, "accept: doOnSubscribe");
                        }
                    })
                    .doOnDispose(new Action() {//当取消订阅时被调用;
                        @Override
                        public void run() throws Exception {
                            Log.d(TAG, "run: doOnDispose");
                        }
                    })
                    .doOnTerminate(new Action() {//订阅终止时被调用;
                        @Override
                        public void run() throws Exception {
                            Log.d(TAG, "run: doOnTerminate");
                        }
                    })
                    .doAfterTerminate(new Action() {
                        @Override
                        public void run() throws Exception {
                            Log.d(TAG, "run: doAfterTerminate");
                        }
                    })
                    .doOnLifecycle(
                            new Consumer<Disposable>() {
                                @Override
                                public void accept(Disposable disposable) throws Exception {
                                    Log.d(TAG, "accept: doOnLifecycle");
                                }
                            }, new Action() {
                                @Override
                                public void run() throws Exception {
                                    Log.d(TAG, "run: doOnLifecycle");
                                }
                            })
                    .doOnNext(new Consumer<Integer>() {//可以接收到被观察者发射过来的数据之前;
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "accept: doOnNext ==" + integer);
                        }
                    })
                    .doAfterNext(new Consumer<Integer>() {//可以接收到被观察者发射过来的数据之后;
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "accept: doAfterNext ==" + integer);
                        }
                    })
                    .doOnComplete(new Action() {//当触发onComplete时被调用;
                        @Override
                        public void run() throws Exception {
                            Log.d(TAG, "run: doOnComplete");
                        }
                    })
                    .doOnError(new Consumer<Throwable>() {
                        //当触发onError时被调用;
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            Log.d(TAG, "run: doOnError");
                        }
                    })
                    .doFinally(new Action() {
                        //当触发onError或onComplete时被调用;
                        @Override
                        public void run() throws Exception {
                            Log.d(TAG, "run: doFinally");
                        }
                    })
                    .doOnEach(new Consumer<Notification<Integer>>() {
                        //当onNext、onError、onComplete被触发时被调用;
                        @Override
                        public void accept(Notification<Integer> integerNotification) throws Exception {
                            Log.d(TAG, "run: doOnEach ==" + integerNotification.getValue());
                        }
                    })
                    //**************************************************************
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "onSubscribe: ");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "onNext: " + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError: ");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete: ");
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    2022-08-26 15:55:18.976 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: accept: doOnSubscribe
    2022-08-26 15:55:18.976 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: accept: doOnLifecycle
    2022-08-26 15:55:18.976 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe: 
    2022-08-26 15:55:18.976 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: accept: doOnNext ==1
    2022-08-26 15:55:18.976 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: run: doOnEach ==1
    2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: onNext: 1
    2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: accept: doAfterNext ==1
    2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: accept: doOnNext ==2
    2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: run: doOnEach ==2
    2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: onNext: 2
    2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: accept: doAfterNext ==2
    2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: accept: doOnNext ==3
    2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: run: doOnEach ==3
    2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
    2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: accept: doAfterNext ==3
    2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: run: doOnTerminate
    2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: run: doOnComplete
    2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: run: doOnEach ==null
    2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: onComplete: 
    2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: run: doFinally
    2022-08-26 15:55:18.977 32664-32664/com.yoshin.tspsdk D/RxJavaActivity: run: doAfterTerminate
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    转:RxJava : do操作符
    https://blog.csdn.net/sinat_31057219/article/details/101374255

    3、timeout (超时)

    如果从其前任开始的指定超时时间内未发出下一项,则生成的 Observable 将终止并通知观察者 TimeoutException。
    在这里插入图片描述

            Observable.create(new ObservableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(ObservableEmitter<Integer> emitter) throws InterruptedException {
                            emitter.onNext(1);
                            Thread.sleep(400);
                            emitter.onNext(2);
                            Thread.sleep(400);
                            emitter.onNext(3);
                            Thread.sleep(900);
                            emitter.onNext(4);
                            Thread.sleep(1100);
                            emitter.onNext(66);
                        }
                    })
                    .observeOn(Schedulers.io())
                    .timeout(1000, TimeUnit.MILLISECONDS)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "onSubscribe: ");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "onNext: " + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError: " + e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete: ");
                        }
                    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    2022-08-26 16:07:42.458 3341-3341/com.yoshin.tspsdk D/RxJavaActivity: onSubscribe: 
    2022-08-26 16:07:42.459 3341-3553/com.yoshin.tspsdk D/RxJavaActivity: onNext: 1
    2022-08-26 16:07:42.860 3341-3553/com.yoshin.tspsdk D/RxJavaActivity: onNext: 2
    2022-08-26 16:07:43.261 3341-3553/com.yoshin.tspsdk D/RxJavaActivity: onNext: 3
    2022-08-26 16:07:44.161 3341-3553/com.yoshin.tspsdk D/RxJavaActivity: onNext: 4
    2022-08-26 16:07:45.166 3341-3563/com.yoshin.tspsdk D/RxJavaActivity: onError: The source did not signal an event for 1000 milliseconds and has been terminated.
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    参考地址

    中文文档 :https://www.kancloud.cn/luponu/rxjava_zh/974452

    http://reactivex.io/RxJava/2.x/javadoc/

    rxjava:错误处理操作符(2): onErrorReturn 、 onErrorResumeNext 、onExceptionResumeNext https://blog.csdn.net/sinat_31057219/article/details/101304867

    RxJava2操作符总结 – 你想要的都在这里了:https://www.jianshu.com/p/02c83de487e3

    RxJava 过滤操作符 throttleFirst 与 throttleLast 以及 sample:https://blog.csdn.net/qq_33210042/article/details/103352275

  • 相关阅读:
    Python的Pandas库(一)基础使用
    Leetcode 2981
    LabVIEW应用开发——控件的使用(三)
    HTTP2 协议长文详解
    C++ 数组
    Go 语言特性与设计哲学
    Electron_基础篇
    浅谈现货伦敦金分析的经验
    【web前端】web前端设计入门到实战第二弹——面试题总结+答案
    新华三的千亿企业梦,还得靠吃ICT老本来实现?
  • 原文地址:https://blog.csdn.net/weixin_35691921/article/details/88565900