1.create()
- Observable.create(new ObservableOnSubscribe
() { - @Override
- public void subscribe(@NonNull ObservableEmitter
emitter) throws Exception { - emitter.onNext("Hello RxJava!");
- emitter.onComplete();
- }
- }).subscribe(new Observer
() { - @Override
- public void onSubscribe(@NonNull Disposable d) {
-
- }
-
- @Override
- public void onNext(@NonNull String str) {
- Log.e("rxjava", "onNext " + str);
- }
-
- @Override
- public void onError(@NonNull Throwable e) {
- e.printStackTrace();
- }
-
- @Override
- public void onComplete() {
- Log.e("rxjava", "onComplete");
- }
- });
- 执行结果:
- rxjava: onNext Hello RxJava!
- rxjava: onComplete
执行原理分析:
调用 emitter.onNext("Hello RxJava!")后,就会调用Observer的onNext(String str)。
是如何做到的呢?拆解一下下面的代码。
Observable.create()接收一个ObservableOnSubscribe,这是个接口,得实现subscribe()方法,
这个方法有一个参数ObservableEmitter
Observable.create() 的返回值是ObservableCreate类型,subscribe(new Observer
Observer 是一个接口,有四个需要实现的方法。
那么是如何实现,调用emitter.onNext("Hello RxJava!") 就会回调Observer的onNext(String str)方法的呢?
在创建new ObservableCreate
看看ObservableCreate内部,是如何把这两者关联起来的。
在调用subscribe时,会调用ObservableCreate的subscribeActual方法。
-
- protected void subscribeActual(Observer<? super T> observer) {
- //创建了一个发射器,并将observer作为参数传递进去了。这样两者就产生了关联
- CreateEmitter<T> parent = new CreateEmitter<T>(observer);
- observer.onSubscribe(parent);
- try {
- //ObservableOnSubscribe<T> source,这个source就是Observable.create()的参数
- //给ObservableOnSubscribe设置了一个发射器。
- source.subscribe(parent);
- } catch (Throwable ex) {
- Exceptions.throwIfFatal(ex);
- parent.onError(ex);
- }
- }
再看CreateEmitter内部实现。在CreateEmitter的onNext方法调用的observer的onNext方法。
这样就实现了外面看到现象。
- public void onNext(T t) {
- if (!isDisposed()) {
- observer.onNext(t);
- }
- }
2.just()
- Observable.just("Hello","RxJava").subscribe(new Observer
() { - @Override
- public void onSubscribe(@NonNull Disposable d) {
-
- }
-
- @Override
- public void onNext(@NonNull String s) {
- Log.e("rxjava","onNext "+s);
- }
-
- @Override
- public void onError(@NonNull Throwable e) {
-
- }
-
- @Override
- public void onComplete() {
- Log.e("rxjava","onComplete ");
- }
- });
-
- 执行结果:
- rxjava: onNext Hello
- rxjava: onNext RxJava
- rxjava: onComplete
just可以接收1-10个参数,有几个参数,就会调用机会onNext。执行完毕后,默认调用onComplete
会把这些个参数封装成一个可变参数的数组items,传递给ObservableFromArray,
Observable.just返回的是 ObservableFromArray
在ObservableFromArray#subscribeActual方法中,会创建FromArrayDisposable
在FromArrayDisposable内部会遍历这个数组,依次调用observer的onNext方法。
3.fromIterable()
- List list = new ArrayList();
- list.add("Hello");
- list.add("RxJava");
- Observable.fromIterable(list).subscribe(new Observer<String>() {
- @Override
- public void onSubscribe(@NonNull Disposable d) {
-
- }
-
- @Override
- public void onNext(@NonNull String o) {
- Log.e("rxjava",o);
- }
-
- @Override
- public void onError(@NonNull Throwable e) {
-
- }
-
- @Override
- public void onComplete() {
-
- }
- });
- 执行结果:
- rxjava: Hello
- rxjava: RxJava
- rxjava: onComplete
Observable.fromIterable(list)和just类似,fromIterable接收一个集合。
fromIterable 返回的是ObservableFromIterable
4.fromArray()
- Observable.fromArray(1,2,3,4,5).subscribe(new Observer
() { - @Override
- public void onSubscribe(@NonNull Disposable d) {
-
- }
-
- @Override
- public void onNext(@NonNull Integer integer) {
- Log.e("rxjava","integer "+integer);
- }
-
- @Override
- public void onError(@NonNull Throwable e) {
-
- }
-
- @Override
- public void onComplete() {
- Log.e("rxjava","onComplete");
- }
- });
- 执行结果:
- rxjava: integer 1
- rxjava: integer 2
- rxjava: integer 3
- rxjava: integer 4
- rxjava: integer 5
- rxjava: onComplete
Observable.fromArray接收一个泛型数组。返回的是ObservableFromArray
5.rang 范围操作符
- Observable.range(1,3).subscribe(new Observer
() { - @Override
- public void onSubscribe(@NonNull Disposable d) {
-
- }
-
- @Override
- public void onNext(@NonNull Integer integer) {
- Log.e("rxjava","integer"+integer);
- }
-
- @Override
- public void onError(@NonNull Throwable e) {
-
- }
-
- @Override
- public void onComplete() {
- Log.e("rxjava","onComplete");
- }
- });
- 执行结果:
- rxjava: integer1
- rxjava: integer2
- rxjava: integer3
- rxjava: onComplete
进行一个for循环,来调用Observer的onNext方法,执行结束后会调用Observer的onComplete方法
6.timer:计时器
- Log.e("rxjava","currentThreadName "+Thread.currentThread().getName());
- Observable.timer(3,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
- @Override
- public void accept(Long aLong) throws Exception {
- Log.e("rxjava","currentThreadName "+Thread.currentThread().getName());
- Log.e("rxjava",aLong.toString());
- }
- });
- 执行结果:
- rxjava: currentThreadName main
- rxjava: currentThreadName RxComputationThreadPool-1
- rxjava: 0
Observable.timer(3, TimeUnit.SECONDS),接收两个参数,时间长度和时间单位。多长时间之后,执行accept方法。
Observable.timer返回ObservableTimer。在ObservableTimer中通过调度器创建一个线程。
从执行结果看timer运行在一个在这个线程中,可以通过第三个参数指定调度器Scheduler。
比如AndroidSchedulers.mainThread()或 Schedulers.io()
- Log.e("rxjava","currentThreadName "+Thread.currentThread().getName());
- Observable.timer(3,TimeUnit.SECONDS, AndroidSchedulers.mainThread()).subscribe(new Consumer
() { - @Override
- public void accept(Long aLong) throws Exception {
- Log.e("rxjava","currentThreadName "+Thread.currentThread().getName());
- Log.e("rxjava",aLong.toString());
- }
- });
- 执行结果:
- rxjava: currentThreadName main
- rxjava: currentThreadName main
- rxjava: 0
通过执行结果可以看出 AndroidSchedulers.mainThread()是指定timer运行在主线程中。
7.interval 时间间隔
- Observable.interval(2,TimeUnit.SECONDS).subscribe(new Consumer
() { - @Override
- public void accept(Long aLong) throws Exception {
- Log.e("rxjava",aLong.toString());
- }
- });
Observable.interval 接收俩参数,第一个时间间隔,第二个时间单位。
interval操作符,会根据设置的时间间隔,不断的执行accept(Long aLong)方法