• RxJava操作符


    操作符

    操作符是为了解决对Observable对象的变换的问题,操作符用于在Observable和最终的Subscriber之间修改Observable发出的事件。
    RxJava提供了很多很有用的操作符。

    Subscribers更应该做的事情是“响应”,响应Observable发出的事件,而不是去修改。

    创建操作

    以下操作符用于创建Observable。

    • create: 使用OnSubscribe从头创建一个Observable,这种方法比较简单。需要注意的是,使用该方法创建时,建议在OnSubscribe#call方法中检查订阅状态,以便及时停止发射数据或者运算。

            Observable.create(new Observable.OnSubscribe() {
      
                @Override
                public void call(Subscriber subscriber) {
      
                    subscriber.onNext("item1");
                    subscriber.onNext("item2");
                    subscriber.onCompleted();
                }
            });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
    • from: 将一个Iterable, 一个Future, 或者一个数组,内部通过代理的方式转换成一个Observable。Future转换为OnSubscribe是通过OnSubscribeToObservableFuture进行的,Iterable转换通过OnSubscribeFromIterable进行。数组通过OnSubscribeFromArray转换。

    Observable#from

          //Iterable
          List list=new ArrayList<>();
          ...
          Observable.from(list)
                  .subscribe(new Action1() {
              @Override
              public void call(String s) {
    
              }
          });
    
          //Future
           Future futrue= Executors.newSingleThreadExecutor().submit(new Callable() {
    
              @Override
              public String call() throws Exception {
                  Thread.sleep(1000);
                  return "maplejaw";
              }
          });
    
          Observable.from(futrue)
                    .subscribe(new Action1() {
              @Override
              public void call(String s) {
    
              }
          });
    ;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • just: 将一个或多个对象转换成发射这个或这些对象的一个Observable。如果是单个对象,内部创建的是ScalarSynchronousObservable对象。如果是多个对象,则是调用了from方法创建。

    • empty: 创建一个什么都不做直接通知完成的Observable

    • error: 创建一个什么都不做直接通知错误的Observable

    • never: 创建一个什么都不做的Observable

            Observable observable1=Observable.empty();//直接调用onCompleted。
            Observable observable2=Observable.error(new RuntimeException());//直接调用onError。这里可以自定义异常
            Observable observable3=Observable.never();//啥都不做
      
      • 1
      • 2
      • 3
    • timer: 创建一个在给定的延时之后发射数据项为0的Observable,内部通过OnSubscribeTimerOnce工作

         Observable.timer(1000,TimeUnit.MILLISECONDS)
                    .subscribe(new Action1() {
                        @Override
                        public void call(Long aLong) {
                            Log.d("JG",aLong.toString()); // 0
                        }
                    });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
    • interval: 创建一个按照给定的时间间隔发射从0开始的整数序列的Observable,内部通过OnSubscribeTimerPeriodically工作。

          Observable.interval(1, TimeUnit.SECONDS)
                    .subscribe(new Action1() {
                        @Override
                        public void call(Long aLong) {
                             //每隔1秒发送数据项,从0开始计数
                             //0,1,2,3....
                        }
                    });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
    • range: 创建一个发射指定范围的整数序列的Observable

         Observable.range(2,5).subscribe(new Action1() {
                @Override
                public void call(Integer integer) {
                    Log.d("JG",integer.toString());// 2,3,4,5,6 从2开始发射5个数据
                }
            });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    • defer: 只有当订阅者订阅才创建Observable,为每个订阅创建一个新的Observable。内部通过OnSubscribeDefer在订阅时调用Func0创建Observable。

          Observable.defer(new Func0>() {
                @Override
                public Observable call() {
                    return Observable.just("hello");
                }
            }).subscribe(new Action1() {
                @Override
                public void call(String s) {
                    Log.d("JG",s);
                }
            });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11

    合并操作

    以下操作符用于组合多个Observable。

    • concat: 按顺序连接多个Observables。需要注意的是Observable.concat(a,b)等价于a.concatWith(b)

            Observable observable1=Observable.just(1,2,3,4);
            Observable  observable2=Observable.just(4,5,6);
      
            Observable.concat(observable1,observable2)
                    .subscribe(item->Log.d("JG",item.toString()));//1,2,3,4,4,5,6
      
      • 1
      • 2
      • 3
      • 4
      • 5
    • startWith: 在数据序列的开头增加一项数据。startWith的内部也是调用了concat

         Observable.just(1,2,3,4,5)
                    .startWith(6,7,8)
            .subscribe(item->Log.d("JG",item.toString()));//6,7,8,1,2,3,4,5
      
      • 1
      • 2
      • 3
    • merge: 将多个Observable合并为一个。不同于concat,merge不是按照添加顺序连接,而是按照时间线来连接。其中mergeDelayError将异常延迟到其它没有错误的Observable发送完毕后才发射。而merge则是一遇到异常将停止发射数据,发送onError通知。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-O58EBDS1-1660454001236)(http://upload-images.jianshu.io/upload_images/1931185-57219b8f15ee8a93.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    merge流程图

    • zip: 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果。如果多个Observable发射的数据量不一样,则以最少的Observable为标准进行压合。内部通过OperatorZip进行压合。

        Observable  observable1=Observable.just(1,2,3,4);
        Observable  observable2=Observable.just(4,5,6);
            Observable.zip(observable1, observable2, new Func2() {
                @Override
                public String call(Integer item1, Integer item2) {
                    return item1+"and"+item2;
                }
            })
            .subscribe(item->Log.d("JG",item)); //1and4,2and5,3and6
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
    • combineLatest: 。当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果。类似于zip,但是,不同的是zip只有在每个Observable都发射了数据才工作,而combineLatest任何一个发射了数据都可以工作,每次与另一个Observable最近的数据压合。具体请看下面流程图。
      zip工作流程

    zip流程图

    combineLatest工作流程

    combineLatest流程

    过滤操作

    • filter: 过滤数据。内部通过OnSubscribeFilter过滤数据。

          Observable.just(3,4,5,6)
                    .filter(new Func1() {
                        @Override
                        public Boolean call(Integer integer) {
                            return integer>4;
                        }
                    })
            .subscribe(item->Log.d("JG",item.toString())); //5,6
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
    • ofType: 过滤指定类型的数据,与filter类似,

        Observable.just(1,2,"3")
                    .ofType(Integer.class)
                    .subscribe(item -> Log.d("JG",item.toString()));
      
      • 1
      • 2
      • 3
    • take: 只发射开始的N项数据或者一定时间内的数据。内部通过OperatorTakeOperatorTakeTimed过滤数据。

          Observable.just(3,4,5,6)
                    .take(3)//发射前三个数据项
                    .take(100, TimeUnit.MILLISECONDS)//发射100ms内的数据
      
      • 1
      • 2
      • 3
    • takeLast: 只发射最后的N项数据或者一定时间内的数据。内部通过OperatorTakeLastOperatorTakeLastTimed过滤数据。takeLastBuffer和takeLast类似,不同点在于takeLastBuffer会收集成List后发射。

         Observable.just(3,4,5,6)
                    .takeLast(3)
                    .subscribe(integer -> Log.d("JG",integer.toString()));//4,5,6
      
      • 1
      • 2
      • 3
    • takeFirst:提取满足条件的第一项。内部实现源码如下:

        public final Observable takeFirst(Func1 predicate) {
              return filter(predicate).take(1); //先过滤,后提取
        }
      
      • 1
      • 2
      • 3
    • first/firstOrDefault:只发射第一项(或者满足某个条件的第一项)数据,可以指定默认值。

         Observable.just(3,4,5,6)
                    .first()
                    .subscribe(integer -> Log.d("JG",integer.toString()));//3
      
            Observable.just(3,4,5,6)
                       .first(new Func1() {
                           @Override
                           public Boolean call(Integer integer) {
                               return integer>3;
                           }
                       }) .subscribe(integer -> Log.d("JG",integer.toString()));//4
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
    • last/lastOrDefault:只发射最后一项(或者满足某个条件的最后一项)数据,可以指定默认值。

    • skip:跳过开始的N项数据或者一定时间内的数据。内部通过OperatorSkipOperatorSkipTimed实现过滤。

          Observable.just(3,4,5,6)
                       .skip(1)
                    .subscribe(integer -> Log.d("JG",integer.toString()));//4,5,6
      
      • 1
      • 2
      • 3
    • skipLast:跳过最后的N项数据或者一定时间内的数据。内部通过OperatorSkipLastOperatorSkipLastTimed实现过滤。

    • elementAt/elementAtOrDefault:发射某一项数据,如果超过了范围可以的指定默认值。内部通过OperatorElementAt过滤。

            Observable.just(3,4,5,6)
                     .elementAt(2)
            .subscribe(item->Log.d("JG",item.toString())); //5
      
      • 1
      • 2
      • 3
    • ignoreElements:丢弃所有数据,只发射错误或正常终止的通知。内部通过OperatorIgnoreElements实现。

    • distinct:过滤重复数据,内部通过OperatorDistinct实现。

         Observable.just(3,4,5,6,3,3,4,9)
               .distinct()
              .subscribe(item->Log.d("JG",item.toString())); //3,4,5,6,9
      
      • 1
      • 2
      • 3
    • distinctUntilChanged:过滤掉连续重复的数据。内部通过OperatorDistinctUntilChanged实现

         Observable.just(3,4,5,6,3,3,4,9)
               .distinctUntilChanged()
              .subscribe(item->Log.d("JG",item.toString())); //3,4,5,6,3,4,9
      
      • 1
      • 2
      • 3
    • throttleFirst:定期发射Observable发射的第一项数据。内部通过OperatorThrottleFirst实现。

        Observable.create(subscriber -> {
                subscriber.onNext(1);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw Exceptions.propagate(e);
                }
                subscriber.onNext(2);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw Exceptions.propagate(e);
                }
      
                subscriber.onNext(3);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw Exceptions.propagate(e);
                }
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onCompleted();
      
            }).throttleFirst(999, TimeUnit.MILLISECONDS)
                    .subscribe(item-> Log.d("JG",item.toString())); //结果为1,3,4
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
    • throttleWithTimeout/debounce:发射数据时,如果两次数据的发射间隔小于指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时
      才进行发射

        Observable.create(subscriber -> {
                subscriber.onNext(1);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw Exceptions.propagate(e);
                }
                subscriber.onNext(2);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw Exceptions.propagate(e);
                }
      
                subscriber.onNext(3);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw Exceptions.propagate(e);
                }
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onCompleted();
      
            }).debounce(999, TimeUnit.MILLISECONDS)//或者为throttleWithTimeout(1000, TimeUnit.MILLISECONDS)
                    .subscribe(item-> Log.d("JG",item.toString())); //结果为3,5
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
    • sample/throttleLast:定期发射Observable最近的数据。内部通过OperatorSampleWithTime实现。

         Observable.create(subscriber -> {
                subscriber.onNext(1);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw Exceptions.propagate(e);
                }
                subscriber.onNext(2);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw Exceptions.propagate(e);
                }
      
                subscriber.onNext(3);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw Exceptions.propagate(e);
                }
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onCompleted();
      
            }).sample(999, TimeUnit.MILLISECONDS)//或者为throttleLast(1000, TimeUnit.MILLISECONDS)
                    .subscribe(item-> Log.d("JG",item.toString())); //结果为2,3,5
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
    • timeout: 如果原始Observable过了指定的一段时长没有发射任何数据,就发射一个异常或者使用备用的Observable。

           Observable.create(( subscriber) -> {
                subscriber.onNext(1);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw Exceptions.propagate(e);
                }
                subscriber.onNext(2);
      
                subscriber.onCompleted();
      
            }).timeout(999, TimeUnit.MILLISECONDS,Observable.just(99,100))//如果不指定备用Observable将会抛出异常
                    .subscribe(item-> Log.d("JG",item.toString()),error->Log.d("JG","onError")); //结果为1,99,100  如果不指定备用Observable结果为1,onError
        }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14

    条件/布尔操作

    • all: 判断所有的数据项是否满足某个条件,内部通过OperatorAll实现。

          Observable.just(2,3,4,5)
                    .all(new Func1() {
                        @Override
                        public Boolean call(Integer integer) {
                            return integer>3;
                        }
                    })
            .subscribe(new Action1() {
                @Override
                public void call(Boolean aBoolean) {
                    Log.d("JG",aBoolean.toString()); //false
                }
            })
            ;
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
    • exists: 判断是否存在数据项满足某个条件。内部通过OperatorAny实现。

           Observable.just(2,3,4,5)
                    .exists(integer -> integer>3)
                    .subscribe(aBoolean -> Log.d("JG",aBoolean.toString())); //true
      
      • 1
      • 2
      • 3
    • contains: 判断在发射的所有数据项中是否包含指定的数据,内部调用的其实是exists

          Observable.just(2,3,4,5)
                    .contains(3)
                    .subscribe(aBoolean -> Log.d("JG",aBoolean.toString())); //true
      
      • 1
      • 2
      • 3
    • sequenceEqual: 用于判断两个Observable发射的数据是否相同(数据,发射顺序,终止状态)。

         Observable.sequenceEqual(Observable.just(2,3,4,5),Observable.just(2,3,4,5))
                    .subscribe(aBoolean -> Log.d("JG",aBoolean.toString()));//true
      
      • 1
      • 2
    • isEmpty: 用于判断Observable发射完毕时,有没有发射数据。有数据false,如果只收到了onComplete通知则为true。

          Observable.just(3,4,5,6)
                       .isEmpty()
                      .subscribe(item -> Log.d("JG",item.toString()));//false
      
      • 1
      • 2
      • 3
    • amb: 给定多个Observable,只让第一个发射数据的Observable发射全部数据,其他Observable将会被忽略。

            Observable observable1=Observable.create(new Observable.OnSubscribe() {
                @Override
                public void call(Subscriber subscriber) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        subscriber.onError(e);
                    }
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onCompleted();
                }
            }).subscribeOn(Schedulers.computation());
      
            Observable observable2=Observable.create(subscriber -> {
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onCompleted();
            });
      
            Observable.amb(observable1,observable2)
            .subscribe(integer -> Log.d("JG",integer.toString())); //3,4
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
    • switchIfEmpty: 如果原始Observable正常终止后仍然没有发射任何数据,就使用备用的Observable。

          Observable.empty()
                   .switchIfEmpty(Observable.just(2,3,4))
           .subscribe(o -> Log.d("JG",o.toString())); //2,3,4
      
      • 1
      • 2
      • 3
    • defaultIfEmpty: 如果原始Observable正常终止后仍然没有发射任何数据,就发射一个默认值,内部调用的switchIfEmpty。

    • takeUntil: 当发射的数据满足某个条件后(包含该数据),或者第二个Observable发送完毕,终止第一个Observable发送数据。

         Observable.just(2,3,4,5)
                    .takeUntil(new Func1() {
                        @Override
                        public Boolean call(Integer integer) {
                            return integer==4;
                        }
                    }).subscribe(integer -> Log.d("JG",integer.toString())); //2,3,4
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
    • takeWhile: 当发射的数据满足某个条件时(不包含该数据),Observable终止发送数据。

          Observable.just(2,3,4,5)
                    .takeWhile(new Func1() {
                        @Override
                        public Boolean call(Integer integer) {
                            return integer==4;
                        }
                    })
                    .subscribe(integer -> Log.d("JG",integer.toString())); //2,3
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
    • skipUntil: 丢弃Observable发射的数据,直到第二个Observable发送数据。(丢弃条件数据)

    • skipWhile: 丢弃Observable发射的数据,直到一个指定的条件不成立(不丢弃条件数据)

    聚合操作

    • reduce: 对序列使用reduce()函数并发射最终的结果,内部使用OnSubscribeReduce实现。

          Observable.just(2,3,4,5)
                    .reduce(new Func2() {
                        @Override
                        public Integer call(Integer sum, Integer item) {
                            return sum+item;
                        }
                    })
                    .subscribe(integer -> Log.d("JG",integer.toString()));//14
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
    • collect: 使用collect收集数据到一个可变的数据结构。

          Observable.just(3,4,5,6)
                       .collect(new Func0>() { //创建数据结构
      
                           @Override
                           public List call() {
                               return new ArrayList();
                           }
                       }, new Action2, Integer>() { //收集器
                           @Override
                           public void call(List integers, Integer integer) {
                               integers.add(integer);
                           }
                       })
                      .subscribe(new Action1>() {
                          @Override
                          public void call(List integers) {
      
                          }
                      });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
    • count/countLong: 计算发射的数量,内部调用的是reduce.

    转换操作

    • toList: 收集原始Observable发射的所有数据到一个列表,然后返回这个列表.

            Observable.just(2,3,4,5)
                    .toList()
                    .subscribe(new Action1>() {
                        @Override
                        public void call(List integers) {
      
                        }
                    });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
    • toSortedList: 收集原始Observable发射的所有数据到一个有序列表,然后返回这个列表。

           Observable.just(6,2,3,4,5)
                    .toSortedList(new Func2() {//自定义排序
                        @Override
                        public Integer call(Integer integer, Integer integer2) {
                            return integer-integer2; //>0 升序 ,<0 降序
                        }
                    })
                    .subscribe(new Action1>() {
                        @Override
                        public void call(List integers) {
                            Log.d("JG",integers.toString()); // [2, 3, 4, 5, 6]
                        }
                    });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
    • toMap: 将序列数据转换为一个Map。我们可以根据数据项生成key和生成value。
      ```java

        Observable.just(6,2,3,4,5)
                .toMap(new Func1() {
                    @Override
                    public String call(Integer integer) {
                        return "key:" + integer; //根据数据项生成map的key
                    }
                }, new Func1() {
                    @Override
                    public String call(Integer integer) {
                        return "value:"+integer; //根据数据项生成map的kvalue
                    }
                }).subscribe(new Action1>() {
            @Override
            public void call(Map stringStringMap) {
                Log.d("JG",stringStringMap.toString()); // {key:6=value:6, key:5=value:5, key:4=value:4, key:2=value:2, key:3=value:3}
            }
        });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      
      
      • 1
    • toMultiMap: 类似于toMap,不同的地方在于map的value是一个集合。

    变换操作

    • map: 对Observable发射的每一项数据都应用一个函数来变换。

         Observable.just(6,2,3,4,5)
                    .map(integer -> "item:"+integer)
                    .subscribe(s -> Log.d("JG",s));//item:6,item:2....
      
      • 1
      • 2
      • 3
    • cast: 在发射之前强制将Observable发射的所有数据转换为指定类型

    • flatMap: 将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,内部采用merge合并。

               Observable.just(2,3,5)
                    .flatMap(new Func1>() {
                        @Override
                        public Observable call(Integer integer) {
                            return Observable.create(subscriber -> {
                                subscriber.onNext(integer*10+"");
                                subscriber.onNext(integer*100+"");
                                subscriber.onCompleted();
                            });
                        }
                    })
            .subscribe(o -> Log.d("JG",o)) //20,200,30,300,50,500
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
    • flatMapIterable: 和flatMap的作用一样,只不过生成的是Iterable而不是Observable。

                Observable.just(2,3,5)
                    .flatMapIterable(new Func1>() {
                        @Override
                        public Iterable call(Integer integer) {
                            return Arrays.asList(integer*10+"",integer*100+"");
                        }
                    }).subscribe(new Action1() {
                      @Override
                      public void call(String s) {
      
                      }
            });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
    • concatMap: 类似于flatMap,由于内部使用concat合并,所以是按照顺序连接发射。

    • switchMap: 和flatMap很像,将Observable发射的数据变换为Observables集合,当原始Observable发射一个新的数据(Observable)时,它将取消订阅前一个Observable。

          Observable.create(new Observable.OnSubscribe() {
      
                @Override
                public void call(Subscriber subscriber) {
                    for(int i=1;i<4;i++){
                        subscriber.onNext(i);
                        Utils.sleep(500,subscriber);//线程休眠500ms
                    }
      
                    subscriber.onCompleted();
                }
            }).subscribeOn(Schedulers.newThread())
              .switchMap(new Func1>() {
                     @Override
                   public Observable call(Integer integer) {
                           //每当接收到新的数据,之前的Observable将会被取消订阅
                            return Observable.create(new Observable.OnSubscribe() {
                                @Override
                                public void call(Subscriber subscriber) {
                                    subscriber.onNext(integer*10);
                                    Utils.sleep(500,subscriber);
                                    subscriber.onNext(integer*100);
                                    subscriber.onCompleted();
                                }
                            }).subscribeOn(Schedulers.newThread());
                        }
                    })
                    .subscribe(s -> Log.d("JG",s.toString()));//10,20,30,300
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
    • scan: 与reduce很像,对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值。

          Observable.just(2,3,5)
                    .scan(new Func2() {
                        @Override
                        public Integer call(Integer sum, Integer item) {
                            return sum+item;
                        }
                    })
            .subscribe(integer -> Log.d("JG",integer.toString())) //2,5,10
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
    • groupBy: 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据。

           Observable.just(2,3,5,6)
                    .groupBy(new Func1() {
                        @Override
                        public String call(Integer integer) {//分组
                            return integer%2==0?"偶数":"奇数";
                        }
                    })
            .subscribe(new Action1>() {
                @Override
                public void call(GroupedObservable o) {
      
                    o.subscribe(new Action1() {
                        @Override
                        public void call(Integer integer) {
                            Log.d("JG",o.getKey()+":"+integer.toString()); //偶数:2,奇数:3,...
                        }
                    });
                }
            })
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
    • buffer: 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个

            Observable.just(2,3,5,6)
                    .buffer(3)
                    .subscribe(new Action1>() {
                        @Override
                        public void call(List integers) {
      
                        }
                    })
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
    • window: 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。

           Observable.just(2,3,5,6)
                    .window(3)
                    .subscribe(new Action1>() {
                        @Override
                        public void call(Observable integerObservable) {
                            integerObservable.subscribe(new Action1() {
                                @Override
                                public void call(Integer integer) {
      
                                }
                            });
                        }
                    })
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13

    错误处理/重试机制

    • onErrorResumeNext: 当原始Observable在遇到错误时,使用备用Observable。。

          Observable.just(1,"2",3)
            .cast(Integer.class)
            .onErrorResumeNext(Observable.just(1,2,3))
            .subscribe(integer -> Log.d("JG",integer.toString())) //1,2,3
            ;
      
      • 1
      • 2
      • 3
      • 4
      • 5
    • onExceptionResumeNext: 当原始Observable在遇到异常时,使用备用的Observable。与onErrorResumeNext类似,区别在于onErrorResumeNext可以处理所有的错误,onExceptionResumeNext只能处理异常。

    • onErrorReturn: 当原始Observable在遇到错误时发射一个特定的数据。

         Observable.just(1,"2",3)
                    .cast(Integer.class)
                    .onErrorReturn(new Func1() {
                        @Override
                        public Integer call(Throwable throwable) {
                            return 4;
                        }
                    }).subscribe(new Action1() {
                @Override
                public void call(Integer integer) {
                    Log.d("JG",integer.toString());1,4
                }
            });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
    • retry: 当原始Observable在遇到错误时进行重试。

            Observable.just(1,"2",3)
            .cast(Integer.class)
            .retry(3)
            .subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"))
            ;//1,1,1,1,onError
      
      • 1
      • 2
      • 3
      • 4
      • 5
    • retryWhen: 当原始Observable在遇到错误,将错误传递给另一个Observable来决定是否要重新订阅这个Observable,内部调用的是retry

          Observable.just(1,"2",3)
            .cast(Integer.class)
            .retryWhen(new Func1, Observable>() {
                @Override
                public Observable call(Observable observable) {
                    return Observable.timer(1, TimeUnit.SECONDS);
                }
            })
            .subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"));
            //1,1
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10

    连接操作

    ConnectableObservable与普通的Observable差不多,但是可连接的Observable在被订阅时并不开始发射数据,只有在它的connect()被调用时才开始。用这种方法,你可以等所有的潜在订阅者都订阅了这个Observable之后才开始发射数据。
    ConnectableObservable.connect()指示一个可连接的Observable开始发射数据.
    Observable.publish()将一个Observable转换为一个可连接的Observable
    Observable.replay()确保所有的订阅者看到相同的数据序列的ConnectableObservable,即使它们在Observable开始发射数据之后才订阅。
    ConnectableObservable.refCount()让一个可连接的Observable表现得像一个普通的Observable。

           ConnectableObservable co= Observable.just(1,2,3)
                    .publish();
    
            co .subscribe(integer -> Log.d("JG",integer.toString()) );
            co.connect();//此时开始发射数据
    
    • 1
    • 2
    • 3
    • 4
    • 5

    阻塞操作

    BlockingObservable是一个阻塞的Observable。普通的Observable 转换为 BlockingObservable,可以使用Observable.toBlocking(�6�5)方法或者BlockingObservable.from(�6�5)方法。内部通过CountDownLatch实现了阻塞操作。。

    以下的操作符可以用于BlockingObservable,如果是普通的Observable,务必使用Observable.toBlocking()转为阻塞Observable后使用,否则达不到预期的效果。

    • forEach: 对BlockingObservable发射的每一项数据调用一个方法,会阻塞直到Observable完成。

        Observable.just(2,3).observeOn(Schedulers.newThread()).toBlocking()
                  .forEach(integer -> {
                      Log.d("JG",integer.toString()+" "+Thread.currentThread().getName());
                      Utils.sleep(500);
                  });
      
        Log.d("JG",Thread.currentThread().getName());
            // 2 RxNewThreadScheduler-1
            // 3 RxNewThreadScheduler-1
            // main
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
    • first/firstOrDefault/last/lastOrDefault:这几个操作符之前有介绍过。也可以用于阻塞操作。

    • single/singleOrDefault:如果Observable终止时只发射了一个值,返回那个值,否则抛出异常或者发射默认值。

    • mostRecent:返回一个总是返回Observable最近发射的数据的Iterable。

    • next: 返回一个Iterable,会阻塞直到Observable发射了第二个值,然后返回那个值。

    • latest: 返回一个iterable,会阻塞直到或者除非Observable发射了一个iterable没有返回的值,然后返回这个值

    • toFuture: 将Observable转换为一个Future

    • toIterable:将一个发射数据序列的Observable转换为一个Iterable。

    • getIterator:将一个发射数据序列的Observable转换为一个Iterator

    工具集

    • materialize: 将Observable转换成一个通知列表。

         Observable.just(1,2,3)
                   .materialize()
                   .subscribe(new Action1>() {
                       @Override
                       public void call(Notification notification) {
                           Log.d("JG",notification.getKind()+" "+notification.getValue());
                           //OnNext 1
                           //OnNext 2
                           //OnNext 3
                           //OnCompleted null
                       }
                   });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
    • dematerialize: 与上面的作用相反,将通知逆转回一个Observable。

    • timestamp: 给Observable发射的每个数据项添加一个时间戳。

          Observable.just(1,2,3)
                   .timestamp()
                   .subscribe(new Action1>() {
                       @Override
                       public void call(Timestamped timestamped) {
                           Log.d("JG",timestamped.getTimestampMillis()+" "+timestamped.getValue());
                           //1472627510548 1
                           //1472627510549 2
                           //1472627510549 3
                       }
                   });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
    • timeInterval:给Observable发射的两个数据项间添加一个时间差,实现在OperatorTimeInterval

    timeInterval

    • serialize: 强制Observable按次序发射数据并且要求功能是完好的

    • cache: 缓存Observable发射的数据序列并发射相同的数据序列给后续的订阅者

    • observeOn: 指定观察者观察Observable的调度器

    • subscribeOn: 指定Observable执行任务的调度器

    • doOnEach: 注册一个动作,对Observable发射的每个数据项使用

          Observable.just(2,3)
                    .doOnEach(new Action1>() {
                        @Override
                        public void call(Notification notification) {
                            Log.d("JG","--doOnEach--"+notification.toString());
                        }
                    })
                    .subscribe(integer -> Log.d("JG",integer.toString()));
        //结果为:            
         // --doOnEach--[rx.Notification@133c40b0 OnNext 2]
        // 2
         // --doOnEach--[rx.Notification@133c40b0 OnNext 3]
        // 3
       // --doOnEach--[rx.Notification@df4db0e OnCompleted]
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
    • doOnCompleted: 注册一个动作,对正常完成的Observable使用

    • doOnError: 注册一个动作,对发生错误的Observable使用

    • doOnTerminate:注册一个动作,对完成的Observable使用,无论是否发生错误

          Observable.just(2,3)
                    .doOnTerminate(new Action0() {
                        @Override
                        public void call() {
                            Log.d("JG","--doOnTerminate--");
                        }
                    })
                    .subscribe(integer -> Log.d("JG",integer.toString()));
        // 2 , 3 , --doOnTerminate--
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
    • doOnSubscribe: 注册一个动作,在观察者订阅时使用。内部由OperatorDoOnSubscribe实现

    doOnSubscribe

    • doOnUnsubscribe: 注册一个动作,在观察者取消订阅时使用。内部由OperatorDoOnUnsubscribe实现,在call中加入一个解绑动作。

    doOnUnsubscribe

    • finallyDo/doAfterTerminate: 注册一个动作,在Observable完成时使用

        Observable.just(2,3)
                    .doAfterTerminate(new Action0() {
                        @Override
                        public void call() {
                            Log.d("JG","--doAfterTerminate--");
                        }
                    })
                    .subscribe(integer -> Log.d("JG",integer.toString()));
        //2,3,  --doAfterTerminate--
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
    • delay: 延时发射Observable的结果。即让原始Observable在发射每项数据之前都暂停一段指定的时间段。效果是Observable发射的数据项在时间上向前整体平移了一个增量(除了onError,它会即时通知)。

    • delaySubscription: 延时处理订阅请求。实现在OnSubscribeDelaySubscription

    delaySubscription

    • using: 创建一个只在Observable生命周期存在的资源,当Observable终止时这个资源会被自动释放。

          Observable.using(new Func0() {//资源工厂
                @Override
                public File call() {
      
                    File file = new File(getCacheDir(), "a.txt");
                    if(!file.exists()){
                        try {
                            Log.d("JG","--create--");
                            file.createNewFile();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    return file;
                }
            }, new Func1>() { //Observable
                @Override
                public Observable call(File file) {
                    return Observable.just(file.exists() ? "exist" : "no exist");
                }
            }, new Action1() {//释放资源动作
                @Override
                public void call(File file) {
                    if(file!=null&&file.exists()){
                        Log.d("JG","--delete--");
                        file.delete();
                    }
                }
            })
            .subscribe(s -> Log.d("JG",s))
            ;
         //--create--
         //exist
         //--delete--
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
    • single/singleOrDefault: 强制返回单个数据,否则抛出异常或默认数据。

  • 相关阅读:
    如何给图片降噪?看完你就学会了
    【云原生】基于Kubernetes开发的阿里云ACK之可观测监控
    二、GoLang输出HelloWorld、基本数据类型、变量常量定义、基本类型转换
    一篇文章带你搞懂MybatisPlus
    设置线程分离的方法
    【Putty】win10 / win 11:SSH 远程连接工具 Putty 下载、安装
    进制转换
    SpringBoot的测试方案
    Mac PF命令防火墙
    Android kotlin实现读取pdf和pptx文件
  • 原文地址:https://blog.csdn.net/VX_WJ88950106/article/details/126330706