skynet.start函数是由我们通过skynet.newservice创建好的lua服务,对应lua服务文件里调用的。
我们来看skynet.start函数内容,通过c.callback设置回调函数skynet.dispatch_message。当工作线程处理服务消息队列的消息时,就会主动调用skynet.dispatch_message函数。
function skynet.start(start_func)
c.callback(skynet.dispatch_message)
init_thread = skynet.timeout(0, function()
skynet.init_service(start_func)
init_thread = nil
end)
end
我们具体看skynet.dispatch_message函数,第一句pcall调用了raw_dispatch_message函数创建协程。在while循环中,判断fork_queue如果满足条件(即服务的消息队列的消息要被工作线程处理了,改变这个fork_queue结构里面的值),然后调用pcall执行协程。
function skynet.dispatch_message(...)
local succ, err = pcall(raw_dispatch_message,...)
while true do
if fork_queue.h > fork_queue.t then
-- queue is empty
fork_queue.h = 1
fork_queue.t = 0
break
end
-- pop queue
local h = fork_queue.h
local co = fork_queue[h]
fork_queue[h] = nil
fork_queue.h = h + 1
local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co))
if not fork_succ then
if succ then
succ = false
err = tostring(fork_err)
else
err = tostring(err) .. "\n" .. tostring(fork_err)
end
end
end
assert(succ, tostring(err))
end
我们看raw_dispatch_message函数,注意到local co = co_create(f)创建了一个协程,然后调用suspend函数把创建好的协程挂起。
local function raw_dispatch_message(prototype, msg, sz, session, source)
-- skynet.PTYPE_RESPONSE = 1, read skynet.h
if prototype == 1 then
local co = session_id_coroutine[session]
if co == "BREAK" then
session_id_coroutine[session] = nil
elseif co == nil then
unknown_response(session, source, msg, sz)
else
local tag = session_coroutine_tracetag[co]
if tag then c.trace(tag, "resume") end
session_id_coroutine[session] = nil
suspend(co, coroutine_resume(co, true, msg, sz, session))
end
else
local p = proto[prototype]
if p == nil then
if prototype == skynet.PTYPE_TRACE then
-- trace next request
trace_source[source] = c.tostring(msg,sz)
elseif session ~= 0 then
c.send(source, skynet.PTYPE_ERROR, session, "")
else
unknown_request(session, source, msg, sz, prototype)
end
return
end
local f = p.dispatch
if f then
local co = co_create(f)
session_coroutine_id[co] = session
session_coroutine_address[co] = source
local traceflag = p.trace
if traceflag == false then
-- force off
trace_source[source] = nil
session_coroutine_tracetag[co] = false
else
local tag = trace_source[source]
if tag then
trace_source[source] = nil
c.trace(tag, "request")
session_coroutine_tracetag[co] = tag
elseif traceflag then
-- set running_thread for trace
running_thread = co
skynet.trace()
end
end
suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
else
trace_source[source] = nil
if session ~= 0 then
c.send(source, skynet.PTYPE_ERROR, session, "")
else
unknown_request(session, source, msg, sz, proto[prototype].name)
end
end
end
end
我们看co_create函数,从协程池中取出一个空闲协程,没有,则创建一个新的协程。
local function co_create(f)
local co = tremove(coroutine_pool)
if co == nil then
co = coroutine_create(function(...)
f(...)
while true do
local session = session_coroutine_id[co]
if session and session ~= 0 then
local source = debug.getinfo(f,"S")
skynet.error(string.format("Maybe forgot response session %s from %s : %s:%d",
session,
skynet.address(session_coroutine_address[co]),
source.source, source.linedefined))
end
-- coroutine exit
local tag = session_coroutine_tracetag[co]
if tag ~= nil then
if tag then c.trace(tag, "end") end
session_coroutine_tracetag[co] = nil
end
local address = session_coroutine_address[co]
if address then
session_coroutine_id[co] = nil
session_coroutine_address[co] = nil
end
-- recycle co into pool
f = nil
coroutine_pool[#coroutine_pool+1] = co
-- recv new main function f
f = coroutine_yield "SUSPEND"
f(coroutine_yield())
end
end)
else
-- pass the main function f to coroutine, and restore running thread
local running = running_thread
coroutine_resume(co, f)
running_thread = running
end
return co
end
因此,当工作线程处理服务的消息队列里的消息时,就会调用回调函数skynet.dispatch_message函数执行之前就创建好的协程。
skynet的服务消息被工作线程执行的过程:
线程和协程关系:
skynet中利用协程提高线程的并发性:
skynet运行比较快的重要原因就是利用协程:在线程执行过程中,开启多个了协程,然后按照协程执行顺序的执行过程。当进行io等耗时操作时,直接挂起某个协程,继续执行当前线程的其他协程。因此协程提高了这个线程的执行并发性,减少了线程切换的损耗。
这样做的好处是:如果是利用线程执行io挂起,切换其他线程继续执行逻辑,导致线程切换的消耗(寄存器、栈等)。用协程的话省略了线程切换的消耗(因为协程是用户态的线程,内核并不知晓协程的存在,协程也可以简单理解为在一个线程里执行的可以控制执行顺序的函数)。