• fission源码分析--fission调用http请求流程分析


    fission命令行的本质是封装http请求发送给router。但是本地的网络和k8s中router的网络是隔离的,所以需要一个代理,我们知道k8s中可以利用kubectl port-forward来实现端口转发,从而解决网络隔离问题,fission-cli代码也实现了端口转发的能力。

    本文通过fission请求function服务来分析下fission的原理。

    代码入口:cmd/fission-cli/main.go

    版本:v1.17.0

    前提

    1. k8s集群
    2. fission已安装
    1. $ fission version
    2. client:
    3. fission/core:
    4. BuildDate: "2022-09-16T13:24:57Z"
    5. GitCommit: b36e0516
    6. Version: v1.17.0
    7. server:
    8. fission/core:
    9. BuildDate: "2022-09-16T13:24:57Z"
    10. GitCommit: b36e0516
    11. Version: v1.17.0
    12. #
    13. $ fission check
    14. fission-services
    15. --------------------
    16. √ controller is running fine
    17. √ executor is running fine
    18. √ router is running fine
    19. √ storagesvc is running fine
    20. fission-version
    21. --------------------
    22. √ fission is up-to-date

    命令行

    这里通过调用fission function命令行来讲。

    fission function test --name helloworld
    

    客户端

    main

    1. func main() {
    2. // 命令行封装
    3. cmd := app.App()
    4. cmd.SilenceErrors = true // use our own error message printer
    5. // 实际执行
    6. err := cmd.Execute()
    7. if err != nil {
    8. // let program exit with non-zero code when error occurs
    9. console.Error(err.Error())
    10. os.Exit(1)
    11. }
    12. }

    App

    1. func App() *cobra.Command {
    2. ...
    3. groups := helptemplate.CommandGroups{}
    4. groups = append(groups, helptemplate.CreateCmdGroup("Auth Commands(Note: Authentication should be enabled to use a command in this group.)", token.Commands()))
    5. groups = append(groups, helptemplate.CreateCmdGroup("Basic Commands", environment.Commands(), _package.Commands(), function.Commands(), archive.Commands()))
    6. groups = append(groups, helptemplate.CreateCmdGroup("Trigger Commands", httptrigger.Commands(), mqtrigger.Commands(), timetrigger.Commands(), kubewatch.Commands()))
    7. groups = append(groups, helptemplate.CreateCmdGroup("Deploy Strategies Commands", canaryconfig.Commands()))
    8. groups = append(groups, helptemplate.CreateCmdGroup("Declarative Application Commands", spec.Commands()))
    9. groups = append(groups, helptemplate.CreateCmdGroup("Other Commands", support.Commands(), version.Commands(), check.Commands()))
    10. ...
    11. }

    其中function.Commands()就是fission function子命令的入口

    function command

    代码入口:pkg/fission-cli/cmd/function/command.go

    1. func Commands() *cobra.Command {
    2. ...
    3. testCmd := &cobra.Command{
    4. Use: "test",
    5. Aliases: []string{},
    6. Short: "Test a function",
    7. RunE: wrapper.Wrapper(Test),
    8. }
    9. ...
    10. }

    核心代码是wrapper.Wrapper(Test),这个是cmd.Execute()实际调用的入口。

    Test处理

    代码入口:pkg/fission-cli/cmd/function/test.go

    1. func Test(input cli.Input) error {
    2. return (&TestSubCommand{}).do(input)
    3. }
    4. func (opts *TestSubCommand) do(input cli.Input) error {
    5. 。。。。。。
    6. // 端口映射,将localport映射到fission router的service port
    7. localRouterPort, err := util.SetupPortForward(util.GetFissionNamespace(), "application=fission-router", kubeContext)
    8. if err != nil {
    9. return err
    10. }
    11. // http://127.0.0.1:49703/fission-function/helloworld
    12. fnURL := "http://127.0.0.1:" + localRouterPort + util.UrlForFunction(m.Name, m.Namespace)
    13. functionUrl, err := url.Parse(fnURL)
    14. // 发送http请求
    15. resp, err := doHTTPRequest(ctx, functionUrl.String(),
    16. input.StringSlice(flagkey.FnTestHeader),
    17. method,
    18. input.String(flagkey.FnTestBody))
    19. if err != nil {
    20. return err
    21. }
    22. defer resp.Body.Close()
    23. body, err := io.ReadAll(resp.Body)
    24. if err != nil {
    25. return errors.Wrap(err, "error reading response from function")
    26. }
    27. if resp.StatusCode < 400 {
    28. // 输出到标准输出
    29. os.Stdout.Write(body)
    30. return nil
    31. }
    32. }

    代码入口:pkg/fission-cli/cmd/function/test.go

    1. // url: http://127.0.0.1:63177/fission-function/helloworld
    2. func doHTTPRequest(ctx context.Context, url string, headers []string, method, body string) (*http.Response, error) {
    3. ......
    4. // 新建http请求
    5. req, err := http.NewRequest(method, url, strings.NewReader(body))
    6. ......
    7. // 构建httpclient
    8. hc := &http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)}
    9. // http请求localhost:port会通过port-forward转发到k8s apiserver,从而请求到对应pod
    10. resp, err := hc.Do(req.WithContext(ctx))
    11. 。。。。。。
    12. }
    13. // 请求url:url: http://127.0.0.1:63177/fission-function/helloworld
    14. func (c *Client) Do(req *Request) (*Response, error) {
    15. return c.do(req)
    16. }
    17. func (c *Client) do(req *Request) (retres *Response, reterr error) {
    18. 。。。。。。
    19. for {
    20. if resp, didTimeout, err = c.send(req, deadline); err != nil {
    21. // c.send() always closes req.Body
    22. reqBodyClosed = true
    23. if !deadline.IsZero() && didTimeout() {
    24. err = &httpError{
    25. err: err.Error() + " (Client.Timeout exceeded while awaiting headers)",
    26. timeout: true,
    27. }
    28. }
    29. return nil, uerr(err)
    30. }
    31. }
    32. }
    33. // didTimeout is non-nil only if err != nil.
    34. func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
    35. if c.Jar != nil {
    36. for _, cookie := range c.Jar.Cookies(req.URL) {
    37. req.AddCookie(cookie)
    38. }
    39. }
    40. resp, didTimeout, err = send(req, c.transport(), deadline)
    41. if err != nil {
    42. return nil, didTimeout, err
    43. }
    44. if c.Jar != nil {
    45. if rc := resp.Cookies(); len(rc) > 0 {
    46. c.Jar.SetCookies(req.URL, rc)
    47. }
    48. }
    49. return resp, nil, nil
    50. }
    51. // send issues an HTTP request.
    52. // Caller should close resp.Body when done reading from it.
    53. func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
    54. // GET http://127.0.0.1:63177/fission-function/helloworld
    55. req := ireq // req is either the original request, or a modified fork
    56. }

    到这里,我们可以看到fission命令行实际是一个http请求,http://127.0.0.1:49703/fission-function/helloworld,那么对端口49703的访问,如何发送给fission服务端呢?这里就需要了解port-forward的端口映射原理。

    端口映射

    为什么需要端口映射?这是因为宿主机的网络和k8s pod的网络是隔离的,所以宿主机的http请求无法直接请求到pod上,这就需要将宿主机的请求通过代理转发到pod上。这里用的是端口号映射机制,其原理和kubectl port-forward是一样的。如下图我们可以通过调整日志级别,查看fission test命令的详细日志。

    1. // Port forward a free local port to a pod on the cluster. The pod is
    2. // found in the specified namespace by labelSelector. The pod's port
    3. // is found by looking for a service in the same namespace and using
    4. // its targetPort. Once the port forward is started, wait for it to
    5. // start accepting connections before returning.
    6. func SetupPortForward(namespace, labelSelector string, kubeContext string) (string, error) {
    7. 。。。。。。
    8. // 本地端口号
    9. localPort, err := findFreePort()
    10. 。。。。。。
    11. // runPortForward启用一个local端口号,请求指向特定label的pod
    12. readyC, _, err := runPortForward(context.Background(), labelSelector, localPort, namespace, kubeContext)
    13. if err != nil {
    14. fmt.Printf("Error forwarding to port %v: %s", localPort, err.Error())
    15. return "", err
    16. }
    17. <-readyC
    18. 。。。。。。
    19. }
    20. // runPortForward creates a local port forward to the specified pod
    21. func runPortForward(ctx context.Context, labelSelector string, localPort string, ns string, kubeContext string) (chan struct{}, chan struct{}, error) {
    22. 。。。。。。
    23. // 通过label获取pod列表
    24. podList, err := clientset.CoreV1().Pods(ns).
    25. List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
    26. 。。。。。。
    27. // 通过label获取service,以及对应的port
    28. svcs, err := clientset.CoreV1().Services(podNameSpace).
    29. List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
    30. // 默认映射到router service的端口号8888
    31. var targetPort string
    32. for _, servicePort := range service.Spec.Ports {
    33. targetPort = servicePort.TargetPort.String()
    34. }
    35. 。。。。。。
    36. // 核心转换,从podname获取选定pod的k8s url,
    37. // Host:kubernetes.docker.internal:6443
    38. // Path:/api/v1/namespaces/fission/pods/controller-657d4cc757-lqlzf/portforward
    39. req := clientset.CoreV1().RESTClient().Post().Resource("pods").
    40. Namespace(podNameSpace).Name(podName).SubResource("portforward")
    41. url := req.URL()
    42. // create ports slice
    43. portCombo := localPort + ":" + targetPort
    44. // actually start the port-forwarding process here
    45. transport, upgrader, err := spdy.RoundTripperFor(config)
    46. dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)
    47. // New creates a new PortForwarder with localhost listen addresses.
    48. fw, err := portforward.New(dialer, ports, stopChannel, readyChannel, outStream, os.Stderr)
    49. // ForwardPorts监听http端口映射的请求,连接会一直保持,直到收到stopChan信号
    50. go func() {
    51. 。。。。。。
    52. err := fw.ForwardPorts()
    53. }()
    54. 。。。
    55. }
     
    

    http RoundTrip

    代码入口:pkg/router/functionHandler.go

    RoundTrip可以看做是http的中间件。RoundTrip是对http请求进行重试的自定义传输(Transport),它将请求转发到正确的serviceUrl。

    该serviceUrl优先从router的缓存中获取;如果router中不存在对应的缓存,则从executor获取。

    RoundTrip这里对http请求增加了缓存和重试能力。

    1. func (roundTripper *RetryingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
    2. 。。。。。。
    3. for i := 0; i < roundTripper.funcHandler.tsRoundTripperParams.maxRetries; i++ {
    4. // set service url of target service of request only when
    5. // trying to get new service url from cache/executor.
    6. // 调用请求:POST http://executor.fission/v2/getServiceForFunction
    7. roundTripper.serviceURL, roundTripper.urlFromCache, err = roundTripper.funcHandler.getServiceEntry(ctx)
    8. 。。。。。。
    9. // 调用请求:POST http://executor.fission/v2/unTapService
    10. go roundTripper.funcHandler.unTapService(fn, serviceURL) //nolint errcheck
    11. 。。。
    12. // fission发送的请求,最终发送到指定function pod上执行,fission请求获取返回结果
    13. resp, err := otelRoundTripper.RoundTrip(newReq)
    14. }
    15. 。。。。。。
    16. }

    代码入口:pkg/router/functionHandler.go

    1. // getServiceEntryFromExecutor returns service url entry returns from executor
    2. func (fh functionHandler) getServiceEntry(ctx context.Context) (svcURL *url.URL, cacheHit bool, err error) {
    3. if fh.function.Spec.InvokeStrategy.ExecutionStrategy.ExecutorType == fv1.ExecutorTypePoolmgr {
    4. svcURL, err = fh.getServiceEntryFromExecutor(ctx)
    5. return svcURL, false, err
    6. }
    7. // Check if service URL present in cache
    8. svcURL, err = fh.getServiceEntryFromCache()
    9. if err == nil && svcURL != nil {
    10. return svcURL, true, nil
    11. } else if err != nil {
    12. return nil, false, err
    13. }
    14. fnMeta := &fh.function.ObjectMeta
    15. recordObj, err := fh.svcAddrUpdateThrottler.RunOnce(
    16. crd.CacheKey(fnMeta),
    17. func(firstToTheLock bool) (interface{}, error) {
    18. if !firstToTheLock {
    19. svcURL, err := fh.getServiceEntryFromCache()
    20. if err != nil {
    21. return nil, err
    22. }
    23. return svcEntryRecord{svcURL: svcURL, cacheHit: true}, err
    24. }
    25. svcURL, err = fh.getServiceEntryFromExecutor(ctx)
    26. if err != nil {
    27. return nil, err
    28. }
    29. fh.addServiceEntryToCache(svcURL)
    30. return svcEntryRecord{
    31. svcURL: svcURL,
    32. cacheHit: false,
    33. }, nil
    34. },
    35. )
    36. record, ok := recordObj.(svcEntryRecord)
    37. if !ok {
    38. return nil, false, fmt.Errorf("received unknown service record type")
    39. }
    40. return record.svcURL, record.cacheHit, err
    41. }
    代码入口:pkg/router/functionHandler.go
    1. func (fh functionHandler) getServiceEntryFromExecutor(ctx context.Context) (serviceUrl *url.URL, err error) {
    2. ......
    3. service, err := fh.executor.GetServiceForFunction(fContext, fh.function)
    4. if err != nil {
    5. statusCode, errMsg := ferror.GetHTTPError(err)
    6. logger.Error("error from GetServiceForFunction",
    7. zap.Error(err),
    8. zap.String("error_message", errMsg),
    9. zap.Any("function", fh.function),
    10. zap.Int("status_code", statusCode))
    11. return nil, err
    12. }
    13. // parse the address into url
    14. svcURL, err := url.Parse(fmt.Sprintf("http://%v", service))
    15. if err != nil {
    16. logger.Error("error parsing service url",
    17. zap.Error(err),
    18. zap.String("service_url", svcURL.String()))
    19. return nil, err
    20. }
    21. return svcURL, err
    22. }
    代码入口:pkg/executor/client/client.go
    1. // GetServiceForFunction returns the service name for a given function.
    2. func (c *Client) GetServiceForFunction(ctx context.Context, fn *fv1.Function) (string, error) {
    3. executorURL := c.executorURL + "/v2/getServiceForFunction"
    4. body, err := json.Marshal(fn)
    5. if err != nil {
    6. return "", errors.Wrap(err, "could not marshal request body for getting service for function")
    7. }
    8. resp, err := ctxhttp.Post(ctx, c.httpClient, executorURL, "application/json", bytes.NewReader(body))
    9. if err != nil {
    10. return "", errors.Wrap(err, "error posting to getting service for function")
    11. }
    12. defer resp.Body.Close()
    13. if resp.StatusCode != 200 {
    14. return "", ferror.MakeErrorFromHTTP(resp)
    15. }
    16. svcName, err := ioutil.ReadAll(resp.Body)
    17. if err != nil {
    18. return "", errors.Wrap(err, "error reading response body from getting service for function")
    19. }
    20. return string(svcName), nil
    21. }
     
    

    从上图可以看出fission function test的请求被转发给router的pod。

    服务端

    核心概念

    Controller

    Accept REST API requests and create Fission resources

    接收rest api请求,创建fission资源对象

    Executor

    Component to spin up function pods

    用于启动并准备function pod的组件。

    Router

    作为trigger和function之间的桥梁

    Bridge between triggers and functions

    Function Pod

    Place to load and execute the user function

    加载代码和执行用户请求

    Builder Manager

    Compile the source code into a runnable function

    将源代码编译成可运行的function;对于编译型语言的代码,需要先进行编译才能运行。

    Builder Pod

    Place to build the user function

    StorageSvc

    Home for source and deployment archives

    router源码分析

    router服务启动

    代码入口:

    pkg/router/router.go

    pkg/router/functionHandler.go

    调用关系

    runRouter -> Start -> serve -> router

    1. func runRouter(ctx context.Context, logger *zap.Logger, port int, executorUrl string) {
    2. router.Start(ctx, logger, port, executorUrl)
    3. }
    4. // Start starts a router
    5. // pkg/router/router.go
    6. func Start(ctx context.Context, logger *zap.Logger, port int, executorURL string) {
    7. 。。。。。。
    8. // 设置一堆环境变量和client
    9. 。。。。。。
    10. // 核心代码是启动router服务
    11. serve(ctx, logger, port, triggers, displayAccessLog)
    12. }
    13. func serve(ctx context.Context, logger *zap.Logger, port int,
    14. httpTriggerSet *HTTPTriggerSet, displayAccessLog bool) {
    15. mr := router(ctx, logger, httpTriggerSet)
    16. 。。。。。。
    17. }
    18. func router(ctx context.Context, logger *zap.Logger, httpTriggerSet *HTTPTriggerSet) *mutableRouter {
    19. 。。。。。。
    20. // 核心代码,注册handler
    21. httpTriggerSet.subscribeRouter(ctx, mr)
    22. }

     注册handler

    router的handler注册逻辑比较复杂,需要进行详细分析。

    代码入口:pkg/router/functionHandler.go

    1. func (ts *HTTPTriggerSet) subscribeRouter(ctx context.Context, mr *mutableRouter) {
    2. 。。。。。。
    3. // 注册handler
    4. go ts.updateRouter()
    5. 。。。。。。
    6. }
    7. func (ts *HTTPTriggerSet) updateRouter() {
    8. for range ts.updateRouterRequestChannel {
    9. 。。。。。。
    10. // make a new router and use it
    11. // 注册handler
    12. ts.mutableRouter.updateRouter(ts.getRouter(functionTimeout))
    13. }
    14. }
    1. func (ts *HTTPTriggerSet) getRouter(fnTimeoutMap map[types.UID]int) *mux.Router {
    2. 。。。
    3. muxRouter.HandleFunc("/", defaultHomeHandler).Methods("GET")
    4. muxRouter.HandleFunc("/router-healthz", routerHealthHandler).Methods("GET")
    5. // 这里的handler对应的path是:/fission-function
    6. handler = otel.GetHandlerWithOTEL(http.HandlerFunc(fh.handler), internalRoute)
    7. 。。。
    8. }

    handler = otel.GetHandlerWithOTEL(http.HandlerFunc(fh.handler), internalRoute)

    这里的internalRoute是 "/fission-function",对应fission function test的请求:

    curl http://127.0.0.1:8888/fission-function/helloworld

    handler

    代码入口:pkg/router/functionHandler.go

    1. func (fh functionHandler) handler(responseWriter http.ResponseWriter, request *http.Request) {
    2. 。。。
    3. rrt := &RetryingRoundTripper{
    4. logger: fh.logger.Named("roundtripper"),
    5. funcHandler: &fh,
    6. funcTimeout: time.Duration(fnTimeout) * time.Second,
    7. }
    8. start := time.Now()
    9. proxy := &httputil.ReverseProxy{
    10. Director: director,
    11. Transport: rrt,
    12. ErrorHandler: fh.getProxyErrorHandler(start, rrt),
    13. ModifyResponse: func(resp *http.Response) error {
    14. go fh.collectFunctionMetric(start, rrt, request, resp)
    15. return nil
    16. },
    17. }
    18. 。。。
    19. //核心逻辑,
    20. proxy.ServeHTTP(responseWriter, request)
    21. }
    22. func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
    23. 。。。
    24. // 核心逻辑,调用http的RoundTrip,对请求进行包装处理,包括增加重试,超时,缓存等能力
    25. res, err := transport.RoundTrip(outreq)
    26. }
    代码入口:pkg/router/functionHandler.go
    1. func (roundTripper *RetryingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
    2. 。。。
    3. for i := 0; i < roundTripper.funcHandler.tsRoundTripperParams.maxRetries; i++ {
    4. // 向executor发起请求,查询function服务的url
    5. roundTripper.serviceURL, roundTripper.urlFromCache, err = roundTripper.funcHandler.getServiceEntry(ctx)
    6. }
    7. 。。。
    8. // 实际向function所在的pod发送请求,真正执行fission function test的地方
    9. resp, err := otelRoundTripper.RoundTrip(newReq)
    10. 。。。
    11. }
    RoundTrip函数有两个核心逻辑,一个是获取function所在的pod的url,第二个就是真正向function的pod发送请求服务。

    判断服务是否存在缓存,如果存在,从缓存中读取,如果不存在,向executor发送http请求获取。

    1. // getServiceEntryFromExecutor returns service url entry returns from executor
    2. func (fh functionHandler) getServiceEntry(ctx context.Context) (svcURL *url.URL, cacheHit bool, err error) {
    3. if fh.function.Spec.InvokeStrategy.ExecutionStrategy.ExecutorType == fv1.ExecutorTypePoolmgr {
    4. svcURL, err = fh.getServiceEntryFromExecutor(ctx)
    5. return svcURL, false, err
    6. }
    7. // Check if service URL present in cache
    8. svcURL, err = fh.getServiceEntryFromCache()
    9. if err == nil && svcURL != nil {
    10. return svcURL, true, nil
    11. } else if err != nil {
    12. return nil, false, err
    13. }
    14. fnMeta := &fh.function.ObjectMeta
    15. recordObj, err := fh.svcAddrUpdateThrottler.RunOnce(
    16. crd.CacheKey(fnMeta),
    17. func(firstToTheLock bool) (interface{}, error) {
    18. if !firstToTheLock {
    19. svcURL, err := fh.getServiceEntryFromCache()
    20. if err != nil {
    21. return nil, err
    22. }
    23. return svcEntryRecord{svcURL: svcURL, cacheHit: true}, err
    24. }
    25. svcURL, err = fh.getServiceEntryFromExecutor(ctx)
    26. if err != nil {
    27. return nil, err
    28. }
    29. fh.addServiceEntryToCache(svcURL)
    30. return svcEntryRecord{
    31. svcURL: svcURL,
    32. cacheHit: false,
    33. }, nil
    34. },
    35. )
    36. record, ok := recordObj.(svcEntryRecord)
    37. if !ok {
    38. return nil, false, fmt.Errorf("unexpected type of recordObj %T: %w", recordObj, err)
    39. }
    40. return record.svcURL, record.cacheHit, err
    41. }
    getServiceEntryFromExecutor -> func (c *Client) GetServiceForFunction
    

    代码入口:pkg/executor/client/client.go

    1. // GetServiceForFunction returns the service name for a given function.
    2. func (c *Client) GetServiceForFunction(ctx context.Context, fn *fv1.Function) (string, error) {
    3. // 发送请求给executor
    4. executorURL := c.executorURL + "/v2/getServiceForFunction"
    5. body, err := json.Marshal(fn)
    6. if err != nil {
    7. return "", errors.Wrap(err, "could not marshal request body for getting service for function")
    8. }
    9. req, err := retryablehttp.NewRequestWithContext(ctx, "POST", executorURL, bytes.NewReader(body))
    10. if err != nil {
    11. return "", errors.Wrap(err, "could not create request for getting service for function")
    12. }
    13. req.Header.Set("Content-Type", "application/json")
    14. resp, err := c.httpClient.Do(req)
    15. if err != nil {
    16. return "", errors.Wrap(err, "error posting to getting service for function")
    17. }
    18. defer resp.Body.Close()
    19. if resp.StatusCode != 200 {
    20. return "", ferror.MakeErrorFromHTTP(resp)
    21. }
    22. svcName, err := io.ReadAll(resp.Body)
    23. if err != nil {
    24. return "", errors.Wrap(err, "error reading response body from getting service for function")
    25. }
    26. return string(svcName), nil
    27. }
    到此为止router的handler处理流程结束,从用户发送的请求转发到executor。

    executor源码分析

    上文提到router会向executor发送http请求,http请求为

    POST /v2/getServiceForFunction
    
    所以接下来分析executor服务的逻辑。同样从服务启动,handler注册,以及handler处理来分析。

    这里是executor代码入口

    代码入口:pkg/executor/executor.go

    1. // StartExecutor Starts executor and the executor components such as Poolmgr,
    2. // deploymgr and potential future executor types
    3. func StartExecutor(logger *zap.Logger, functionNamespace string, envBuilderNamespace string, port int, openTracingEnabled bool) error {
    4. ...
    5. // 启动api服务
    6. go api.Serve(port, openTracingEnabled)
    7. ...
    8. }
     
    

    executor服务启动

    代码入口:pkg/executor/api.go

    1. // Serve starts an HTTP server.
    2. func (executor *Executor) Serve(port int, openTracingEnabled bool) {
    3. executor.logger.Info("starting executor API", zap.Int("port", port))
    4. address := fmt.Sprintf(":%v", port)
    5. var handler http.Handler
    6. if openTracingEnabled {
    7. handler = &ochttp.Handler{Handler: executor.GetHandler()}
    8. } else {
    9. handler = otelUtils.GetHandlerWithOTEL(executor.GetHandler(), "fission-executor", otelUtils.UrlsToIgnore("/healthz"))
    10. }
    11. err := http.ListenAndServe(address, handler)
    12. executor.logger.Fatal("done listening", zap.Error(err))
    13. }

    handler

    代码入口:pkg/executor/api.go

    1. // GetHandler returns an http.Handler.
    2. func (executor *Executor) GetHandler() http.Handler {
    3. r := mux.NewRouter()
    4. r.HandleFunc("/v2/getServiceForFunction", executor.getServiceForFunctionAPI).Methods("POST")
    5. r.HandleFunc("/v2/tapService", executor.tapService).Methods("POST") // for backward compatibility
    6. r.HandleFunc("/v2/tapServices", executor.tapServices).Methods("POST")
    7. r.HandleFunc("/healthz", executor.healthHandler).Methods("GET")
    8. r.HandleFunc("/v2/unTapService", executor.unTapService).Methods("POST")
    9. return r
    10. }
     
    

    获取请求service

    这里我们用的executor是poolmanager,以下的代码分析都是基于poolmanager进行的。

    核心逻辑是从缓存中读取function service对应的url,如果不存在该service,则特化pod,并新建对应的service,写入缓存中。

    代码入口:pkg/executor/api.go

    1. func (executor *Executor) getServiceForFunctionAPI(w http.ResponseWriter, r *http.Request) {
    2. ......
    3. // poolmanager类型,且function不是只执行一次的(OnceOnly为false)
    4. if t == fv1.ExecutorTypePoolmgr && !fn.Spec.OnceOnly {
    5. ......
    6. // 优先从cache中获取函数的service
    7. fsvc, active, err := et.GetFuncSvcFromPoolCache(ctx, fn, requestsPerpod)
    8. // check if its a cache hit (check if there is already specialized function pod that can serve another request)
    9. if err == nil {
    10. // if a pod is already serving request then it already exists else validated
    11. // 这里是命中缓存,从缓存中读取后,写入response中
    12. logger.Debug("from cache", zap.Int("active", active))
    13. if active > 1 || et.IsValid(ctx, fsvc) {
    14. // Cached, return svc address
    15. logger.Debug("served from cache", zap.String("name", fsvc.Name), zap.String("address", fsvc.Address))
    16. executor.writeResponse(w, fsvc.Address, fn.ObjectMeta.Name)
    17. return
    18. }
    19. 。。。。。。
    20. }
    21. }
    22. 。。。
    23. // 核心逻辑: 服务可以调用多次,从缓存中读取function service对应的url,如果不存在该service,则特化pod,并新建对应的service
    24. serviceName, err := executor.getServiceForFunction(ctx, fn)
    25. 。。。
    26. executor.writeResponse(w, serviceName, fn.ObjectMeta.Name)
    27. }
    代码入口:pkg/executor/executortype/poolmgr/gpm.go
    1. func (gpm *GenericPoolManager) GetFuncSvcFromPoolCache(ctx context.Context, fn *fv1.Function, requestsPerPod int) (*fscache.FuncSvc, int, error) {
    2. otelUtils.SpanTrackEvent(ctx, "GetFuncSvcFromPoolCache", otelUtils.GetAttributesForFunction(fn)...)
    3. return gpm.fsCache.GetFuncSvc(&fn.ObjectMeta, requestsPerPod)
    4. }
    代码入口:pkg/executor/fscache/functionServiceCache.go
    1. // GetFuncSvc gets a function service from pool cache using function key and returns number of active instances of function pod
    2. func (fsc *FunctionServiceCache) GetFuncSvc(m *metav1.ObjectMeta, requestsPerPod int) (*FuncSvc, int, error) {
    3. key := crd.CacheKey(m)
    4. // 读取缓存
    5. fsvcI, active, err := fsc.connFunctionCache.GetValue(key, requestsPerPod)
    6. if err != nil {
    7. fsc.logger.Info("Not found in Cache")
    8. return nil, active, err
    9. }
    10. // update atime
    11. fsvc := fsvcI.(*FuncSvc)
    12. fsvc.Atime = time.Now()
    13. fsvcCopy := *fsvc
    14. return &fsvcCopy, active, nil
    15. }
    代码入口:pkg/poolcache/poolcache.go
    1. // GetValue returns a value interface with status inActive else return error
    2. func (c *Cache) GetValue(function interface{}, requestsPerPod int) (interface{}, int, error) {
    3. respChannel := make(chan *response)
    4. c.requestChannel <- &request{
    5. requestType: getValue,
    6. function: function,
    7. requestsPerPod: requestsPerPod,
    8. responseChannel: respChannel,
    9. }
    10. resp := <-respChannel
    11. return resp.value, resp.totalActive, resp.error
    12. }
    这里GetValue是通过管道发消息来处理的,之后通过respChannel管道获取结果。

    从管道获取service

    这部分是poolcache的内容,poolcache服务是在executor启动时启动的(见代码NewPoolCache)。详细内容后续待进一步分析,这里仅分析和请求相关的内容。

    代码入口:pkg/poolcache/poolcache.go

    1. func (c *Cache) service() {
    2. for {
    3. req := <-c.requestChannel
    4. resp := &response{}
    5. switch req.requestType {
    6. case getValue:
    7. values, ok := c.cache[req.function]
    8. found := false
    9. if !ok {
    10. resp.error = ferror.MakeError(ferror.ErrorNotFound,
    11. fmt.Sprintf("function Name '%v' not found", req.function))
    12. } else {
    13. for addr := range values {
    14. if values[addr].activeRequests < req.requestsPerPod && values[addr].currentCPUUsage.Cmp(values[addr].cpuLimit) < 1 {
    15. // mark active
    16. values[addr].activeRequests++
    17. resp.value = values[addr].val
    18. found = true
    19. break
    20. }
    21. }
    22. if !found {
    23. resp.error = ferror.MakeError(ferror.ErrorNotFound, fmt.Sprintf("function '%v' all functions are busy", req.function))
    24. }
    25. resp.totalActive = len(values)
    26. }
    27. // 结果通过管道返回
    28. req.responseChannel <- resp
    29. case setValue:
    30. if _, ok := c.cache[req.function]; !ok {
    31. c.cache[req.function] = make(map[interface{}]*value)
    32. }
    33. if _, ok := c.cache[req.function][req.address]; !ok {
    34. c.cache[req.function][req.address] = &value{}
    35. }
    36. c.cache[req.function][req.address].val = req.value
    37. c.cache[req.function][req.address].activeRequests++
    38. c.cache[req.function][req.address].cpuLimit = req.cpuUsage
    39. case listAvailableValue:
    40. 。。。
    41. case setCPUUtilization:
    42. 。。。
    43. case markAvailable:
    44. 。。。
    45. case deleteValue:
    46. 。。。
    47. default:
    48. 。。。
    49. }
    50. }
    51. }
    写入service cache
    1. // SetValue marks the value at key [function][address] as active(begin used)
    2. func (c *Cache) SetValue(function, address, value interface{}, cpuLimit resource.Quantity) {
    3. respChannel := make(chan *response)
    4. c.requestChannel <- &request{
    5. requestType: setValue,
    6. function: function,
    7. address: address,
    8. value: value,
    9. cpuUsage: cpuLimit,
    10. responseChannel: respChannel,
    11. }
    12. }
    查看函数调用关系,可以知道写入入口是createServiceForFunction

    代码入口:pkg/executor/executor.go

    1. func (executor *Executor) createServiceForFunction(ctx context.Context, fn *fv1.Function) (*fscache.FuncSvc, error) {
    2. ......
    3. t := fn.Spec.InvokeStrategy.ExecutionStrategy.ExecutorType
    4. e, ok := executor.executorTypes[t]
    5. if !ok {
    6. return nil, errors.Errorf("Unknown executor type '%v'", t)
    7. }
    8. fsvc, fsvcErr := e.GetFuncSvc(ctx, fn)
    9. ......
    10. return fsvc, fsvcErr
    11. }
     
    

    代码入口:pkg/executor/executortype/poolmgr/gpm.go

    1. func (gpm *GenericPoolManager) GetFuncSvc(ctx context.Context, fn *fv1.Function) (*fscache.FuncSvc, error) {
    2. otelUtils.SpanTrackEvent(ctx, "GetFuncSvc", otelUtils.GetAttributesForFunction(fn)...)
    3. logger := otelUtils.LoggerWithTraceID(ctx, gpm.logger)
    4. // from Func -> get Env
    5. logger.Debug("getting environment for function", zap.String("function", fn.ObjectMeta.Name))
    6. env, err := gpm.getFunctionEnv(ctx, fn)
    7. if err != nil {
    8. return nil, err
    9. }
    10. pool, created, err := gpm.getPool(ctx, env)
    11. if err != nil {
    12. return nil, err
    13. }
    14. if created {
    15. logger.Info("created pool for the environment", zap.String("env", env.ObjectMeta.Name), zap.String("namespace", gpm.namespace))
    16. }
    17. // from GenericPool -> get one function container
    18. // (this also adds to the cache)
    19. logger.Debug("getting function service from pool", zap.String("function", fn.ObjectMeta.Name))
    20. return pool.getFuncSvc(ctx, fn)
    21. }
    代码入口:pkg/executor/executortype/poolmgr/gp.go
    1. func (gp *GenericPool) getFuncSvc(ctx context.Context, fn *fv1.Function) (*fscache.FuncSvc, error) {
    2. // 配置pod函数相关的label
    3. logger.Info("choosing pod from pool")
    4. funcLabels := gp.labelsForFunction(&fn.ObjectMeta)
    5. ...
    6. // 选择一个pod
    7. key, pod, err := gp.choosePod(ctx, funcLabels)
    8. if err != nil {
    9. return nil, err
    10. }
    11. gp.readyPodQueue.Done(key)
    12. // pod进行特化处理
    13. err = gp.specializePod(ctx, pod, fn)
    14. if err != nil {
    15. gp.scheduleDeletePod(pod.ObjectMeta.Name)
    16. return nil, err
    17. }
    18. logger.Info("specialized pod", zap.String("pod", pod.ObjectMeta.Name), zap.String("podNamespace", pod.ObjectMeta.Namespace), zap.String("podIP", pod.Status.PodIP))
    19. var svcHost string
    20. if gp.useSvc && !gp.useIstio {
    21. svcName := fmt.Sprintf("svc-%v", fn.ObjectMeta.Name)
    22. if len(fn.ObjectMeta.UID) > 0 {
    23. svcName = fmt.Sprintf("%s-%v", svcName, fn.ObjectMeta.UID)
    24. }
    25. svc, err := gp.createSvc(ctx, svcName, funcLabels)
    26. if err != nil {
    27. gp.scheduleDeletePod(pod.ObjectMeta.Name)
    28. return nil, err
    29. }
    30. if svc.ObjectMeta.Name != svcName {
    31. gp.scheduleDeletePod(pod.ObjectMeta.Name)
    32. return nil, errors.Errorf("sanity check failed for svc %v", svc.ObjectMeta.Name)
    33. }
    34. // the fission router isn't in the same namespace, so return a
    35. // namespace-qualified hostname
    36. svcHost = fmt.Sprintf("%v.%v:8888", svcName, gp.namespace)
    37. } else if gp.useIstio {
    38. svc := utils.GetFunctionIstioServiceName(fn.ObjectMeta.Name, fn.ObjectMeta.Namespace)
    39. svcHost = fmt.Sprintf("%v.%v:8888", svc, gp.namespace)
    40. } else {
    41. svcHost = fmt.Sprintf("%v:8888", pod.Status.PodIP)
    42. }
    43. otelUtils.SpanTrackEvent(ctx, "addFunctionLabel", otelUtils.GetAttributesForPod(pod)...)
    44. // patch svc-host and resource version to the pod annotations for new executor to adopt the pod
    45. patch := fmt.Sprintf(`{"metadata":{"annotations":{"%v":"%v","%v":"%v"}}}`,
    46. fv1.ANNOTATION_SVC_HOST, svcHost, fv1.FUNCTION_RESOURCE_VERSION, fn.ObjectMeta.ResourceVersion)
    47. p, err := gp.kubernetesClient.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, k8sTypes.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
    48. if err != nil {
    49. // just log the error since it won't affect the function serving
    50. logger.Warn("error patching svc-host to pod", zap.Error(err),
    51. zap.String("pod", pod.Name), zap.String("ns", pod.Namespace))
    52. } else {
    53. pod = p
    54. }
    55. kubeObjRefs := []apiv1.ObjectReference{
    56. {
    57. Kind: "pod",
    58. Name: pod.ObjectMeta.Name,
    59. APIVersion: pod.TypeMeta.APIVersion,
    60. Namespace: pod.ObjectMeta.Namespace,
    61. ResourceVersion: pod.ObjectMeta.ResourceVersion,
    62. UID: pod.ObjectMeta.UID,
    63. },
    64. }
    65. cpuUsage := resource.MustParse("0m")
    66. for _, container := range pod.Spec.Containers {
    67. val := *container.Resources.Limits.Cpu()
    68. cpuUsage.Add(val)
    69. }
    70. // set cpuLimit to 85th percentage of the cpuUsage
    71. cpuLimit, err := gp.getPercent(cpuUsage, 0.85)
    72. if err != nil {
    73. logger.Error("failed to get 85 of CPU usage", zap.Error(err))
    74. cpuLimit = cpuUsage
    75. }
    76. logger.Debug("cpuLimit set to", zap.Any("cpulimit", cpuLimit))
    77. m := fn.ObjectMeta // only cache necessary part
    78. fsvc := &fscache.FuncSvc{
    79. Name: pod.ObjectMeta.Name,
    80. Function: &m,
    81. Environment: gp.env,
    82. Address: svcHost,
    83. KubernetesObjects: kubeObjRefs,
    84. Executor: fv1.ExecutorTypePoolmgr,
    85. CPULimit: cpuLimit,
    86. Ctime: time.Now(),
    87. Atime: time.Now(),
    88. }
    89. gp.fsCache.PodToFsvc.Store(pod.GetObjectMeta().GetName(), fsvc)
    90. gp.podFSVCMap.Store(pod.ObjectMeta.Name, []interface{}{crd.CacheKey(fsvc.Function), fsvc.Address})
    91. gp.fsCache.AddFunc(*fsvc)
    92. gp.fsCache.IncreaseColdStarts(fn.ObjectMeta.Name, string(fn.ObjectMeta.UID))
    93. logger.Info("added function service",
    94. zap.String("pod", pod.ObjectMeta.Name),
    95. zap.String("podNamespace", pod.ObjectMeta.Namespace),
    96. zap.String("serviceHost", svcHost),
    97. zap.String("podIP", pod.Status.PodIP))
    98. otelUtils.SpanTrackEvent(ctx, "getFuncSvcComplete", otelUtils.GetAttributesForFuncSvc(fsvc)...)
    99. return fsvc, nil
     
    

    getServiceForFunction

    // 核心逻辑: 服务可以调用多次,从缓存中读取function service对应的url,如果不存在该service,则特化pod,并新建对应的service

    1. // 代码入口: pkg/executor/api.go
    2. func (executor *Executor) getServiceForFunction(ctx context.Context, fn *fv1.Function) (string, error) {
    3. respChan := make(chan *createFuncServiceResponse)
    4. // 通过管道发送创建function service服务请求消息
    5. executor.requestChan <- &createFuncServiceRequest{
    6. context: ctx,
    7. function: fn,
    8. respChan: respChan,
    9. }
    10. resp := <-respChan
    11. if resp.err != nil {
    12. return "", resp.err
    13. }
    14. return resp.funcSvc.Address, resp.err
    15. }
    16. // 代码入口:pkg/executor/executor.go
    17. func (executor *Executor) serveCreateFuncServices() {
    18. // 接收上面的管道消息
    19. for {
    20. req := <-executor.requestChan
    21. 。。。
    22. // 核心处理逻辑
    23. fsvc, err := executor.createServiceForFunction(fnSpecializationTimeoutContext, req.function)
    24. 。。。
    25. // 返回response消息
    26. req.respChan <- &createFuncServiceResponse{
    27. funcSvc: fsvc,
    28. err: err,
    29. }
    30. }
    31. }

    createServiceForFunction -> GenericPoolManager.GetFuncSvc -> GenericPool.getFuncSvc

    1. Router 向 executor 发送请求获取函数访问入口,参考 GetServiceForFunction
    2. Poolmgr 从函数指定环境对应的通用 pod 池里随机选择一个 pod 作为函数执行的载体,这里通过更改 pod 的标签让其从 deployment 中“独立”出来,参考 choosePod。K8s 发现 deployment 所管理 pod 的实际副本数少于目标副本数后会对 pod 进行补充,这样便实现了保持通用 pod 池中的 pod 个数的目的。
    3. 特化处理被挑选出来的 pod,参考 specializePod

    代码入口:pkg/executor/executortype/poolmgr/gp.go

    1. func (gp *GenericPool) getFuncSvc(ctx context.Context, fn *fv1.Function) (*fscache.FuncSvc, error) {
    2. 。。。
    3. // 核心逻辑:选择一个pod,修改pod label
    4. key, pod, err := gp.choosePod(ctx, funcLabels)
    5. // 向pod发送特化请求
    6. err = gp.specializePod(ctx, pod, fn)
    7. }
     
    

    specializePod

    向fetcher容器发送请求 POST /specialize,fetcher会去加载代码,具体逻辑后续会继续分析fetcher模块。

    1. // specializePod chooses a pod, copies the required user-defined function to that pod
    2. // (via fetcher), and calls the function-run container to load it, resulting in a
    3. // specialized pod.
    4. func (gp *GenericPool) specializePod(ctx context.Context, pod *apiv1.Pod, fn *fv1.Function) error {
    5. // tell fetcher to get the function.
    6. fetcherURL := gp.getFetcherURL(podIP)
    7. logger.Info("calling fetcher to copy function", zap.String("function", fn.ObjectMeta.Name), zap.String("url", fetcherURL))
    8. specializeReq := gp.fetcherConfig.NewSpecializeRequest(fn, gp.env)
    9. // Fetcher will download user function to share volume of pod, and
    10. // invoke environment specialize api for pod specialization.
    11. err := fetcherClient.MakeClient(gp.logger, fetcherURL).Specialize(ctx, &specializeReq)
    12. if err != nil {
    13. return err
    14. }
    15. otelUtils.SpanTrackEvent(ctx, "specializedPod", otelUtils.GetAttributesForPod(pod)...)
    16. return nil
    17. }
     
    

    k8s代理转发

    kubectl port-forward

    获取pod列表调用关系

    1. func runPortForward(ctx context.Context, labelSelector string, localPort string, ns string, kubeContext string) (chan struct{}, chan struct{}, error) {
    2. 。。。。。。
    3. // create request URL
    4. req := clientset.CoreV1().RESTClient().Post().Resource("pods").
    5. Namespace(podNameSpace).Name(podName).SubResource("portforward")
    6. url := req.URL()
    7. 。。。。。。
    8. }

    参考

    深入理解POD的工作原理(中) - 简书

    源码解析 kubectl port-forward 工作原理

    Golang Http RoundTrip解析_zhanglehes的博客-CSDN博客

  • 相关阅读:
    专利申请的流程及好处
    PHP自动识别采集何意网址文章正文内容
    C++模板
    day58| 739. 每日温度、496.下一个更大元素 I
    Redis主从复制部署小结
    如何实现通达信接口开发?
    小米手机怎么识别图片上的表格文字?
    名词解析与经验分享(前端)
    JavaWeb、其他技术
    Oracle中LEFT JOIN后AND与WHERE的异同
  • 原文地址:https://blog.csdn.net/cbmljs/article/details/127803286