• JDK19虚拟线程初探(三)


    简介

    上两篇文章中,我们已经介绍了使用虚拟线程的例程和VirtualThread。接下来,我们继续介绍虚拟线程的调度,即VirtualThread中最重要的两个成员变量Executor scheduler和Continuation cont。

    scheduler

    创建

    scheduler是用于虚拟线程调度的线程池,先看虚拟线程的初始化代码:

        VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
            super(name, characteristics, /*bound*/ false);
            Objects.requireNonNull(task);
    
            // 如果入参的scheduler为null,则做默认逻辑
            if (scheduler == null) {
                Thread parent = Thread.currentThread();
                if (parent instanceof VirtualThread vparent) {
                	// 如果当前线程是虚拟线程,则使用当前线程的scheduler
                    scheduler = vparent.scheduler;
                } else {
                    // 否则使用全局线程池DEFAULT_SCHEDULER
                    scheduler = DEFAULT_SCHEDULER;
                }
            }
    
            this.scheduler = scheduler;
            // 初始化Continuation 
            this.cont = new VThreadContinuation(this, task);
            this.runContinuation = this::runContinuation;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    默认调度器DEFAULT_SCHEDULER其实就是个ForkJoinPool,创建参数如下:

    • parallelism,可同时执行的线程个数,取值jdk.virtualThreadScheduler.parallelism,默认为Runtime.getRuntime().availableProcessors()。
    • asyncMode,异步工作模式,设置为true,表示FIFO。
    • corePoolSize,池中保持的线程数,设置为0
    • maximumPoolSize,池中最多保持的线程数,取值jdk.virtualThreadScheduler.maxPoolSize
    • minRunnable,正在运行的最小线程数,取值jdk.virtualThreadScheduler.minRunnable
    • keepAliveTime,空闲线程保活时间30秒

    提交任务

    虚拟线程start()和unpark()时,会调用submitRunContinuation()方法提交任务,代码如下:

            try {
            	// 调度器为ForkJoinPool的实现且lazySubmit=true时,调用ForkJoinPool的lazySubmit方法提交任务
            	// lazySubmit=true,仅在afterYield中调用
                if (lazySubmit && scheduler instanceof ForkJoinPool pool) {
                    pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
                } else {
                	// 否则走常规的execute提交任务
                    scheduler.execute(runContinuation);
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • ForkJoinPool的lazySubmit是JDK19的新增方法,机制类似于Jetty的EatWhatYouKill,尽量让调用线程执行提交的任务。

    Continuation

    先看看continuation的成员变量:

    // mounted的变量句柄,变量句柄是JDK9引入的新特性,优化了对于变量的并发访问
    private static final VarHandle MOUNTED;
    // 挂载还是卸载
    private volatile boolean mounted = false;
    
    // 需要执行的任务
    private final Runnable target;
    // 范围,虚拟线程中的ContinuationScope默认为VirtualThreads
    private final ContinuationScope scope;
    // 父节点,null表示native线程栈
    private Continuation parent;
    // 子节点,not null表示子节点中进行了yield
    private Continuation child;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    初始化

    	// ContinuationScope,虚拟线程中的ContinuationScope默认为VirtualThreads
    	// 真正需要执行的task
        public Continuation(ContinuationScope scope, Runnable target) {
            this.scope = scope;
            this.target = target;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    run()

    虚拟线程runContinuation会调用到Continuation的run方法,实际执行任务,代码如下:

        public final void run() {
            while (true) {
            	// 通过变量句柄修改mounted状态
            	// 挂载
                mount();
                JLA.setExtentLocalCache(extentLocalCache);
    
                if (done)
                    throw new IllegalStateException("Continuation terminated");
    
    			// 获取平台线程
                Thread t = currentCarrierThread();
                // 修改父节点
                if (parent != null) {
                    if (parent != JLA.getContinuation(t))
                        throw new IllegalStateException();
                } else
                    this.parent = JLA.getContinuation(t);
                JLA.setContinuation(t, this);
    
                try {
                	// 判断是否是虚拟线程
                    boolean isVirtualThread = (scope == JLA.virtualThreadContinuationScope());
                    if (!isStarted()) {
                    	// 第一次执行
                    	// false:继续执行为false
                        enterSpecial(this, false, isVirtualThread);
                    } else {
                        assert !isEmpty();
                        // true:继续执行
                        enterSpecial(this, true, isVirtualThread);
                    }
                } finally {
                	// 内存屏障
                    fence();
                    try {
                        assert isEmpty() == done : "empty: " + isEmpty() + " done: " + done + " cont: " + Integer.toHexString(System.identityHashCode(this));
                        // 平台线程指向父节点
                        JLA.setContinuation(currentCarrierThread(), this.parent);
                        if (parent != null)
                            parent.child = null;
    
    					// 清理栈帧
                        postYieldCleanup();
    					// 从平台线程卸载
                        unmount();
                        if (PRESERVE_EXTENT_LOCAL_CACHE) {
                            extentLocalCache = JLA.extentLocalCache();
                        } else {
                            extentLocalCache = null;
                        }
                        JLA.setExtentLocalCache(null);
                    } catch (Throwable e) { e.printStackTrace(); System.exit(1); }
                }
                
    			// 删除父节点相关代码
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    run()方法和yield()方法配合,yield()可以中断Continuation执行,run()可以执行或者继续执行实际任务。

    总结

    总共花三篇文章大概介绍了JDK19中新增的虚拟线程,从LOOM Project的纤程、阿里的wisp和wisp2、到今天JEP 425中的虚拟线程,官方提供的生产可用的用户态线程方案距离我们越来越近了。

  • 相关阅读:
    wordpress使用category order and taxonomy terms order插件实现分类目录的拖拽排序
    核函数简介
    小学生python游戏编程arcade----敌人自动面向角色并开火
    第五十七章 学习常用技能 - 查看Globals
    GIS前端—Popup标注视图
    元素显示类型-快元素、行内元素、行内快元素、盒子模型以及元素类型相互转换
    Value-Based RL
    扫码登录认证技术原理介绍及实践
    写论文时,用WPS里的公式编辑器导出的公式有问题,看图片
    Java并发操作之synchronized互斥锁总结
  • 原文地址:https://blog.csdn.net/a860MHz/article/details/127818347