• Vert.x 源码解析(4.x)(一)——Future源码解析


    目录

    在这里插入图片描述

    1. 简介

    在现代的软件开发中,异步编程已经变得非常重要。它可以提高应用程序的并发性能,使应用程序能够更有效地处理大量的并行操作。Vert.x 是一个面向事件驱动、非阻塞的异步编程框架,它提供了丰富的工具来简化异步编程的复杂性。

    如下图就是Vert.x实现异步设计到的类,主要关键在于FutureImpl以及PromiseImpl。下面会介绍他们分别负责什么。
    在这里插入图片描述

    2.关键类简介

    2.1 AsyncResult

    这是 Vert.x 中的一个通用接口,用于表示异步操作的结果。它可以包含成功的结果值或失败的异常,允许您在异步操作完成后检查结果,并相应地采取行动。

    关键函数如下

    public interface AsyncResult<T> {
    
      //执行完后的值
      T result();
    
      //异常值
      Throwable cause();
      //判断是否成功
      boolean succeeded();
      //判断是否失败
      boolean failed();
        
      .........
      
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    2.2 Future

    它扩展了AsyncResult接口,并且内部增加了很多组合操作符,比如join,any,all,map等

    关键函数如下

    public interface Future<T> extends AsyncResult<T> {
      static <T> CompositeFuture all(List<? extends Future<?>> futures) {
        return CompositeFutureImpl.all(futures.toArray(new Future[0]));
      }
      static CompositeFuture any(List<? extends Future<?>> futures) {
        return CompositeFutureImpl.any(futures.toArray(new Future[0]));
      }
      static CompositeFuture join(List<? extends Future<?>> futures) {
        return CompositeFutureImpl.join(futures.toArray(new Future[0]));
      }
      default <U> Future<U> flatMap(Function<T, Future<U>> mapper) {
        return compose(mapper);
      }
      //是否完成
      boolean isComplete();
      //完成后的回调监听
      Future<T> onComplete(Handler<AsyncResult<T>> handler);
      //成功监听
      default Future<T> onSuccess(Handler<T> handler) {
        return onComplete(ar -> {
          if (ar.succeeded()) {
            handler.handle(ar.result());
          }
        });
      }
      //失败监听
      default Future<T> onFailure(Handler<Throwable> handler) {
        return onComplete(ar -> {
          if (ar.failed()) {
            handler.handle(ar.cause());
          }
        });
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    2.3 FutureInternal

    这个接口主要定义了添加监听器的方法,后续所以监听完成的监听器都是调用这个方法,但是这个是内部调用的。

    //上下文,Vert.x线程等都是由这个来执行的,具体后续会出context文章
    ContextInternal context();
    //添加监听器
    void addListener(Listener<T> listener);
    
    • 1
    • 2
    • 3
    • 4

    2.4 FutureImpl、FutureBase

    FutureBase这里就emitSuccess和emitFailure方法主要是是执行listener的方法以及一些转换函数添加监听器的方法,这边就不单独列出了

    FutureImpl

    这里就主要介绍两个方法

    onComplete以及tryComplete

    onComplete:传入一个handler,后续任务完成后会调用handler

    tryComplete:当你要操作的异步的有结果后会调用tryComplete,接着就会调用OnComplete传入的handler方法(其实实际上暴露出来给我们使用者调用的是PromiseImpl的complete方法,接着再调用tryComplete,这个后面会讲

     //这是结果值
     private Object value;
    /**
     * 尝试完成,就是把结果传入进来,接着调用监听器,告知函所有监听者完成信号
     * @param result
     * @return
     */
    public boolean tryComplete(T result) {
      Listener<T> l;
      synchronized (this) {
        //如果value有值了直接返回,主要是把传进来的result赋值。
        if (value != null) {
          return false;
        }
        
        value = result == null ? NULL_VALUE : result;
        l = listener;
        listener = null;
      }
      if (l != null) {
        //这就是我们刚刚说的FutureBase定义的接口,里面就是调用监听器的方法
        emitSuccess(result, l);
      }
      return true;
    }
    
    /**
       * 添加完成回调的方法
       * @param handler the handler that will be called with the result
       * @return
       */
      @Override
      public Future<T> onComplete(Handler<AsyncResult<T>> handler) {
        Objects.requireNonNull(handler, "No null handler accepted");
        Listener<T> listener;
        //判断是否属于Listener,如果不是则把handler包裹进来,通过listener成功和失败的回调进行执行handler
        if (handler instanceof Listener) {
          listener = (Listener<T>) handler;
        } else {
          listener = new Listener<T>() {
            @Override
            public void onSuccess(T value) {
              try {
                  //成功后调用handle
                handler.handle(FutureImpl.this);
              } catch (Throwable t) {
                if (context != null) {
                  context.reportException(t);
                } else {
                  throw t;
                }
              }
            }
            @Override
            public void onFailure(Throwable failure) {
              try {
                  //失败后调用
                handler.handle(FutureImpl.this);
              } catch (Throwable t) {
                if (context != null) {
                  context.reportException(t);
                } else {
                  throw t;
                }
              }
            }
          };
        }
        //就是调用FutureInternal的添加监听器的方法。
        addListener(listener);
        return this;
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    2.5 Promise、PromiseInternal、PromiseImpl

    Promise

    public interface Promise<T> extends Handler<AsyncResult<T>> {
      //提供直接返回实现类实例
      static <T> Promise<T> promise() {
        return new PromiseImpl<>();
      }
      //这里页定义了tryComplete
      boolean tryComplete(T result);   
      //看到了吧,这个我们使用者其实是调用这个complete方法,它内部会调用tryComplete方法来
      default void complete(T result) {
        if (!tryComplete(result)) {
          throw new IllegalStateException("Result is already complete");
        }
      }  
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    PromiseInternal

    内部增加了一个context获取方法

    ContextInternal context();
    
    • 1

    PromiseImpl

    看以下代码PromiseImpl继承了FutureImpl

    public final class PromiseImpl<T> extends FutureImpl<T> implements PromiseInternal<T>, Listener<T> {
      @Override
      public Future<T> future() {
        return this;
      }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2.6 疑问

    是不是疑问来了

    1. 为什么还需要Promise来调用complete,不直接用FutureImpl呢?

    职责分明问题FutureImpl是结果容器,代表未来会生成结果的容器,接着通过监听器来告知你这个结果。

    PromiseImpl 是可以允许你手动设置异步操作结果所以Future像Get,Promise是Set

    2.7 其他

    CompositeFutureImpl:继承FutureImpl,用于实现 CompositeFuture 接口的默认实现。CompositeFuture 用于组合多个异步操作,等待它们全部完成或任意一个完成(Future做组合变换等操作都是继承FutureImpl类实现,包括PromiseImpl,Otherwise,Mapping等

    3. 入门实例

    3.1 案例1(独立使用Future)

        //创建Promise
        Promise promise=new PromiseImpl();
        //根据Promise获取Future
        Future future=promise.future();
    
        //模拟异步任务
        new Thread(() -> {
          try {
            Thread.sleep(5000);
            //执行完后调用complete方法,并传入结果
            promise.complete(123);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }).start();
        //回调
        future.onComplete(event -> {
          System.out.println(event+"成功了");
        });
        System.out.println("代码执行完成");
    
        Thread.sleep(15000);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    结果:

    代码执行完成
    Future{result=123}成功了
    
    • 1
    • 2

    3.2 案例2(Vert.x内部封装的文件调用)

    这是Vert.x内部打开文件的方法,内部也是使用了Future和Promise。后面会分析源代码如何实现

    FileSystem fs = vertx.fileSystem();
    
    Future<FileProps> future = fs.props("/my_file.txt");
    
    future.onComplete((AsyncResult<FileProps> ar) -> {
      if (ar.succeeded()) {
        FileProps props = ar.result();
        System.out.println("File size = " + props.size());
      } else {
        System.out.println("Failure: " + ar.cause().getMessage());
      }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    4.源码分析

    根据入门实例,进行源码分析

    前面是直接创建PromiseImpl,接着调用promise.future直接获取Future类了。

    我们重点分析的就是OnComplete类和complete类。

    //创建Promise
    Promise promise=new PromiseImpl();
    //根据Promise获取Future
    Future future=promise.future();
    
    //模拟异步任务
    new Thread(() -> {
      try {
        Thread.sleep(5000);
        //执行完后调用complete方法
        promise.complete(123);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }).start();
    //回调
    future.onComplete(event -> {
      System.out.println(event+"成功了");
    });
    System.out.println("代码执行完成");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    4.1 OnComplete

    public Future<T> onComplete(Handler<AsyncResult<T>> handler) {
      Objects.requireNonNull(handler, "No null handler accepted");
      Listener<T> listener;
      //判断是否属于Listener,如果不是则把handler包裹进来,通过listener成功和失败的回调进行执行handler
      if (handler instanceof Listener) {
        listener = (Listener<T>) handler;
      } else {
        listener = new Listener<T>() {
          @Override
          public void onSuccess(T value) {
            try {
              handler.handle(FutureImpl.this);
            } catch (Throwable t) {
              if (context != null) {
                context.reportException(t);
              } else {
                throw t;
              }
            }
          }
          @Override
          public void onFailure(Throwable failure) {
            try {
              handler.handle(FutureImpl.this);
            } catch (Throwable t) {
              if (context != null) {
                context.reportException(t);
              } else {
                throw t;
              }
            }
          }
        };
      }
      //就是调用FutureInternal的添加监听器的方法。
      addListener(listener);
      return this;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    addListener

    @Override
    public void addListener(Listener<T> listener) {
      Object v;
      synchronized (this) {
        //将value赋值,这就是判断当前Future是否已存在结果值
        v = value;
        //如果等于null说明现在还没有结果值
        //因为可能添加监听器的时候已经有值了,那么直接调用监听器方法
        if (v == null) {
          //如果等于空就赋值
          if (this.listener == null) {
            this.listener = listener;
          } else {
            ListenerArray<T> listeners;
            //根据类型是单一还是列表要么转换,要么new再添加
            if (this.listener instanceof FutureImpl.ListenerArray) {
              listeners = (ListenerArray<T>) this.listener;
            } else {
              listeners = new ListenerArray<>();
              listeners.add(this.listener);
              this.listener = listeners;
            }
            listeners.add(listener);
          }
          return;
        }
      }
      if (v instanceof CauseHolder) {
        emitFailure(((CauseHolder)v).cause, listener);
      } else {
        if (v == NULL_VALUE) {
          v = null;
        }
        emitSuccess((T) v, listener);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    emitSuccess 这个方法再complete会说

    4.2 complete

    promise.complete(123);
    
    • 1

    complete里面默认就是调用tryComplete并传入值

    default void complete(T result) {
      if (!tryComplete(result)) {
        throw new IllegalStateException("Result is already complete");
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    /**
     * 尝试完成,就是把结果传入进来,接着调用监听器,告知函所有监听者完成信号
     * @param result
     * @return
     */
    public boolean tryComplete(T result) {
      Listener<T> l;
      synchronized (this) {
        //如果value有值了直接返回,主要是把传进来的result赋值。
        if (value != null) {
          return false;
        }
    
        value = result == null ? NULL_VALUE : result;
        l = listener;
        listener = null;
      }
      if (l != null) {
        //这就是我们刚刚说的FutureBase定义的接口,里面就是调用监听器的方法
        emitSuccess(result, l);
      }
      return true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    emitSuccess

    protected final void emitSuccess(T value, Listener<T> listener) {
      //context不等于空,则再context里面的线程进行执行,调用listener.onSuccess
      if (context != null && !context.isRunningOnContext()) {
        context.execute(() -> {
          ContextInternal prev = context.beginDispatch();
          try {
            listener.onSuccess(value);
          } finally {
            context.endDispatch(prev);
          }
        });
      } else {
        //这边直接执行
        listener.onSuccess(value);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    5 总结

    创建PromiseImpl,并且获取Future类,通过Future.OnComplete来添加监听器,通过Promise的complete设置值并且通知监听器。是不是很简单。

    后续我看情况再写一篇关于针对于Future的其他实现类,来解释all,any,map等原理

  • 相关阅读:
    E. Iva & Pav -前缀和 + 二分 +位运算
    SQLAlchemy学习-10. validates()校验器
    maven本地仓库存在jar导包时依然试图远程仓库下载问题解决
    抖音小店无货源,正处于红利期内的电商项目,新手能操作吗?
    都说了能不动就别动,非要去调整,出生产事故了吧
    【【verilog代码异步FIFO的设计解释+源码+tb】】
    nginx 配置 ssl
    Elasticsearch7.7的安装与启动
    图像识别与处理学习笔记(四)贝叶斯决策和概率密度估计
    springboot整合Rocketmq
  • 原文地址:https://blog.csdn.net/chen_changtui/article/details/132615231