• RxJava(一)创建操作符


    1.create()

    1. Observable.create(new ObservableOnSubscribe() {
    2. @Override
    3. public void subscribe(@NonNull ObservableEmitter emitter) throws Exception {
    4. emitter.onNext("Hello RxJava!");
    5. emitter.onComplete();
    6. }
    7. }).subscribe(new Observer() {
    8. @Override
    9. public void onSubscribe(@NonNull Disposable d) {
    10. }
    11. @Override
    12. public void onNext(@NonNull String str) {
    13. Log.e("rxjava", "onNext " + str);
    14. }
    15. @Override
    16. public void onError(@NonNull Throwable e) {
    17. e.printStackTrace();
    18. }
    19. @Override
    20. public void onComplete() {
    21. Log.e("rxjava", "onComplete");
    22. }
    23. });
    1. 执行结果:
    2. rxjava: onNext Hello RxJava!
    3. rxjava: onComplete

    执行原理分析:

    调用 emitter.onNext("Hello RxJava!")后,就会调用Observer的onNext(String str)。
      是如何做到的呢?拆解一下下面的代码。
      Observable.create()接收一个ObservableOnSubscribe,这是个接口,得实现subscribe()方法,
                          这个方法有一个参数ObservableEmitter emitter
       Observable.create() 的返回值是ObservableCreate类型,subscribe(new Observer())就是设置给它的。
                           Observer 是一个接口,有四个需要实现的方法。
      那么是如何实现,调用emitter.onNext("Hello RxJava!") 就会回调Observer的onNext(String str)方法的呢?
      在创建new ObservableCreate(source)时,把接收一个ObservableOnSubscribe作为参数传递进来了,而在subscribe时,又把observer传递进来了。
      看看ObservableCreate内部,是如何把这两者关联起来的。
      在调用subscribe时,会调用ObservableCreate的subscribeActual方法。
     

    1. protected void subscribeActual(Observer<? super T> observer) {
    2. //创建了一个发射器,并将observer作为参数传递进去了。这样两者就产生了关联
    3. CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    4. observer.onSubscribe(parent);
    5. try {
    6. //ObservableOnSubscribe<T> source,这个source就是Observable.create()的参数
    7. //给ObservableOnSubscribe设置了一个发射器。
    8. source.subscribe(parent);
    9. } catch (Throwable ex) {
    10. Exceptions.throwIfFatal(ex);
    11. parent.onError(ex);
    12. }
    13. }

    再看CreateEmitter内部实现。在CreateEmitter的onNext方法调用的observer的onNext方法。
    这样就实现了外面看到现象。

    1. public void onNext(T t) {
    2. if (!isDisposed()) {
    3. observer.onNext(t);
    4. }
    5. }

    2.just()

    1. Observable.just("Hello","RxJava").subscribe(new Observer() {
    2. @Override
    3. public void onSubscribe(@NonNull Disposable d) {
    4. }
    5. @Override
    6. public void onNext(@NonNull String s) {
    7. Log.e("rxjava","onNext "+s);
    8. }
    9. @Override
    10. public void onError(@NonNull Throwable e) {
    11. }
    12. @Override
    13. public void onComplete() {
    14. Log.e("rxjava","onComplete ");
    15. }
    16. });
    17. 执行结果:
    18. rxjava: onNext Hello
    19. rxjava: onNext RxJava
    20. rxjava: onComplete

    just可以接收1-10个参数,有几个参数,就会调用机会onNext。执行完毕后,默认调用onComplete
    会把这些个参数封装成一个可变参数的数组items,传递给ObservableFromArray,
    Observable.just返回的是 ObservableFromArray(items)。
    在ObservableFromArray#subscribeActual方法中,会创建FromArrayDisposable(observer, array)
    在FromArrayDisposable内部会遍历这个数组,依次调用observer的onNext方法。

    3.fromIterable()

    1. List list = new ArrayList();
    2. list.add("Hello");
    3. list.add("RxJava");
    4. Observable.fromIterable(list).subscribe(new Observer<String>() {
    5. @Override
    6. public void onSubscribe(@NonNull Disposable d) {
    7. }
    8. @Override
    9. public void onNext(@NonNull String o) {
    10. Log.e("rxjava",o);
    11. }
    12. @Override
    13. public void onError(@NonNull Throwable e) {
    14. }
    15. @Override
    16. public void onComplete() {
    17. }
    18. });
    19. 执行结果:
    20. rxjava: Hello
    21. rxjava: RxJava
    22. rxjava: onComplete

    Observable.fromIterable(list)和just类似,fromIterable接收一个集合。
    fromIterable 返回的是ObservableFromIterable(source)。后面的调用逻辑和上面的类似

    4.fromArray()

    1. Observable.fromArray(1,2,3,4,5).subscribe(new Observer() {
    2. @Override
    3. public void onSubscribe(@NonNull Disposable d) {
    4. }
    5. @Override
    6. public void onNext(@NonNull Integer integer) {
    7. Log.e("rxjava","integer "+integer);
    8. }
    9. @Override
    10. public void onError(@NonNull Throwable e) {
    11. }
    12. @Override
    13. public void onComplete() {
    14. Log.e("rxjava","onComplete");
    15. }
    16. });
    17. 执行结果:
    18. rxjava: integer 1
    19. rxjava: integer 2
    20. rxjava: integer 3
    21. rxjava: integer 4
    22. rxjava: integer 5
    23. rxjava: onComplete

    Observable.fromArray接收一个泛型数组。返回的是ObservableFromArray(items) 和Just一样。
    5.rang 范围操作符

    1. Observable.range(1,3).subscribe(new Observer() {
    2. @Override
    3. public void onSubscribe(@NonNull Disposable d) {
    4. }
    5. @Override
    6. public void onNext(@NonNull Integer integer) {
    7. Log.e("rxjava","integer"+integer);
    8. }
    9. @Override
    10. public void onError(@NonNull Throwable e) {
    11. }
    12. @Override
    13. public void onComplete() {
    14. Log.e("rxjava","onComplete");
    15. }
    16. });
    17. 执行结果:
    18. rxjava: integer1
    19. rxjava: integer2
    20. rxjava: integer3
    21. rxjava: onComplete

    进行一个for循环,来调用Observer的onNext方法,执行结束后会调用Observer的onComplete方法
    6.timer:计时器

    1. Log.e("rxjava","currentThreadName "+Thread.currentThread().getName());
    2. Observable.timer(3,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
    3. @Override
    4. public void accept(Long aLong) throws Exception {
    5. Log.e("rxjava","currentThreadName "+Thread.currentThread().getName());
    6. Log.e("rxjava",aLong.toString());
    7. }
    8. });
    9. 执行结果:
    10. rxjava: currentThreadName main
    11. rxjava: currentThreadName RxComputationThreadPool-1
    12. rxjava: 0

    Observable.timer(3, TimeUnit.SECONDS),接收两个参数,时间长度和时间单位。多长时间之后,执行accept方法。
    Observable.timer返回ObservableTimer。在ObservableTimer中通过调度器创建一个线程。
    从执行结果看timer运行在一个在这个线程中,可以通过第三个参数指定调度器Scheduler。
    比如AndroidSchedulers.mainThread()或 Schedulers.io()

    1. Log.e("rxjava","currentThreadName "+Thread.currentThread().getName());
    2. Observable.timer(3,TimeUnit.SECONDS, AndroidSchedulers.mainThread()).subscribe(new Consumer() {
    3. @Override
    4. public void accept(Long aLong) throws Exception {
    5. Log.e("rxjava","currentThreadName "+Thread.currentThread().getName());
    6. Log.e("rxjava",aLong.toString());
    7. }
    8. });
    9. 执行结果:
    10. rxjava: currentThreadName main
    11. rxjava: currentThreadName main
    12. rxjava: 0

    通过执行结果可以看出 AndroidSchedulers.mainThread()是指定timer运行在主线程中。
    7.interval 时间间隔

    1. Observable.interval(2,TimeUnit.SECONDS).subscribe(new Consumer() {
    2. @Override
    3. public void accept(Long aLong) throws Exception {
    4. Log.e("rxjava",aLong.toString());
    5. }
    6. });

     Observable.interval 接收俩参数,第一个时间间隔,第二个时间单位。
     interval操作符,会根据设置的时间间隔,不断的执行accept(Long aLong)方法

  • 相关阅读:
    局域网监控软件如何防止数据泄密
    前端js手写面试题汇总(二)
    C51--单片机中断
    【无标题】
    jira+confluence安装
    含文档+PPT+源码等]精品基于Uniapp实现的鲜花商城App[包运行成功]
    基于MATLAB的GPS卫星绕地运行轨迹动态模拟仿真
    【Linux服务器架设】存储服务器构建原理-NFS
    Word第一课
    Http状态码
  • 原文地址:https://blog.csdn.net/niuyongzhi/article/details/126924632