fission命令行的本质是把命令封装成http请求发送给router。但是本地网络和k8s中router所在的网络是隔离的,所以需要一个代理。我们知道k8s中可以利用kubectl port-forward来实现端口转发,从而解决网络隔离问题;fission-cli的代码中也实现了同样的端口转发能力。
本文通过fission请求function服务来分析下fission的原理。
代码入口:cmd/fission-cli/main.go
版本:v1.17.0
- $ fission version
- client:
- fission/core:
- BuildDate: "2022-09-16T13:24:57Z"
- GitCommit: b36e0516
- Version: v1.17.0
- server:
- fission/core:
- BuildDate: "2022-09-16T13:24:57Z"
- GitCommit: b36e0516
- Version: v1.17.0
-
-
- #
- $ fission check
- fission-services
- --------------------
- √ controller is running fine
- √ executor is running fine
- √ router is running fine
- √ storagesvc is running fine
-
- fission-version
- --------------------
- √ fission is up-to-date
这里通过调用fission function命令行来讲。
fission function test --name helloworld
- func main() {
- // 命令行封装
- cmd := app.App()
- cmd.SilenceErrors = true // use our own error message printer
-
- // 实际执行
- err := cmd.Execute()
- if err != nil {
- // let program exit with non-zero code when error occurs
- console.Error(err.Error())
- os.Exit(1)
- }
- }
- func App() *cobra.Command {
- ...
-
- groups := helptemplate.CommandGroups{}
- groups = append(groups, helptemplate.CreateCmdGroup("Auth Commands(Note: Authentication should be enabled to use a command in this group.)", token.Commands()))
- groups = append(groups, helptemplate.CreateCmdGroup("Basic Commands", environment.Commands(), _package.Commands(), function.Commands(), archive.Commands()))
- groups = append(groups, helptemplate.CreateCmdGroup("Trigger Commands", httptrigger.Commands(), mqtrigger.Commands(), timetrigger.Commands(), kubewatch.Commands()))
- groups = append(groups, helptemplate.CreateCmdGroup("Deploy Strategies Commands", canaryconfig.Commands()))
- groups = append(groups, helptemplate.CreateCmdGroup("Declarative Application Commands", spec.Commands()))
- groups = append(groups, helptemplate.CreateCmdGroup("Other Commands", support.Commands(), version.Commands(), check.Commands()))
- ...
- }
其中function.Commands()就是fission function子命令的入口
代码入口:pkg/fission-cli/cmd/function/command.go
- func Commands() *cobra.Command {
- ...
-
- testCmd := &cobra.Command{
- Use: "test",
- Aliases: []string{},
- Short: "Test a function",
- RunE: wrapper.Wrapper(Test),
- }
- ...
- }
核心代码是wrapper.Wrapper(Test),这个是cmd.Execute()实际调用的入口。
代码入口:pkg/fission-cli/cmd/function/test.go
- func Test(input cli.Input) error {
- return (&TestSubCommand{}).do(input)
- }
-
-
- func (opts *TestSubCommand) do(input cli.Input) error {
- 。。。。。。
- // 端口映射,将localport映射到fission router的service port
- localRouterPort, err := util.SetupPortForward(util.GetFissionNamespace(), "application=fission-router", kubeContext)
- if err != nil {
- return err
- }
- // http://127.0.0.1:49703/fission-function/helloworld
- fnURL := "http://127.0.0.1:" + localRouterPort + util.UrlForFunction(m.Name, m.Namespace)
-
- functionUrl, err := url.Parse(fnURL)
-
- // 发送http请求
- resp, err := doHTTPRequest(ctx, functionUrl.String(),
- input.StringSlice(flagkey.FnTestHeader),
- method,
- input.String(flagkey.FnTestBody))
- if err != nil {
- return err
- }
- defer resp.Body.Close()
-
- body, err := io.ReadAll(resp.Body)
- if err != nil {
- return errors.Wrap(err, "error reading response from function")
- }
-
- if resp.StatusCode < 400 {
- // 输出到标准输出
- os.Stdout.Write(body)
- return nil
- }
-
- }
代码入口:pkg/fission-cli/cmd/function/test.go
- // url: http://127.0.0.1:63177/fission-function/helloworld
- func doHTTPRequest(ctx context.Context, url string, headers []string, method, body string) (*http.Response, error) {
-
- ......
- // 新建http请求
- req, err := http.NewRequest(method, url, strings.NewReader(body))
- ......
- // 构建httpclient
- hc := &http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)}
- // http请求localhost:port会通过port-forward转发到k8s apiserver,从而请求到对应pod
- resp, err := hc.Do(req.WithContext(ctx))
- 。。。。。。
- }
-
-
- // 请求url:url: http://127.0.0.1:63177/fission-function/helloworld
- func (c *Client) Do(req *Request) (*Response, error) {
- return c.do(req)
- }
-
-
- func (c *Client) do(req *Request) (retres *Response, reterr error) {
-
- 。。。。。。
- for {
-
- if resp, didTimeout, err = c.send(req, deadline); err != nil {
- // c.send() always closes req.Body
- reqBodyClosed = true
- if !deadline.IsZero() && didTimeout() {
- err = &httpError{
- err: err.Error() + " (Client.Timeout exceeded while awaiting headers)",
- timeout: true,
- }
- }
- return nil, uerr(err)
- }
- }
-
- }
-
-
- // didTimeout is non-nil only if err != nil.
- func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
- if c.Jar != nil {
- for _, cookie := range c.Jar.Cookies(req.URL) {
- req.AddCookie(cookie)
- }
- }
- resp, didTimeout, err = send(req, c.transport(), deadline)
- if err != nil {
- return nil, didTimeout, err
- }
- if c.Jar != nil {
- if rc := resp.Cookies(); len(rc) > 0 {
- c.Jar.SetCookies(req.URL, rc)
- }
- }
- return resp, nil, nil
- }
-
-
-
- // send issues an HTTP request.
- // Caller should close resp.Body when done reading from it.
- func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
- // GET http://127.0.0.1:63177/fission-function/helloworld
- req := ireq // req is either the original request, or a modified fork
-
-
-
- }

到这里,我们可以看到fission命令行实际上是发送了一个http请求:http://127.0.0.1:49703/fission-function/helloworld。那么对端口49703的访问,是如何发送给fission服务端的呢?这里就需要了解port-forward的端口映射原理。
为什么需要端口映射?这是因为宿主机的网络和k8s pod的网络是隔离的,所以宿主机的http请求无法直接请求到pod上,这就需要将宿主机的请求通过代理转发到pod上。这里用的是端口号映射机制,其原理和kubectl port-forward是一样的。如下图我们可以通过调整日志级别,查看fission test命令的详细日志。

- // Port forward a free local port to a pod on the cluster. The pod is
- // found in the specified namespace by labelSelector. The pod's port
- // is found by looking for a service in the same namespace and using
- // its targetPort. Once the port forward is started, wait for it to
- // start accepting connections before returning.
- func SetupPortForward(namespace, labelSelector string, kubeContext string) (string, error) {
- 。。。。。。
- // 本地端口号
- localPort, err := findFreePort()
- 。。。。。。
-
- // runPortForward启用一个local端口号,请求指向特定label的pod
- readyC, _, err := runPortForward(context.Background(), labelSelector, localPort, namespace, kubeContext)
- if err != nil {
- fmt.Printf("Error forwarding to port %v: %s", localPort, err.Error())
- return "", err
- }
-
- <-readyC
-
- 。。。。。。
- }
-
-
-
- // runPortForward creates a local port forward to the specified pod
- func runPortForward(ctx context.Context, labelSelector string, localPort string, ns string, kubeContext string) (chan struct{}, chan struct{}, error) {
- 。。。。。。
- // 通过label获取pod列表
- podList, err := clientset.CoreV1().Pods(ns).
- List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
-
- 。。。。。。
- // 通过label获取service,以及对应的port
- svcs, err := clientset.CoreV1().Services(podNameSpace).
- List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
-
- // 默认映射到router service的端口号8888
- var targetPort string
- for _, servicePort := range service.Spec.Ports {
- targetPort = servicePort.TargetPort.String()
- }
-
- 。。。。。。
-
- // 核心转换,从podname获取选定pod的k8s url,
- // Host:kubernetes.docker.internal:6443
- // Path:/api/v1/namespaces/fission/pods/controller-657d4cc757-lqlzf/portforward
- req := clientset.CoreV1().RESTClient().Post().Resource("pods").
- Namespace(podNameSpace).Name(podName).SubResource("portforward")
- url := req.URL()
-
- // create ports slice
- portCombo := localPort + ":" + targetPort
-
- // actually start the port-forwarding process here
- transport, upgrader, err := spdy.RoundTripperFor(config)
- dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)
-
- // New creates a new PortForwarder with localhost listen addresses.
- fw, err := portforward.New(dialer, ports, stopChannel, readyChannel, outStream, os.Stderr)
-
- // ForwardPorts监听http端口映射的请求,连接会一直保持,直到收到stopChan信号
- go func() {
- 。。。。。。
- err := fw.ForwardPorts()
-
- }()
-
- 。。。
- }
-
代码入口:pkg/router/functionHandler.go
RoundTrip可以看做是http的中间件。RoundTrip是一个对http请求带重试能力的自定义传输(transport),它将请求转发到正确的serviceUrl;
该serviceUrl优先从router的缓存中获取,如果缓存中不存在,则向executor查询。
RoundTrip这里对http请求增加了缓存和重试能力。
- func (roundTripper *RetryingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
- 。。。。。。
-
- for i := 0; i < roundTripper.funcHandler.tsRoundTripperParams.maxRetries; i++ {
- // set service url of target service of request only when
- // trying to get new service url from cache/executor.
- // 调用请求:POST http://executor.fission/v2/getServiceForFunction
- roundTripper.serviceURL, roundTripper.urlFromCache, err = roundTripper.funcHandler.getServiceEntry(ctx)
-
- 。。。。。。
- // 调用请求:POST http://executor.fission/v2/unTapService
- go roundTripper.funcHandler.unTapService(fn, serviceURL) //nolint errcheck
-
- 。。。
- // fission发送的请求,最终发送到指定function pod上执行,fission请求获取返回结果
- resp, err := otelRoundTripper.RoundTrip(newReq)
- }
- 。。。。。。
- }
代码入口:pkg/router/functionHandler.go
-
- // getServiceEntryFromExecutor returns service url entry returns from executor
- func (fh functionHandler) getServiceEntry(ctx context.Context) (svcURL *url.URL, cacheHit bool, err error) {
- if fh.function.Spec.InvokeStrategy.ExecutionStrategy.ExecutorType == fv1.ExecutorTypePoolmgr {
- svcURL, err = fh.getServiceEntryFromExecutor(ctx)
- return svcURL, false, err
- }
- // Check if service URL present in cache
- svcURL, err = fh.getServiceEntryFromCache()
- if err == nil && svcURL != nil {
- return svcURL, true, nil
- } else if err != nil {
- return nil, false, err
- }
-
- fnMeta := &fh.function.ObjectMeta
- recordObj, err := fh.svcAddrUpdateThrottler.RunOnce(
- crd.CacheKey(fnMeta),
- func(firstToTheLock bool) (interface{}, error) {
- if !firstToTheLock {
- svcURL, err := fh.getServiceEntryFromCache()
- if err != nil {
- return nil, err
- }
- return svcEntryRecord{svcURL: svcURL, cacheHit: true}, err
- }
- svcURL, err = fh.getServiceEntryFromExecutor(ctx)
- if err != nil {
- return nil, err
- }
- fh.addServiceEntryToCache(svcURL)
- return svcEntryRecord{
- svcURL: svcURL,
- cacheHit: false,
- }, nil
- },
- )
-
- record, ok := recordObj.(svcEntryRecord)
- if !ok {
- return nil, false, fmt.Errorf("received unknown service record type")
- }
- return record.svcURL, record.cacheHit, err
- }
代码入口:pkg/router/functionHandler.go
-
- func (fh functionHandler) getServiceEntryFromExecutor(ctx context.Context) (serviceUrl *url.URL, err error) {
- ......
-
- service, err := fh.executor.GetServiceForFunction(fContext, fh.function)
- if err != nil {
- statusCode, errMsg := ferror.GetHTTPError(err)
- logger.Error("error from GetServiceForFunction",
- zap.Error(err),
- zap.String("error_message", errMsg),
- zap.Any("function", fh.function),
- zap.Int("status_code", statusCode))
- return nil, err
- }
- // parse the address into url
- svcURL, err := url.Parse(fmt.Sprintf("http://%v", service))
- if err != nil {
- logger.Error("error parsing service url",
- zap.Error(err),
- zap.String("service_url", svcURL.String()))
- return nil, err
- }
- return svcURL, err
- }
代码入口:pkg/executor/client/client.go
-
- // GetServiceForFunction returns the service name for a given function.
- func (c *Client) GetServiceForFunction(ctx context.Context, fn *fv1.Function) (string, error) {
- executorURL := c.executorURL + "/v2/getServiceForFunction"
-
- body, err := json.Marshal(fn)
- if err != nil {
- return "", errors.Wrap(err, "could not marshal request body for getting service for function")
- }
-
- resp, err := ctxhttp.Post(ctx, c.httpClient, executorURL, "application/json", bytes.NewReader(body))
- if err != nil {
- return "", errors.Wrap(err, "error posting to getting service for function")
- }
- defer resp.Body.Close()
-
- if resp.StatusCode != 200 {
- return "", ferror.MakeErrorFromHTTP(resp)
- }
-
- svcName, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return "", errors.Wrap(err, "error reading response body from getting service for function")
- }
-
- return string(svcName), nil
- }


从上图可以看出fission function test的请求被转发给router的pod。
Controller:Accept REST API requests and create Fission resources
接收rest api请求,创建fission资源对象

Executor:Component to spin up function pods
用于拉起(创建)function pod的组件。

Router:作为trigger和function之间的桥梁
Bridge between triggers and functions
Function Pod:Place to load and execute the user function
加载代码和执行用户请求
Builder Manager:Compile the source code into a runnable function
对于编译型语言,需要先把源码编译成可运行的function。
Builder Pod:Place to load and execute the user function
StorageSvc:Home for source and deployment archives
代码入口:
pkg/router/router.go
pkg/router/functionHandler.go
调用关系
runRouter -> Start -> server -> router
-
- func runRouter(ctx context.Context, logger *zap.Logger, port int, executorUrl string) {
- router.Start(ctx, logger, port, executorUrl)
- }
-
-
-
- // Start starts a router
- // pkg/router/router.go
- func Start(ctx context.Context, logger *zap.Logger, port int, executorURL string) {
- 。。。。。。
- // 设置一堆环境变量和client
- 。。。。。。
- // 核心代码是启动router服务
- serve(ctx, logger, port, triggers, displayAccessLog)
- }
-
-
- func serve(ctx context.Context, logger *zap.Logger, port int,
- httpTriggerSet *HTTPTriggerSet, displayAccessLog bool) {
- mr := router(ctx, logger, httpTriggerSet)
- 。。。。。。
- }
-
-
- func router(ctx context.Context, logger *zap.Logger, httpTriggerSet *HTTPTriggerSet) *mutableRouter {
- 。。。。。。
- // 核心代码,注册handler
- httpTriggerSet.subscribeRouter(ctx, mr)
- }
router的handler注册逻辑比较复杂,需要进行详细分析。
代码入口:pkg/router/functionHandler.go

- func (ts *HTTPTriggerSet) subscribeRouter(ctx context.Context, mr *mutableRouter) {
- 。。。。。。
- // 注册handler
- go ts.updateRouter()
- 。。。。。。
- }
-
-
- func (ts *HTTPTriggerSet) updateRouter() {
- for range ts.updateRouterRequestChannel {
- 。。。。。。
-
- // make a new router and use it
- // 注册handler
- ts.mutableRouter.updateRouter(ts.getRouter(functionTimeout))
- }
- }
- func (ts *HTTPTriggerSet) getRouter(fnTimeoutMap map[types.UID]int) *mux.Router {
-
- 。。。
- muxRouter.HandleFunc("/", defaultHomeHandler).Methods("GET")
-
- muxRouter.HandleFunc("/router-healthz", routerHealthHandler).Methods("GET")
-
- // 这里的handler对应的path是:/fission-function
- handler = otel.GetHandlerWithOTEL(http.HandlerFunc(fh.handler), internalRoute)
- 。。。
-
- }
handler = otel.GetHandlerWithOTEL(http.HandlerFunc(fh.handler), internalRoute)
这里的internalRoute是 "/fission-function",对应fission function test的请求:
curl http://127.0.0.1:8888/fission-function/helloworld


代码入口:pkg/router/functionHandler.go
- func (fh functionHandler) handler(responseWriter http.ResponseWriter, request *http.Request) {
- 。。。
-
- rrt := &RetryingRoundTripper{
- logger: fh.logger.Named("roundtripper"),
- funcHandler: &fh,
- funcTimeout: time.Duration(fnTimeout) * time.Second,
- }
-
- start := time.Now()
-
- proxy := &httputil.ReverseProxy{
- Director: director,
- Transport: rrt,
- ErrorHandler: fh.getProxyErrorHandler(start, rrt),
- ModifyResponse: func(resp *http.Response) error {
- go fh.collectFunctionMetric(start, rrt, request, resp)
- return nil
- },
- }
- 。。。
- //核心逻辑,
-
- proxy.ServeHTTP(responseWriter, request)
- }
-
-
- func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
- 。。。
- // 核心逻辑,调用http的RoundTrip,对请求进行包装处理,包括增加重试,超时,缓存等能力
- res, err := transport.RoundTrip(outreq)
-
-
- }
代码入口:pkg/router/functionHandler.go
- func (roundTripper *RetryingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
-
- 。。。
- for i := 0; i < roundTripper.funcHandler.tsRoundTripperParams.maxRetries; i++ {
- // 向executor发起请求,查询function服务的url
- roundTripper.serviceURL, roundTripper.urlFromCache, err = roundTripper.funcHandler.getServiceEntry(ctx)
-
- }
-
- 。。。
- // 实际向function所在的pod发送请求,真正执行fission function test的地方
- resp, err := otelRoundTripper.RoundTrip(newReq)
-
- 。。。
- }
RoundTrip函数有两个核心逻辑,一个是获取function所在的pod的url,第二个就是真正向function的pod发送请求服务。
判断服务是否存在缓存,如果存在,从缓存中读取,如果不存在,向executor发送http请求获取。
-
- // getServiceEntryFromExecutor returns service url entry returns from executor
- func (fh functionHandler) getServiceEntry(ctx context.Context) (svcURL *url.URL, cacheHit bool, err error) {
- if fh.function.Spec.InvokeStrategy.ExecutionStrategy.ExecutorType == fv1.ExecutorTypePoolmgr {
- svcURL, err = fh.getServiceEntryFromExecutor(ctx)
- return svcURL, false, err
- }
- // Check if service URL present in cache
- svcURL, err = fh.getServiceEntryFromCache()
- if err == nil && svcURL != nil {
- return svcURL, true, nil
- } else if err != nil {
- return nil, false, err
- }
-
- fnMeta := &fh.function.ObjectMeta
- recordObj, err := fh.svcAddrUpdateThrottler.RunOnce(
- crd.CacheKey(fnMeta),
- func(firstToTheLock bool) (interface{}, error) {
- if !firstToTheLock {
- svcURL, err := fh.getServiceEntryFromCache()
- if err != nil {
- return nil, err
- }
- return svcEntryRecord{svcURL: svcURL, cacheHit: true}, err
- }
- svcURL, err = fh.getServiceEntryFromExecutor(ctx)
- if err != nil {
- return nil, err
- }
- fh.addServiceEntryToCache(svcURL)
- return svcEntryRecord{
- svcURL: svcURL,
- cacheHit: false,
- }, nil
- },
- )
-
- record, ok := recordObj.(svcEntryRecord)
- if !ok {
- return nil, false, fmt.Errorf("unexpected type of recordObj %T: %w", recordObj, err)
- }
- return record.svcURL, record.cacheHit, err
- }
getServiceEntryFromExecutor -> func (c *Client) GetServiceForFunction
代码入口:pkg/executor/client/client.go
-
- // GetServiceForFunction returns the service name for a given function.
- func (c *Client) GetServiceForFunction(ctx context.Context, fn *fv1.Function) (string, error) {
- // 发送请求给executor
- executorURL := c.executorURL + "/v2/getServiceForFunction"
-
- body, err := json.Marshal(fn)
- if err != nil {
- return "", errors.Wrap(err, "could not marshal request body for getting service for function")
- }
-
- req, err := retryablehttp.NewRequestWithContext(ctx, "POST", executorURL, bytes.NewReader(body))
- if err != nil {
- return "", errors.Wrap(err, "could not create request for getting service for function")
- }
- req.Header.Set("Content-Type", "application/json")
-
- resp, err := c.httpClient.Do(req)
- if err != nil {
- return "", errors.Wrap(err, "error posting to getting service for function")
- }
- defer resp.Body.Close()
-
- if resp.StatusCode != 200 {
- return "", ferror.MakeErrorFromHTTP(resp)
- }
-
- svcName, err := io.ReadAll(resp.Body)
- if err != nil {
- return "", errors.Wrap(err, "error reading response body from getting service for function")
- }
-
- return string(svcName), nil
- }
到此为止router的handler处理流程结束:router先向executor查询function服务的地址,再把用户发送的请求转发到对应的function pod。
上文提到router会向executor发送http请求,http请求为
POST /v2/getServiceForFunction
所以接下来分析executor服务的逻辑。同样从服务启动,handler注册,以及handler处理来分析。
这里是executor代码入口
代码入口:pkg/executor/executor.go
- // StartExecutor Starts executor and the executor components such as Poolmgr,
- // deploymgr and potential future executor types
- func StartExecutor(logger *zap.Logger, functionNamespace string, envBuilderNamespace string, port int, openTracingEnabled bool) error {
-
- ...
- // 启动api服务
- go api.Serve(port, openTracingEnabled)
- ...
- }
代码入口:pkg/executor/api.go
-
- // Serve starts an HTTP server.
- func (executor *Executor) Serve(port int, openTracingEnabled bool) {
- executor.logger.Info("starting executor API", zap.Int("port", port))
- address := fmt.Sprintf(":%v", port)
-
- var handler http.Handler
- if openTracingEnabled {
- handler = &ochttp.Handler{Handler: executor.GetHandler()}
- } else {
- handler = otelUtils.GetHandlerWithOTEL(executor.GetHandler(), "fission-executor", otelUtils.UrlsToIgnore("/healthz"))
- }
-
- err := http.ListenAndServe(address, handler)
- executor.logger.Fatal("done listening", zap.Error(err))
- }
代码入口:pkg/executor/api.go
- // GetHandler returns an http.Handler.
- func (executor *Executor) GetHandler() http.Handler {
- r := mux.NewRouter()
- r.HandleFunc("/v2/getServiceForFunction", executor.getServiceForFunctionAPI).Methods("POST")
- r.HandleFunc("/v2/tapService", executor.tapService).Methods("POST") // for backward compatibility
- r.HandleFunc("/v2/tapServices", executor.tapServices).Methods("POST")
- r.HandleFunc("/healthz", executor.healthHandler).Methods("GET")
- r.HandleFunc("/v2/unTapService", executor.unTapService).Methods("POST")
- return r
- }
这里我们用的executor是poolmanager,以下的代码分析都是基于poolmanager进行的。
核心逻辑是从缓存中读取function service对应的url,如果不存在该service,则特化pod,并新建对应的service,写入缓存中。
代码入口:pkg/executor/api.go
- func (executor *Executor) getServiceForFunctionAPI(w http.ResponseWriter, r *http.Request) {
- ......
- // 仅处理poolmanager类型、且未设置OnceOnly(只执行一次)的function
- if t == fv1.ExecutorTypePoolmgr && !fn.Spec.OnceOnly {
- ......
- // 优先从cache中获取函数的service
- fsvc, active, err := et.GetFuncSvcFromPoolCache(ctx, fn, requestsPerpod)
- // check if its a cache hit (check if there is already specialized function pod that can serve another request)
- if err == nil {
- // if a pod is already serving request then it already exists else validated
- // 这里是命中缓存,从缓存中读取后,写入response中
- logger.Debug("from cache", zap.Int("active", active))
- if active > 1 || et.IsValid(ctx, fsvc) {
- // Cached, return svc address
- logger.Debug("served from cache", zap.String("name", fsvc.Name), zap.String("address", fsvc.Address))
- executor.writeResponse(w, fsvc.Address, fn.ObjectMeta.Name)
- return
- }
- 。。。。。。
- }
-
- }
- 。。。
- // 核心逻辑: 服务可以调用多次,从缓存中读取function service对应的url,如果不存在该service,则特化pod,并新建对应的service
- serviceName, err := executor.getServiceForFunction(ctx, fn)
-
- 。。。
- executor.writeResponse(w, serviceName, fn.ObjectMeta.Name)
- }
代码入口:pkg/executor/executortype/poolmgr/gpm.go
- func (gpm *GenericPoolManager) GetFuncSvcFromPoolCache(ctx context.Context, fn *fv1.Function, requestsPerPod int) (*fscache.FuncSvc, int, error) {
- otelUtils.SpanTrackEvent(ctx, "GetFuncSvcFromPoolCache", otelUtils.GetAttributesForFunction(fn)...)
- return gpm.fsCache.GetFuncSvc(&fn.ObjectMeta, requestsPerPod)
- }
代码入口:pkg/executor/fscache/functionServiceCache.go
-
- // GetFuncSvc gets a function service from pool cache using function key and returns number of active instances of function pod
- func (fsc *FunctionServiceCache) GetFuncSvc(m *metav1.ObjectMeta, requestsPerPod int) (*FuncSvc, int, error) {
- key := crd.CacheKey(m)
-
- // 读取缓存
- fsvcI, active, err := fsc.connFunctionCache.GetValue(key, requestsPerPod)
- if err != nil {
- fsc.logger.Info("Not found in Cache")
- return nil, active, err
- }
-
- // update atime
- fsvc := fsvcI.(*FuncSvc)
- fsvc.Atime = time.Now()
-
- fsvcCopy := *fsvc
- return &fsvcCopy, active, nil
- }
代码入口:pkg/poolcache/poolcache.go
-
- // GetValue returns a value interface with status inActive else return error
- func (c *Cache) GetValue(function interface{}, requestsPerPod int) (interface{}, int, error) {
- respChannel := make(chan *response)
- c.requestChannel <- &request{
- requestType: getValue,
- function: function,
- requestsPerPod: requestsPerPod,
- responseChannel: respChannel,
- }
- resp := <-respChannel
- return resp.value, resp.totalActive, resp.error
- }
这里GetValue是通过管道发消息来处理的,之后通过respChannel管道获取结果。
从管道获取service
这部分是poolcache的内容,poolcache服务是在executor启动时启动的,入口在代码NewPoolCache中。详细逻辑后续待进一步分析,这里仅分析和请求相关的内容。
代码入口:pkg/poolcache/poolcache.go
- func (c *Cache) service() {
- for {
- req := <-c.requestChannel
- resp := &response{}
- switch req.requestType {
- case getValue:
- values, ok := c.cache[req.function]
- found := false
- if !ok {
- resp.error = ferror.MakeError(ferror.ErrorNotFound,
- fmt.Sprintf("function Name '%v' not found", req.function))
- } else {
- for addr := range values {
- if values[addr].activeRequests < req.requestsPerPod && values[addr].currentCPUUsage.Cmp(values[addr].cpuLimit) < 1 {
- // mark active
- values[addr].activeRequests++
- resp.value = values[addr].val
- found = true
- break
- }
- }
- if !found {
- resp.error = ferror.MakeError(ferror.ErrorNotFound, fmt.Sprintf("function '%v' all functions are busy", req.function))
- }
- resp.totalActive = len(values)
- }
- // 结果通过管道返回
- req.responseChannel <- resp
- case setValue:
- if _, ok := c.cache[req.function]; !ok {
- c.cache[req.function] = make(map[interface{}]*value)
- }
- if _, ok := c.cache[req.function][req.address]; !ok {
- c.cache[req.function][req.address] = &value{}
- }
- c.cache[req.function][req.address].val = req.value
- c.cache[req.function][req.address].activeRequests++
- c.cache[req.function][req.address].cpuLimit = req.cpuUsage
- case listAvailableValue:
- 。。。
- case setCPUUtilization:
- 。。。
- case markAvailable:
- 。。。
- case deleteValue:
- 。。。
- default:
- 。。。
- }
- }
- }
写入service cache
- // SetValue marks the value at key [function][address] as active(begin used)
- func (c *Cache) SetValue(function, address, value interface{}, cpuLimit resource.Quantity) {
- respChannel := make(chan *response)
- c.requestChannel <- &request{
- requestType: setValue,
- function: function,
- address: address,
- value: value,
- cpuUsage: cpuLimit,
- responseChannel: respChannel,
- }
- }
查看函数调用关系,可以知道写入入口是createServiceForFunction

代码入口:pkg/executor/executor.go
- func (executor *Executor) createServiceForFunction(ctx context.Context, fn *fv1.Function) (*fscache.FuncSvc, error) {
- ......
- t := fn.Spec.InvokeStrategy.ExecutionStrategy.ExecutorType
- e, ok := executor.executorTypes[t]
- if !ok {
- return nil, errors.Errorf("Unknown executor type '%v'", t)
- }
-
- fsvc, fsvcErr := e.GetFuncSvc(ctx, fn)
- ......
-
- return fsvc, fsvcErr
- }
代码入口:pkg/executor/executortype/poolmgr/gpm.go
-
- func (gpm *GenericPoolManager) GetFuncSvc(ctx context.Context, fn *fv1.Function) (*fscache.FuncSvc, error) {
- otelUtils.SpanTrackEvent(ctx, "GetFuncSvc", otelUtils.GetAttributesForFunction(fn)...)
- logger := otelUtils.LoggerWithTraceID(ctx, gpm.logger)
- // from Func -> get Env
- logger.Debug("getting environment for function", zap.String("function", fn.ObjectMeta.Name))
- env, err := gpm.getFunctionEnv(ctx, fn)
- if err != nil {
- return nil, err
- }
-
- pool, created, err := gpm.getPool(ctx, env)
- if err != nil {
- return nil, err
- }
-
- if created {
- logger.Info("created pool for the environment", zap.String("env", env.ObjectMeta.Name), zap.String("namespace", gpm.namespace))
- }
-
- // from GenericPool -> get one function container
- // (this also adds to the cache)
- logger.Debug("getting function service from pool", zap.String("function", fn.ObjectMeta.Name))
- return pool.getFuncSvc(ctx, fn)
- }
代码入口:pkg/executor/executortype/poolmgr/gp.go
- func (gp *GenericPool) getFuncSvc(ctx context.Context, fn *fv1.Function) (*fscache.FuncSvc, error) {
- // 配置pod函数相关的label
- logger.Info("choosing pod from pool")
- funcLabels := gp.labelsForFunction(&fn.ObjectMeta)
- ...
- // 选择一个pod
- key, pod, err := gp.choosePod(ctx, funcLabels)
- if err != nil {
- return nil, err
- }
- gp.readyPodQueue.Done(key)
-
- // pod进行特化处理
- err = gp.specializePod(ctx, pod, fn)
- if err != nil {
- gp.scheduleDeletePod(pod.ObjectMeta.Name)
- return nil, err
- }
- logger.Info("specialized pod", zap.String("pod", pod.ObjectMeta.Name), zap.String("podNamespace", pod.ObjectMeta.Namespace), zap.String("podIP", pod.Status.PodIP))
-
- var svcHost string
- if gp.useSvc && !gp.useIstio {
- svcName := fmt.Sprintf("svc-%v", fn.ObjectMeta.Name)
- if len(fn.ObjectMeta.UID) > 0 {
- svcName = fmt.Sprintf("%s-%v", svcName, fn.ObjectMeta.UID)
- }
-
- svc, err := gp.createSvc(ctx, svcName, funcLabels)
- if err != nil {
- gp.scheduleDeletePod(pod.ObjectMeta.Name)
- return nil, err
- }
- if svc.ObjectMeta.Name != svcName {
- gp.scheduleDeletePod(pod.ObjectMeta.Name)
- return nil, errors.Errorf("sanity check failed for svc %v", svc.ObjectMeta.Name)
- }
-
- // the fission router isn't in the same namespace, so return a
- // namespace-qualified hostname
- svcHost = fmt.Sprintf("%v.%v:8888", svcName, gp.namespace)
- } else if gp.useIstio {
- svc := utils.GetFunctionIstioServiceName(fn.ObjectMeta.Name, fn.ObjectMeta.Namespace)
- svcHost = fmt.Sprintf("%v.%v:8888", svc, gp.namespace)
- } else {
- svcHost = fmt.Sprintf("%v:8888", pod.Status.PodIP)
- }
-
- otelUtils.SpanTrackEvent(ctx, "addFunctionLabel", otelUtils.GetAttributesForPod(pod)...)
- // patch svc-host and resource version to the pod annotations for new executor to adopt the pod
- patch := fmt.Sprintf(`{"metadata":{"annotations":{"%v":"%v","%v":"%v"}}}`,
- fv1.ANNOTATION_SVC_HOST, svcHost, fv1.FUNCTION_RESOURCE_VERSION, fn.ObjectMeta.ResourceVersion)
- p, err := gp.kubernetesClient.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, k8sTypes.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
- if err != nil {
- // just log the error since it won't affect the function serving
- logger.Warn("error patching svc-host to pod", zap.Error(err),
- zap.String("pod", pod.Name), zap.String("ns", pod.Namespace))
- } else {
- pod = p
- }
-
- kubeObjRefs := []apiv1.ObjectReference{
- {
- Kind: "pod",
- Name: pod.ObjectMeta.Name,
- APIVersion: pod.TypeMeta.APIVersion,
- Namespace: pod.ObjectMeta.Namespace,
- ResourceVersion: pod.ObjectMeta.ResourceVersion,
- UID: pod.ObjectMeta.UID,
- },
- }
- cpuUsage := resource.MustParse("0m")
- for _, container := range pod.Spec.Containers {
- val := *container.Resources.Limits.Cpu()
- cpuUsage.Add(val)
- }
-
- // set cpuLimit to 85th percentage of the cpuUsage
- cpuLimit, err := gp.getPercent(cpuUsage, 0.85)
- if err != nil {
- logger.Error("failed to get 85 of CPU usage", zap.Error(err))
- cpuLimit = cpuUsage
- }
- logger.Debug("cpuLimit set to", zap.Any("cpulimit", cpuLimit))
-
- m := fn.ObjectMeta // only cache necessary part
- fsvc := &fscache.FuncSvc{
- Name: pod.ObjectMeta.Name,
- Function: &m,
- Environment: gp.env,
- Address: svcHost,
- KubernetesObjects: kubeObjRefs,
- Executor: fv1.ExecutorTypePoolmgr,
- CPULimit: cpuLimit,
- Ctime: time.Now(),
- Atime: time.Now(),
- }
-
- gp.fsCache.PodToFsvc.Store(pod.GetObjectMeta().GetName(), fsvc)
- gp.podFSVCMap.Store(pod.ObjectMeta.Name, []interface{}{crd.CacheKey(fsvc.Function), fsvc.Address})
- gp.fsCache.AddFunc(*fsvc)
-
- gp.fsCache.IncreaseColdStarts(fn.ObjectMeta.Name, string(fn.ObjectMeta.UID))
-
- logger.Info("added function service",
- zap.String("pod", pod.ObjectMeta.Name),
- zap.String("podNamespace", pod.ObjectMeta.Namespace),
- zap.String("serviceHost", svcHost),
- zap.String("podIP", pod.Status.PodIP))
-
- otelUtils.SpanTrackEvent(ctx, "getFuncSvcComplete", otelUtils.GetAttributesForFuncSvc(fsvc)...)
- return fsvc, nil
getServiceForFunction
// 核心逻辑: 服务可以调用多次,从缓存中读取function service对应的url,如果不存在该service,则特化pod,并新建对应的service
- // 代码入口: pkg/executor/api.go
- func (executor *Executor) getServiceForFunction(ctx context.Context, fn *fv1.Function) (string, error) {
- respChan := make(chan *createFuncServiceResponse)
- // 通过管道发送创建function service服务请求消息
- executor.requestChan <- &createFuncServiceRequest{
- context: ctx,
- function: fn,
- respChan: respChan,
- }
- resp := <-respChan
- if resp.err != nil {
- return "", resp.err
- }
- return resp.funcSvc.Address, resp.err
- }
-
- // 代码入口:pkg/executor/executor.go
- func (executor *Executor) serveCreateFuncServices() {
- // 接收上面的管道消息
- for {
- req := <-executor.requestChan
- 。。。
- // 核心处理逻辑
- fsvc, err := executor.createServiceForFunction(fnSpecializationTimeoutContext, req.function)
- 。。。
- // 返回response消息
- req.respChan <- &createFuncServiceResponse{
- funcSvc: fsvc,
- err: err,
- }
-
- }
- }
createServiceForFunction -> GenericPoolManager.GetFuncSvc -> GenericPool.getFuncSvc
代码入口:pkg/executor/executortype/poolmgr/gp.go
- func (gp *GenericPool) getFuncSvc(ctx context.Context, fn *fv1.Function) (*fscache.FuncSvc, error) {
- 。。。
- // 核心逻辑:选择一个pod,修改pod label
- key, pod, err := gp.choosePod(ctx, funcLabels)
- // 向pod发送特化请求
- err = gp.specializePod(ctx, pod, fn)
-
- }
specializePod
向fetcher容器发送请求 POST /specialize,fetcher会去加载代码,具体逻辑后续会继续分析fetcher模块。
- // specializePod chooses a pod, copies the required user-defined function to that pod
- // (via fetcher), and calls the function-run container to load it, resulting in a
- // specialized pod.
- func (gp *GenericPool) specializePod(ctx context.Context, pod *apiv1.Pod, fn *fv1.Function) error {
-
- // tell fetcher to get the function.
- fetcherURL := gp.getFetcherURL(podIP)
- logger.Info("calling fetcher to copy function", zap.String("function", fn.ObjectMeta.Name), zap.String("url", fetcherURL))
-
- specializeReq := gp.fetcherConfig.NewSpecializeRequest(fn, gp.env)
-
- // Fetcher will download user function to share volume of pod, and
- // invoke environment specialize api for pod specialization.
- err := fetcherClient.MakeClient(gp.logger, fetcherURL).Specialize(ctx, &specializeReq)
- if err != nil {
- return err
- }
- otelUtils.SpanTrackEvent(ctx, "specializedPod", otelUtils.GetAttributesForPod(pod)...)
- return nil
- }
kubectl port-forward
获取pod列表调用关系
-
- func runPortForward(ctx context.Context, labelSelector string, localPort string, ns string, kubeContext string) (chan struct{}, chan struct{}, error) {
- 。。。。。。
- // create request URL
- req := clientset.CoreV1().RESTClient().Post().Resource("pods").
- Namespace(podNameSpace).Name(podName).SubResource("portforward")
- url := req.URL()
- 。。。。。。
-
- }



源码解析 kubectl port-forward 工作原理
