• 【skynet】skynet消息处理与协程


    skynet消息处理与协程

    skynet.start函数是由我们通过skynet.newservice创建好的lua服务,对应lua服务文件里调用的。

    我们来看skynet.start函数内容,通过c.callback设置回调函数skynet.dispatch_message。当工作线程处理服务消息队列的消息时,就会主动调用skynet.dispatch_message函数。

    function skynet.start(start_func)
    	c.callback(skynet.dispatch_message)
    	init_thread = skynet.timeout(0, function()
    		skynet.init_service(start_func)
    		init_thread = nil
    	end)
    end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    我们具体看skynet.dispatch_message函数,第一句pcall调用了raw_dispatch_message函数创建协程。在while循环中,判断fork_queue如果满足条件(即服务的消息队列的消息要被工作线程处理了,改变这个fork_queue结构里面的值),然后调用pcall执行协程。

    function skynet.dispatch_message(...)
    	local succ, err = pcall(raw_dispatch_message,...)
    	while true do
    		if fork_queue.h > fork_queue.t then
    			-- queue is empty
    			fork_queue.h = 1
    			fork_queue.t = 0
    			break
    		end
    		-- pop queue
    		local h = fork_queue.h
    		local co = fork_queue[h]
    		fork_queue[h] = nil
    		fork_queue.h = h + 1
    
    		local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co))
    		if not fork_succ then
    			if succ then
    				succ = false
    				err = tostring(fork_err)
    			else
    				err = tostring(err) .. "\n" .. tostring(fork_err)
    			end
    		end
    	end
    	assert(succ, tostring(err))
    end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    我们看raw_dispatch_message函数,注意到local co = co_create(f)创建了一个协程,然后调用suspend函数把创建好的协程挂起。

    local function raw_dispatch_message(prototype, msg, sz, session, source)
    	-- skynet.PTYPE_RESPONSE = 1, read skynet.h
    	if prototype == 1 then
    		local co = session_id_coroutine[session]
    		if co == "BREAK" then
    			session_id_coroutine[session] = nil
    		elseif co == nil then
    			unknown_response(session, source, msg, sz)
    		else
    			local tag = session_coroutine_tracetag[co]
    			if tag then c.trace(tag, "resume") end
    			session_id_coroutine[session] = nil
    			suspend(co, coroutine_resume(co, true, msg, sz, session))
    		end
    	else
    		local p = proto[prototype]
    		if p == nil then
    			if prototype == skynet.PTYPE_TRACE then
    				-- trace next request
    				trace_source[source] = c.tostring(msg,sz)
    			elseif session ~= 0 then
    				c.send(source, skynet.PTYPE_ERROR, session, "")
    			else
    				unknown_request(session, source, msg, sz, prototype)
    			end
    			return
    		end
    
    		local f = p.dispatch
    		if f then
    			local co = co_create(f)
    			session_coroutine_id[co] = session
    			session_coroutine_address[co] = source
    			local traceflag = p.trace
    			if traceflag == false then
    				-- force off
    				trace_source[source] = nil
    				session_coroutine_tracetag[co] = false
    			else
    				local tag = trace_source[source]
    				if tag then
    					trace_source[source] = nil
    					c.trace(tag, "request")
    					session_coroutine_tracetag[co] = tag
    				elseif traceflag then
    					-- set running_thread for trace
    					running_thread = co
    					skynet.trace()
    				end
    			end
    			suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
    		else
    			trace_source[source] = nil
    			if session ~= 0 then
    				c.send(source, skynet.PTYPE_ERROR, session, "")
    			else
    				unknown_request(session, source, msg, sz, proto[prototype].name)
    			end
    		end
    	end
    end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    我们看co_create函数,从协程池中取出一个空闲协程,没有,则创建一个新的协程。

    local function co_create(f)
    	local co = tremove(coroutine_pool)
    	if co == nil then
    		co = coroutine_create(function(...)
    			f(...)
    			while true do
    				local session = session_coroutine_id[co]
    				if session and session ~= 0 then
    					local source = debug.getinfo(f,"S")
    					skynet.error(string.format("Maybe forgot response session %s from %s : %s:%d",
    						session,
    						skynet.address(session_coroutine_address[co]),
    						source.source, source.linedefined))
    				end
    				-- coroutine exit
    				local tag = session_coroutine_tracetag[co]
    				if tag ~= nil then
    					if tag then c.trace(tag, "end")	end
    					session_coroutine_tracetag[co] = nil
    				end
    				local address = session_coroutine_address[co]
    				if address then
    					session_coroutine_id[co] = nil
    					session_coroutine_address[co] = nil
    				end
    
    				-- recycle co into pool
    				f = nil
    				coroutine_pool[#coroutine_pool+1] = co
    				-- recv new main function f
    				f = coroutine_yield "SUSPEND"
    				f(coroutine_yield())
    			end
    		end)
    	else
    		-- pass the main function f to coroutine, and restore running thread
    		local running = running_thread
    		coroutine_resume(co, f)
    		running_thread = running
    	end
    	return co
    end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    因此,当工作线程处理服务的消息队列里的消息时,就会调用回调函数skynet.dispatch_message函数执行之前就创建好的协程。

    skynet的服务消息被工作线程执行的过程

    1. 内核线程从skynet框架的全局队列取出服务。
    2. 内核线程从服务的消息队列取出消息。
    3. 执行消息的协程运行绑定的func(即协程执行skynet.dispatch(func)中的func)。
    4. 重复步骤2、3,直到处理指定消息数或者服务的消息队列位空。
    5. 如果服务的消息队列不为空,把该服务重新插入到skynet框架的全局队列中。

    线程和协程关系

    • 一个线程可以开启多个协程,同一个线程某一时刻只能有一个协程正在运行,其他协程只能挂起,因此协程不能利用多核能力(不能并行)。
    • 线程可以充分利用多核,因为每个线程可以同时刻运行在不同CPU核心上,体现了并行性。

    skynet中利用协程提高线程的并发性

    skynet运行比较快的重要原因就是利用协程:在线程执行过程中,开启多个了协程,然后按照协程执行顺序的执行过程。当进行io等耗时操作时,直接挂起某个协程,继续执行当前线程的其他协程。因此协程提高了这个线程的执行并发性,减少了线程切换的损耗。

    这样做的好处是:如果是利用线程执行io挂起,切换其他线程继续执行逻辑,导致线程切换的消耗(寄存器、栈等)。用协程的话省略了线程切换的消耗(因为协程是用户态的线程,内核并不知晓协程的存在,协程也可以简单理解为在一个线程里执行的可以控制执行顺序的函数)。

  • 相关阅读:
    xv6---Lab2: system calls
    GPT系列论文解读:GPT-3
    项目管理系统(Java+Web+MySQL)
    使用 `open-uri.with_proxy` 方法打开网页
    第二篇:矩阵的翻转JavaScript
    【5 操作系统调度】
    客户案例:CAC2.0监测异常账号行为,缓解暴力破解攻击
    Java 复习笔记 - 常用API 中
    SpringBoot SpringBoot 原理篇 1 自动配置 1.3 bean 的加载方式【三】
    做亚马逊店铺怎么解决网络问题?
  • 原文地址:https://blog.csdn.net/qq_37717687/article/details/122135566