概念:Lua的协程是用户级的非抢占式线程,用户级是指它的切换和调度由用户控制,非抢占是指一个协程只在其使用yield挂起或者结束才能返回;协程和C线程一样有自己的堆栈存储局部变量,可以用来保存自己的上下文信息,同时可以和其它协程共享全局变量。
说明:有三个协程按顺序先后创建coA、coB、coC,那么在没有任意一条协程主动挂起(yield)的情况下,执行顺序则是coA执行完,在执行coB,然后再执行coC。也就是说,除非有协程主动要求挂起,否则必须等当前协程执行完,再去执行下面一个创建的协程。比如说,coA执行完,接着就是执行coB,此时coB挂起,那么直接执行coC,coC执行完以后,如果coB被唤醒了,则接着上次开始阻塞的部分继续执行余下的逻辑。
官网demo
function foo (a)
print("foo", a)
return coroutine.yield(2*a)
end
co = coroutine.create(function (a,b)
print("co-body", a, b)
local r = foo(a+1)
print("co-body", r)
local r, s = coroutine.yield(a+b, a-b)
print("co-body", r, s)
return b, "end"
end)
print("main", coroutine.resume(co, 1, 10))
print("main", coroutine.resume(co, "r"))
print("main", coroutine.resume(co, "x", "y"))
print("main", coroutine.resume(co, "x", "y"))
--
[[ 输出结果:(调用resume,将协同程序唤醒,resume操作成功返回true,否则返回false)
co-body 1 10
foo 2 //协程挂起
main true 4 //resume 协程从挂起地方继续执行
co-body r //协程挂起
main true 11 -9 //resume 协程从挂起地方继续执行
co-body x y //return end
main true 10 end
main false cannot resume dead coroutine //如果使用的协同程序继续运行完成后继续调用 resume方法则输出:cannot resume dead coroutine
]]
--[[
--skynet.lua
]]
local function co_create(f)
local co = tremove(coroutine_pool)
if co == nil then
co = coroutine_create(function(...)
f(...)
while true do
local session = session_coroutine_id[co]
if session and session ~= 0 then
local source = debug.getinfo(f,"S")
skynet.error(string.format("Maybe forgot response session %s from %s : %s:%d",
session,
skynet.address(session_coroutine_address[co]),
source.source, source.linedefined))
end
-- coroutine exit
local tag = session_coroutine_tracetag[co]
if tag ~= nil then
if tag then c.trace(tag, "end") end
session_coroutine_tracetag[co] = nil
end
local address = session_coroutine_address[co]
if address then
session_coroutine_id[co] = nil
session_coroutine_address[co] = nil
end
-- recycle co into pool
f = nil
coroutine_pool[#coroutine_pool+1] = co
-- recv new main function f
f = coroutine_yield "SUSPEND"
f(coroutine_yield())
end
end)
else
-- pass the main function f to coroutine, and restore running thread
local running = running_thread
coroutine_resume(co, f)
running_thread = running
end
return co
end
注意:(skynet源码赏析 skynet为1.0版本 以下内容为skynet1.5版本 有一些改动 但是大体实现思路是一致的)
在消息调度机制,了解skynet发消息的实质是往服务的次级消息队列压入消息,但是从启动lua服务了解到服务是运行在lua沙盒上的。所以一个lua服务在接收消息时,最终会传给lua层的消息回调函数skynet.dispatch_message。
-- skynet.lua
function skynet.start(start_func)
c.callback(skynet.dispatch_message) --消息回调函数 服务启动时创建
init_thread = skynet.timeout(0, function()
skynet.init_service(start_func)
init_thread = nil
end)
end
-- skynet.lua
function skynet.dispatch_message(...)
-- 消费当前消息
local succ, err = pcall(raw_dispatch_message,...)
while true do
if fork_queue.h > fork_queue.t then
-- queue is empty
fork_queue.h = 1
fork_queue.t = 0
break
end
-- pop queue
local h = fork_queue.h
local co = fork_queue[h]
fork_queue[h] = nil
fork_queue.h = h + 1
-- 调用skynet.fork创建的协程
local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co))
if not fork_succ then
if succ then
succ = false
err = tostring(fork_err)
else
err = tostring(err) .. "\n" .. tostring(fork_err)
end
end
end
assert(succ, tostring(err))
end
-- skynet.lua
local function raw_dispatch_message(prototype, msg, sz, session, source) -- 处理当前消息
-- skynet.PTYPE_RESPONSE = 1, read skynet.h
if prototype == 1 then --对其他服务call
local co = session_id_coroutine[session]
if co == "BREAK" then
session_id_coroutine[session] = nil
elseif co == nil then
unknown_response(session, source, msg, sz)
else
local tag = session_coroutine_tracetag[co]
if tag then c.trace(tag, "resume") end
session_id_coroutine[session] = nil
suspend(co, coroutine_resume(co, true, msg, sz, session))
end
else --其他服务send
local p = proto[prototype] -- 找到与消息类型对应的解析协议
if p == nil then
if prototype == skynet.PTYPE_TRACE then
-- trace next request
trace_source[source] = c.tostring(msg,sz)
elseif session ~= 0 then
c.send(source, skynet.PTYPE_ERROR, session, "")
else
unknown_request(session, source, msg, sz, prototype)
end
return
end
local f = p.dispatch -- 获取消息处理函数,可以视为该类协议的消息回调函数
if f then
local co = co_create(f) -- 如果协程池内有空闲的协程,则直接返回,否则创建一个新的协程,该协程用于执行该类协议的消息处理函数dispatch
session_coroutine_id[co] = session
session_coroutine_address[co] = source
local traceflag = p.trace
if traceflag == false then
-- force off
trace_source[source] = nil
session_coroutine_tracetag[co] = false
else
local tag = trace_source[source]
if tag then
trace_source[source] = nil
c.trace(tag, "request")
session_coroutine_tracetag[co] = tag
elseif traceflag then
-- set running_thread for trace
running_thread = co
skynet.trace()
end
end
suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz))) -- 启动并执行协程,将结果返回给suspend
else
trace_source[source] = nil
if session ~= 0 then
c.send(source, skynet.PTYPE_ERROR, session, "")
else
unknown_request(session, source, msg, sz, proto[prototype].name)
end
end
end
end
从raw_dispatch_message 消息处理的逻辑来看,消息处理的分为两种情况,一种是其他服务send过来的消息,还有一种就是自己发起同步rpc调用(调用call)后,获得的返回结果(返回消息的类型是PTYPE_RESPONSE)。
执行流程如下:
根据消息的类型,找到对应的先前注册好的消息解析协议
获取一个协程(如果协程池中有空闲的协程,则直接获取,否则重新创建一个),并让该协程执行消息处理协议的回调函数dispatch
local function co_create(f)
local co = tremove(coroutine_pool)
if co == nil then ---- 协程池中找不到可以用的协程时,将重新创建一个
co = coroutine_create(function(...)
f(...) -- 执行回调函数,创建协程时,并不会立即执行,只有调用coroutine.resume时,才会执行内部逻辑,这行代码,只有在首次创建时会被调用
while true do
local session = session_coroutine_id[co]
if session and session ~= 0 then
local source = debug.getinfo(f,"S")
skynet.error(string.format("Maybe forgot response session %s from %s : %s:%d",
session,
skynet.address(session_coroutine_address[co]),
source.source, source.linedefined))
end
-- coroutine exit
-- 上面的逻辑在完成回调函数调用后,会对协程进行回收,它会将回调函数清掉,并且将当前协程写入协程缓存列表中,然后挂起协程
local tag = session_coroutine_tracetag[co]
if tag ~= nil then
if tag then c.trace(tag, "end") end
session_coroutine_tracetag[co] = nil
end
local address = session_coroutine_address[co]
if address then
session_coroutine_id[co] = nil
session_coroutine_address[co] = nil
end
-- !!!!! 将upvalue回调函数f赋值为空,再放入协程缓存池中,并且挂起,以便下次使用 !!!!!!!
f = nil
coroutine_pool[#coroutine_pool+1] = co
-- recv new main function f
f = coroutine_yield "SUSPEND" -- (1)
f(coroutine_yield()) -- (2)
end
end)
else
-- pass the main function f to coroutine, and restore running thread
local running = running_thread
coroutine_resume(co, f) -- 唤醒第(1)处代码,并将新的回调函数,赋值给(1)处的upvalue f函数,此时在第(2)个yield处挂起
running_thread = running
end
return co
end
--[[
local function raw_dispatch_message(prototype, msg, sz, session, source, ...)
...
-- 如果是创建后第一次使用这个coroutine,这里的coroutine.resume函数,将会唤醒该coroutine,并将第二个至最后一个参数,传给运行的函数
-- 如果是一个复用中的协程,那么这里的coroutine.resume会将第二个至最后一个参数,通过第(2)处的coroutine_yield返回给消息回调函数
suspend(co, coroutine.resume(co, session,source, p.unpack(msg,sz, ...)))
...
end
]]
启动并执行协程,将协程执行的结果返回给suspend函数,返回结果,就是一个coroutine挂起的原因,这个suspend函数,就是针对coroutine挂起的不同原因,做专门的处理
-- skynet.lua
function suspend(co, result, command)
if not result then
local session = session_coroutine_id[co]
if session then -- coroutine may fork by others (session is nil)
local addr = session_coroutine_address[co]
if session ~= 0 then
-- only call response error
local tag = session_coroutine_tracetag[co]
if tag then c.trace(tag, "error") end
c.send(addr, skynet.PTYPE_ERROR, session, "")
end
session_coroutine_id[co] = nil
end
session_coroutine_address[co] = nil
session_coroutine_tracetag[co] = nil
skynet.fork(function() end) -- trigger command "SUSPEND"
local tb = traceback(co,tostring(command))
coroutine.close(co)
error(tb)
end
if command == "SUSPEND" then
return dispatch_wakeup()
elseif command == "QUIT" then
coroutine.close(co)
-- service exit
return
elseif command == "USER" then
-- See skynet.coutine for detail
error("Call skynet.coroutine.yield out of skynet.coroutine.resume\n" .. traceback(co))
elseif command == nil then
-- debug trace
return
else
error("Unknown command : " .. command .. "\n" .. traceback(co))
end
end
local function dispatch_wakeup()
while true do
local token = tremove(wakeup_queue,1)
if token then
local session = sleep_session[token]
if session then
local co = session_id_coroutine[session]
local tag = session_coroutine_tracetag[co]
if tag then c.trace(tag, "resume") end
session_id_coroutine[session] = "BREAK"
return suspend(co, coroutine_resume(co, false, "BREAK", nil, session))
end
else
break
end
end
return dispatch_error_queue()
end
在lua层处理一条消息,本质上是在一个协程里进行的,因此要以协程句柄作为key,保存这些变量。协程每次暂停,都需要使用或处理这些数据,并告知当前协程的状态,以及要根据不同的状态做出相应的处理逻辑,比如当一个协程使用完毕时,就会挂起,意味着协程已经和之前的消息无关系了,需要清空与本协程关联的所有消息相关的信息,以便下一条消息使用。
执行流程如下:
-- skynet.lua
function skynet.ret(msg, sz)
msg = msg or ""
local tag = session_coroutine_tracetag[running_thread]
if tag then c.trace(tag, "response") end
local co_session = session_coroutine_id[running_thread]
if co_session == nil then
error "No session"
end
session_coroutine_id[running_thread] = nil
if co_session == 0 then
if sz ~= nil then
c.trash(msg, sz)
end
return false -- send don't need ret
end
local co_address = session_coroutine_address[running_thread]
local ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, msg, sz) -- PTYPE_RESPONSE = 1
if ret then
return true
elseif ret == false then
-- If the package is too large, returns false. so we should report error back
c.send(co_address, skynet.PTYPE_ERROR, co_session, "")
end
return false
end
--[[
协程发起一次同步RPC调用(挂起状态类型为“SUSPEND”),首先要挂起当前协程,然后是将目的服务发送一个消息,并且在本地记录一个唯一的session值,以session为key,协程地址为value存入一个table表中,目的是,当对方返回结果,或者定时器到达时间timer线程向本服务发送一个唤醒原来协程的消息时,能够通过session找到对应的协程,并将其唤醒,从之前挂起的地方继续执行下去。
]]
local function yield_call(service, session)
watching_session[session] = service
session_id_coroutine[session] = running_thread
local succ, msg, sz = coroutine_yield "SUSPEND"
watching_session[session] = nil
if not succ then
error "call failed"
end
return msg,sz
end
-- skynet.call
function skynet.call(addr, typename, ...)
local tag = session_coroutine_tracetag[running_thread]
if tag then
c.trace(tag, "call", 2)
c.send(addr, skynet.PTYPE_TRACE, 0, tag)
end
local p = proto[typename]
local session = c.send(addr, p.id , nil , p.pack(...))
if session == nil then
error("call to invalid address " .. skynet.address(addr))
end
return p.unpack(yield_call(addr, session))
end
--skynet.lua
local function raw_dispatch_message(prototype, msg, sz, session, source)
-- skynet.PTYPE_RESPONSE = 1, read skynet.h
if prototype == 1 then -- Call
local co = session_id_coroutine[session]
if co == "BREAK" then
session_id_coroutine[session] = nil
elseif co == nil then
unknown_response(session, source, msg, sz)
else
local tag = session_coroutine_tracetag[co]
if tag then c.trace(tag, "resume") end
session_id_coroutine[session] = nil
suspend(co, coroutine_resume(co, true, msg, sz, session))
end
else
---
end