• 服务端Skynet(四)——lua层消息处理机制


    服务端Skynet(四)——lua层消息处理机制


    参考文献

    skynet设计综述

    skynet源码赏析

    1、协程的基本知识

    ​ 概念:Lua的协程是用户级的非抢占式线程,用户级是指它的切换和调度由用户控制,非抢占是指一个协程只在其使用yield挂起或者结束才能返回;协程和C线程一样有自己的堆栈存储局部变量,可以用来保存自己的上下文信息,同时可以和其它协程共享全局变量。

    ​ 说明:有三个协程按顺序先后创建coA、coB、coC,那么在没有任意一条协程主动挂起(yield)的情况下,执行顺序则是coA执行完,在执行coB,然后再执行coC。也就是说,除非有协程主动要求挂起,否则必须等当前协程执行完,再去执行下面一个创建的协程。比如说,coA执行完,接着就是执行coB,此时coB挂起,那么直接执行coC,coC执行完以后,如果coB被唤醒了,则接着上次开始阻塞的部分继续执行余下的逻辑。

    Lua 5.3 参考手册

    ​ 官网demo

     function foo (a)
       print("foo", a)
       return coroutine.yield(2*a)    
     end
    
     co = coroutine.create(function (a,b)
           print("co-body", a, b)
           local r = foo(a+1)
           print("co-body", r)
           local r, s = coroutine.yield(a+b, a-b)
           print("co-body", r, s)
           return b, "end"
     end)
    
     print("main", coroutine.resume(co, 1, 10))
     print("main", coroutine.resume(co, "r"))
     print("main", coroutine.resume(co, "x", "y"))
     print("main", coroutine.resume(co, "x", "y"))
    
    --
    [[  输出结果:(调用resume,将协同程序唤醒,resume操作成功返回true,否则返回false)
    	co-body 1       10
         foo     2					//协程挂起
         main    true    4			 //resume 协程从挂起地方继续执行
         co-body r					//协程挂起
         main    true    11      -9	  //resume 协程从挂起地方继续执行	
         co-body x       y			 //return end
         main    true    10      end
         main    false   cannot resume dead coroutine  //如果使用的协同程序继续运行完成后继续调用 resume方法则输出:cannot resume dead coroutine
    ]]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    2、skynet协程创建

    --[[
    	--skynet.lua
    
    ]]
    local function co_create(f)
    	local co = tremove(coroutine_pool)
    	if co == nil then
    		co = coroutine_create(function(...)
    			f(...)
    			while true do
    				local session = session_coroutine_id[co]
    				if session and session ~= 0 then
    					local source = debug.getinfo(f,"S")
    					skynet.error(string.format("Maybe forgot response session %s from %s : %s:%d",
    						session,
    						skynet.address(session_coroutine_address[co]),
    						source.source, source.linedefined))
    				end
    				-- coroutine exit
    				local tag = session_coroutine_tracetag[co]
    				if tag ~= nil then
    					if tag then c.trace(tag, "end")	end
    					session_coroutine_tracetag[co] = nil
    				end
    				local address = session_coroutine_address[co]
    				if address then
    					session_coroutine_id[co] = nil
    					session_coroutine_address[co] = nil
    				end
    
    				-- recycle co into pool
    				f = nil
    				coroutine_pool[#coroutine_pool+1] = co
    				-- recv new main function f
    				f = coroutine_yield "SUSPEND"
    				f(coroutine_yield())
    			end
    		end)
    	else
    		-- pass the main function f to coroutine, and restore running thread
    		local running = running_thread
    		coroutine_resume(co, f)
    		running_thread = running
    	end
    	return co
    end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    3、lua消息处理机制

    注意:(skynet源码赏析 skynet为1.0版本 以下内容为skynet1.5版本 有一些改动 但是大体实现思路是一致的)

    消息调度机制,了解skynet发消息的实质是往服务的次级消息队列压入消息,但是从启动lua服务了解到服务是运行在lua沙盒上的。所以一个lua服务在接收消息时,最终会传给lua层的消息回调函数skynet.dispatch_message。

    -- skynet.lua
    function skynet.start(start_func)
    	c.callback(skynet.dispatch_message)  --消息回调函数  服务启动时创建
    	init_thread = skynet.timeout(0, function()
    		skynet.init_service(start_func)
    		init_thread = nil
    	end)
    end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    -- skynet.lua
    function skynet.dispatch_message(...)
        -- 消费当前消息
    	local succ, err = pcall(raw_dispatch_message,...)
    	while true do
    		if fork_queue.h > fork_queue.t then
    			-- queue is empty
    			fork_queue.h = 1
    			fork_queue.t = 0
    			break
    		end
    		-- pop queue
    		local h = fork_queue.h
    		local co = fork_queue[h]
    		fork_queue[h] = nil
    		fork_queue.h = h + 1
            -- 调用skynet.fork创建的协程
    		local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co))
    		if not fork_succ then
    			if succ then
    				succ = false
    				err = tostring(fork_err)
    			else
    				err = tostring(err) .. "\n" .. tostring(fork_err)
    			end
    		end
    	end
    	assert(succ, tostring(err))
    end
    
    -- skynet.lua
    local function raw_dispatch_message(prototype, msg, sz, session, source)	-- 处理当前消息
    	-- skynet.PTYPE_RESPONSE = 1, read skynet.h
    	if prototype == 1 then  --对其他服务call
    		local co = session_id_coroutine[session]
    		if co == "BREAK" then
    			session_id_coroutine[session] = nil
    		elseif co == nil then
    			unknown_response(session, source, msg, sz)
    		else
    			local tag = session_coroutine_tracetag[co]
    			if tag then c.trace(tag, "resume") end
    			session_id_coroutine[session] = nil
    			suspend(co, coroutine_resume(co, true, msg, sz, session))
    		end
    	else --其他服务send
    		local p = proto[prototype]		-- 找到与消息类型对应的解析协议
    		if p == nil then
    			if prototype == skynet.PTYPE_TRACE then
    				-- trace next request
    				trace_source[source] = c.tostring(msg,sz)
    			elseif session ~= 0 then
    				c.send(source, skynet.PTYPE_ERROR, session, "")
    			else
    				unknown_request(session, source, msg, sz, prototype)
    			end
    			return
    		end
    
    		local f = p.dispatch 		-- 获取消息处理函数,可以视为该类协议的消息回调函数
    		if f then
    			local co = co_create(f)	-- 如果协程池内有空闲的协程,则直接返回,否则创建一个新的协程,该协程用于执行该类协议的消息处理函数dispatch
    			session_coroutine_id[co] = session
    			session_coroutine_address[co] = source
    			local traceflag = p.trace
    			if traceflag == false then
    				-- force off
    				trace_source[source] = nil
    				session_coroutine_tracetag[co] = false
    			else
    				local tag = trace_source[source]
    				if tag then
    					trace_source[source] = nil
    					c.trace(tag, "request")
    					session_coroutine_tracetag[co] = tag
    				elseif traceflag then
    					-- set running_thread for trace
    					running_thread = co
    					skynet.trace()
    				end
    			end
    			suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))	-- 启动并执行协程,将结果返回给suspend
    		else
    			trace_source[source] = nil
    			if session ~= 0 then
    				c.send(source, skynet.PTYPE_ERROR, session, "")
    			else
    				unknown_request(session, source, msg, sz, proto[prototype].name)
    			end
    		end
    	end
    end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92

    ​ 从raw_dispatch_message 消息处理的逻辑来看,消息处理的分为两种情况,一种是其他服务send过来的消息,还有一种就是自己发起同步rpc调用(调用call)后,获得的返回结果(返回消息的类型是PTYPE_RESPONSE)。

    2.1 skynet.send

    ​ 执行流程如下:

    • 根据消息的类型,找到对应的先前注册好的消息解析协议

    • 获取一个协程(如果协程池中有空闲的协程,则直接获取,否则重新创建一个),并让该协程执行消息处理协议的回调函数dispatch

      local function co_create(f)
      	local co = tremove(coroutine_pool)
      	if co == nil then		---- 协程池中找不到可以用的协程时,将重新创建一个
      		co = coroutine_create(function(...)
      			f(...)	-- 执行回调函数,创建协程时,并不会立即执行,只有调用coroutine.resume时,才会执行内部逻辑,这行代码,只有在首次创建时会被调用
      			while true do
      				local session = session_coroutine_id[co]
      				if session and session ~= 0 then
      					local source = debug.getinfo(f,"S")
      					skynet.error(string.format("Maybe forgot response session %s from %s : %s:%d",
      						session,
      						skynet.address(session_coroutine_address[co]),
      						source.source, source.linedefined))
      				end
      				-- coroutine exit
                        -- 上面的逻辑在完成回调函数调用后,会对协程进行回收,它会将回调函数清掉,并且将当前协程写入协程缓存列表中,然后挂起协程
      				local tag = session_coroutine_tracetag[co]
      				if tag ~= nil then
      					if tag then c.trace(tag, "end")	end
      					session_coroutine_tracetag[co] = nil
      				end
      				local address = session_coroutine_address[co]
      				if address then
      					session_coroutine_id[co] = nil
      					session_coroutine_address[co] = nil
      				end
      
      				-- !!!!! 将upvalue回调函数f赋值为空,再放入协程缓存池中,并且挂起,以便下次使用  !!!!!!!
      				f = nil
      				coroutine_pool[#coroutine_pool+1] = co
      				-- recv new main function f
      				f = coroutine_yield "SUSPEND"	-- (1)
      				f(coroutine_yield())	 -- (2)
      			end
      		end)
      	else
      		-- pass the main function f to coroutine, and restore running thread
      		local running = running_thread
      		coroutine_resume(co, f)	 -- 唤醒第(1)处代码,并将新的回调函数,赋值给(1)处的upvalue f函数,此时在第(2)个yield处挂起
      		running_thread = running
      	end
      	return co
      end
      
      --[[
      local function raw_dispatch_message(prototype, msg, sz, session, source, ...)
          ...
          -- 如果是创建后第一次使用这个coroutine,这里的coroutine.resume函数,将会唤醒该coroutine,并将第二个至最后一个参数,传给运行的函数
          -- 如果是一个复用中的协程,那么这里的coroutine.resume会将第二个至最后一个参数,通过第(2)处的coroutine_yield返回给消息回调函数
          suspend(co, coroutine.resume(co, session,source, p.unpack(msg,sz, ...)))
          ...
      end
      ]]
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
    • 启动并执行协程,将协程执行的结果返回给suspend函数,返回结果,就是一个coroutine挂起的原因,这个suspend函数,就是针对coroutine挂起的不同原因,做专门的处理

    -- skynet.lua
    function suspend(co, result, command)
    	if not result then
    		local session = session_coroutine_id[co]
    		if session then -- coroutine may fork by others (session is nil)
    			local addr = session_coroutine_address[co]
    			if session ~= 0 then
    				-- only call response error
    				local tag = session_coroutine_tracetag[co]
    				if tag then c.trace(tag, "error") end
    				c.send(addr, skynet.PTYPE_ERROR, session, "")
    			end
    			session_coroutine_id[co] = nil
    		end
    		session_coroutine_address[co] = nil
    		session_coroutine_tracetag[co] = nil
    		skynet.fork(function() end)	-- trigger command "SUSPEND"
    		local tb = traceback(co,tostring(command))
    		coroutine.close(co)
    		error(tb)
    	end
    	if command == "SUSPEND" then
    		return dispatch_wakeup()
    	elseif command == "QUIT" then
    		coroutine.close(co)
    		-- service exit
    		return
    	elseif command == "USER" then
    		-- See skynet.coutine for detail
    		error("Call skynet.coroutine.yield out of skynet.coroutine.resume\n" .. traceback(co))
    	elseif command == nil then
    		-- debug trace
    		return
    	else
    		error("Unknown command : " .. command .. "\n" .. traceback(co))
    	end
    end
    
    local function dispatch_wakeup()
    	while true do
    		local token = tremove(wakeup_queue,1)
    		if token then
    			local session = sleep_session[token]
    			if session then
    				local co = session_id_coroutine[session]
    				local tag = session_coroutine_tracetag[co]
    				if tag then c.trace(tag, "resume") end
    				session_id_coroutine[session] = "BREAK"
    				return suspend(co, coroutine_resume(co, false, "BREAK", nil, session))
    			end
    		else
    			break
    		end
    	end
    	return dispatch_error_queue()
    end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    ​ 在lua层处理一条消息,本质上是在一个协程里进行的,因此要以协程句柄作为key,保存这些变量。协程每次暂停,都需要使用或处理这些数据,并告知当前协程的状态,以及要根据不同的状态做出相应的处理逻辑,比如当一个协程使用完毕时,就会挂起,意味着协程已经和之前的消息无关系了,需要清空与本协程关联的所有消息相关的信息,以便下一条消息使用。

    2.2 skynet.call

    ​ 执行流程如下:

    • 发起一个同步rpc调用,向目标服务的次级消息队列插入一个消息
    -- skynet.lua
    function skynet.ret(msg, sz)
    	msg = msg or ""
    	local tag = session_coroutine_tracetag[running_thread]
    	if tag then c.trace(tag, "response") end
    	local co_session = session_coroutine_id[running_thread]
    	if co_session == nil then
    		error "No session"
    	end
    	session_coroutine_id[running_thread] = nil
    	if co_session == 0 then
    		if sz ~= nil then
    			c.trash(msg, sz)
    		end
    		return false	-- send don't need ret
    	end
    	local co_address = session_coroutine_address[running_thread]
    	local ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, msg, sz)	--	PTYPE_RESPONSE = 1
    	if ret then
    		return true
    	elseif ret == false then
    		-- If the package is too large, returns false. so we should report error back
    		c.send(co_address, skynet.PTYPE_ERROR, co_session, "")
    	end
    	return false
    end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 挂起当前协程,yield_call里的coroutine_yield(“SUSPEND”, session)使得当前协程挂起,并在此时suspend执行记录session为key,协程地址为value,将其写入一个table session_id_coroutine中,此时协程等待对方返回消息
    --[[
    	协程发起一次同步RPC调用(挂起状态类型为“SUSPEND”),首先要挂起当前协程,然后是将目的服务发送一个消息,并且在本地记录一个唯一的session值,以session为key,协程地址为value存入一个table表中,目的是,当对方返回结果,或者定时器到达时间timer线程向本服务发送一个唤醒原来协程的消息时,能够通过session找到对应的协程,并将其唤醒,从之前挂起的地方继续执行下去。
    ]]
    local function yield_call(service, session)
    	watching_session[session] = service
    	session_id_coroutine[session] = running_thread
    	local succ, msg, sz = coroutine_yield "SUSPEND"
    	watching_session[session] = nil
    	if not succ then
    		error "call failed"
    	end
    	return msg,sz
    end
    
    -- skynet.call
    function skynet.call(addr, typename, ...)
    	local tag = session_coroutine_tracetag[running_thread]
    	if tag then
    		c.trace(tag, "call", 2)
    		c.send(addr, skynet.PTYPE_TRACE, 0, tag)
    	end
    
    	local p = proto[typename]
    	local session = c.send(addr, p.id , nil , p.pack(...))
    	if session == nil then
    		error("call to invalid address " .. skynet.address(addr))
    	end
    	return p.unpack(yield_call(addr, session))
    end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 当目标服务返回结果时,先根据session找到先前挂起的协程地址,然后通过resume函数唤醒他,此时call返回结果,一次同步rpc调用就结束了。
    --skynet.lua
    local function raw_dispatch_message(prototype, msg, sz, session, source)
    	-- skynet.PTYPE_RESPONSE = 1, read skynet.h
    	if prototype == 1 then  -- Call
    		local co = session_id_coroutine[session]
    		if co == "BREAK" then
    			session_id_coroutine[session] = nil
    		elseif co == nil then
    			unknown_response(session, source, msg, sz)
    		else
    			local tag = session_coroutine_tracetag[co]
    			if tag then c.trace(tag, "resume") end
    			session_id_coroutine[session] = nil
    			suspend(co, coroutine_resume(co, true, msg, sz, session))
    		end
    	else
    	---
    end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
  • 相关阅读:
    DDD概念理解
    一文入门USB设备的驱动编写方法
    模板引擎Thymeleaf和监听器
    介绍几个语言生成的预训练模型
    Spring--getBean()与@Autowired的对比
    Android 13.0 SystemUI下拉状态栏增加响铃功能
    OpenGL多线程多视图的实现 编程
    如何编写定时关机脚本以保护服务器安全
    动态代理-JDK
    Java基础------真实大厂面试题汇总(含答案)
  • 原文地址:https://blog.csdn.net/weixin_43730892/article/details/127906337