• 【Kubernetes】Operator开发之kubebuilder实战(一)


    1 什么是Operator

    k8s原生提供了很多资源类型,像用于无状态应用的Deployment、ReplicaSet和有状态应用的StatefulSet,用户可以通过向k8s提交yaml文件的方式进行资源的管理,但是,有时候,原生的这些资源无法满足我们的需求,例如,当用户希望在k8s中部署prometheus进行业务监控。

    在prometheus的监控体系中,除了二进制程序,重要的就是几个配置文件:prometheus自身要抓取数据的目标、获取到数据后的检测规则等。因此,一种实现方式是将prometheus部署为Pod,并将它们的配置文件放到ConfigMap,prometheus从ConfigMap挂载,当用户修改ConfigMap时,在prometheus的Pod中加一个sidecar容器,该容器检测ConfigMap挂载的文件的变化,如果该文件变化,则调用prometheus接口让prometheus热加载配置,以此来完成配置的变更和加载操作。这种方式的问题是,对于不熟悉prometheus的用户还需要花时间去学习如何配置prometheus,而且,对于业务数据采集的场景下,prometheus的部署和维护可能是运维人员,用户通常只希望提供业务数据采集接口进行接入即可,最重要的是,这种方式不够k8s

    k8s提供了一种特殊的CustomResourceDefinition资源类型,用户可以通过这种资源类型向k8s增加新的资源类型,如上例,可以增加PrometheusScrapeTarget、PrometheusRule等资源类型,用户可以用k8s的方式创建这些资源。但是,创建这些资源只是让k8s知道有这个新的资源类型,没有任何组件去驱动这些资源生效,例如,当用户创建PrometehsuScrapeTarget时,在该资源中声明需要采集的Pod的selector,那么,就需要将这些Pod的IP和采集接口放到prometheus的配置文件中并加载,这就需要靠Controller。

    因此,用更加k8s的方式实现上述功能,在需要告诉k8s新增资源类型的同时,还需要额外开发Controller,让新增的资源能够生效,这就是Operator,可以简单理解为Operator = CRD + Controller

    2 环境准备

    2.1 安装golang

    golang下载对应平台的golang压缩包,然后解压到/usr/local目录下:

    tar -xf go1.21.3.linux-amd64.tar.gz -C /usr/local/ 
    
    • 1

    然后在/etc/profile配置环境变量:

    export GOROOT=/usr/local/go
    export PATH=$PATH:$GOROOT/bin:$GOPATH/bin
    export GOPROXY=https://goproxy.cn
    
    • 1
    • 2
    • 3

    然后执行. /etc/profile,再执行go version可以查看golang版本:

    root@ubuntu:~# go version
    go version go1.21.3 linux/amd64
    
    • 1
    • 2

    /root/go_projects目录下编写一个简单的helloworld程序测试:

    // helloworld.go
    package main
    
    import "fmt"
    
    func main()  {
    	fmt.Println("Hello, World!")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    直接go run helloworld.go可以查看输出结果。

    2.2 安装kubebuilder
    curl -L -o kubebuilder "https://go.kubebuilder.io/dl/latest/$(go env GOOS)/$(go env GOARCH)"
    chmod +x kubebuilder && mv kubebuilder /usr/local/bin/
    
    • 1
    • 2

    然后执行kubebuilder version查看版本:

    root@ubuntu:~/kubebuilder# kubebuilder version
    Version: main.version{KubeBuilderVersion:"3.12.0", KubernetesVendor:"1.27.1", GitCommit:"b48f95cd5384eadcdfd02a47a02910f72ddc7ea8", BuildDate:"2023-09-06T06:04:11Z", GoOs:"linux", GoArch:"amd64"}
    
    • 1
    • 2

    配置命令自动补全:

    # 安装bash-completion
    apt install bash-completion
    
    echo "source <(kubebuilder completion bash)" >> /etc/profile
    
    • 1
    • 2
    • 3
    • 4

    然后在使用kubebuilder过程中就可以通过tab键进行自动补全。

    3 Operator Demo

    通常一个资源的字段可能会比较多,实现起来会比较复杂,这里为了简化整个开发流程,会使用简单的例子说明。这里开发的是一个类似ReplicaSet的资源,它会负责创建一定数量的Pod,并监控Pod的状态。

    3.1 初始化项目
    kubebuilder init --domain tutorial.kubebuilder.io --repo github.com/demo
    
    • 1
    • GVK:Group(api组,例如apps、batch)、Version(api组的版本,例如v1、v1beta1)、Kind(例如Deployment、CronJob)
    • GVR:Group(api组,例如apps、batch)、Version(api组的版本,例如v1、v1beta1)、Resource(例如deployments、cronjobs),大部分场景下,Kind和Resource是1对1的关系
    3.2 创建API
    kubebuilder create api --group batch --version v1 --kind Demo
    
    • 1

    执行上面的命令后会自动创建以下目录和文件:

    • api/v1/
      * demo_types.go Demo的类型定义
      * groupversion_info.go group和version的定义
      * zz_generated.deepcopy.go 深拷贝实现,是runtime.Object接口的自动实现
    • bin/
      * controller-gen
    • config/crd/
      * kustomization.yaml
      * kustomizeconfig.yaml
      * patches/cainjection_in_demos.yaml
      * patches/webhook_in_demos.yaml
    • config/samples/
      * batch_v1_demo.yaml 测试用例
      * kustomization.yaml
    • internal/controller/
      * demo_controller.go 控制器的调谐逻辑
      * suite_test.go 单元测试

    1 api/v1/demo_types.go

    该文件包含CRD的定义,Demo的结构体跟常规的yaml配置文件对应,一种资源包含4个字段:

    • metav1.TypeMeta 指定GVK(apiVersion和kind)
    • metav1.ObjectMeta metadata部分,包含一些公共属性
    • DemoSpec 用户期望的状态
    • DemoStatus 资源当前实际的状态

    2 api/v1/groupversion_info.go

    Group和Version的定义以及注册。

    3 api/v1/zz_generated.deepcopy.go

    runtime.Object接口的实现,主要是DeepCopyObject()的实现。

    type Object interface {
        GetObjectKind() schema.ObjectKind
        DeepCopyObject() Object
    }
    
    • 1
    • 2
    • 3
    • 4

    2 internal/controller/demo_controller.go

    controller的实现,里面主要的内容是Reconcile()方法:

    func (r *DemoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
        _ = log.FromContext(ctx)
    
        // your logic here
    
        return ctrl.Result{}, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    创建完成api后,就需要填充Demo的字段以及控制器的实现。

    3.3 填充CRD字段

    这里实现Demo的简单版本,demo.spec中只有pod的模板和副本数:

    type DemoSpec struct {
           // pod模板 
    	Template corev1.PodTemplateSpec `json:"template"`
    
           // 副本数
    	Replicas *int64 `json:"replicas"`
    
           // pod选择器
    	Selector map[string]string `json:"selector"`
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    而demo.status中只记录当前副本的数量:

    type DemoStatus struct {
    	CurrentReplicas *int64 `json:"currentReplicas"`
    }
    
    • 1
    • 2
    • 3

    其他的Demo和DemoList不做修改。接下来就需要实现控制器。

    3.4 实现控制器

    根据Demo的字段定义以及对ReplicaSet的理解,可以设想Demo Controller的逻辑如下:

    • 根据selector在Demo所在命名空间查找Pod,假设查找到N个Pod
    • 如果N < replicas,则依据template模板创建replicas-N个Pod
    • 如果N > replicas,则按照时间顺序删除旧的Pod
    • 将N更新到demo.status.currentReplicas

    首先看下kubebuilder给我们生成的脚手架代码。

    跟k8s中常规代码一样,控制器的入口在cmd/main.go

    func main() {
        // 获取命令行参数
    	var metricsAddr string
    	var enableLeaderElection bool
    	var probeAddr string
    	flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
    	flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
    	flag.BoolVar(&enableLeaderElection, "leader-elect", false,
    		"Enable leader election for controller manager. "+
    			"Enabling this will ensure there is only one active controller manager.")
    	opts := zap.Options{
    		Development: true,
    	}
    	opts.BindFlags(flag.CommandLine)
    	flag.Parse()
    
    	ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
    
        // 创建控制器管理器,同时传入上面的命令行参数
    	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
    		Scheme:                 scheme,
    		Metrics:                metricsserver.Options{BindAddress: metricsAddr},
    		HealthProbeBindAddress: probeAddr,
    		LeaderElection:         enableLeaderElection, // 是否启动选主逻辑,当运行多个控制器时需要开启
    		LeaderElectionID:       "3d4b5da1.tutorial.kubebuilder.io",
    	})
    	if err != nil {
    		setupLog.Error(err, "unable to start manager")
    		os.Exit(1)
    	}
    
        // 将Demo的控制器加入到控制器管理器
    	if err = (&controller.DemoReconciler{
    		Client: mgr.GetClient(),
    		Scheme: mgr.GetScheme(),
    	}).SetupWithManager(mgr); err != nil {
    		setupLog.Error(err, "unable to create controller", "controller", "Demo")
    		os.Exit(1)
    	}
    	//+kubebuilder:scaffold:builder
    
        // 下面两个接口用于Pod的探针配置
        // 控制器增加healthz接口
    	if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
    		setupLog.Error(err, "unable to set up health check")
    		os.Exit(1)
    	}
    
        // 控制器增加ready接口
    	if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
    		setupLog.Error(err, "unable to set up ready check")
    		os.Exit(1)
    	}
    
        // 注册SIGTERM和SIGINT信号,当收到两次信号时程序就会退出
    	setupLog.Info("starting manager")
    	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
    		setupLog.Error(err, "problem running manager")
    		os.Exit(1)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    这里与我们的控制器相关的代码就是(&controller.DemoReconciler{}).SetupWithManager(mgr)

    // SetupWithManager sets up the controller with the Manager.
    func (r *DemoReconciler) SetupWithManager(mgr ctrl.Manager) error {
    	return ctrl.NewControllerManagedBy(mgr).
    		For(&batchv1.Demo{}).
    		Complete(r)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在Complete()中会调用Build(),这里就负责创建Controller和执行ListAndWatch操作:

    func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
            if r == nil {
                    return nil, fmt.Errorf("must provide a non-nil Reconciler")
            }
            if blder.mgr == nil {
                    return nil, fmt.Errorf("must provide a non-nil Manager")
            }
            if blder.forInput.err != nil {
                    return nil, blder.forInput.err
            }
    
            // 创建Controller
            if err := blder.doController(r); err != nil {
                    return nil, err
            }
    
            // 执行watch
            if err := blder.doWatch(); err != nil {
                    return nil, err
            }
    
            return blder.ctrl, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    下面就可以实现我们的控制器:

    // req是变化的资源的名称,因此,在调谐逻辑开始时通常会通过名称获取对应的资源
    // 返回值除了常规的error,还有一个Result,Result中包含两个元素:
    // Requeue bool:表明是否重新入队列处理
    // RequeueAfter time.Duration:多久之后重新入队列,如果设置了RequeueAfter则不需要设置Requeue
    func (r *DemoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    	_ = log.FromContext(ctx)
    
    	// 1 获取demo资源对象
    	var demo batchv1.Demo
    	if err := r.Get(ctx, req.NamespacedName, &demo); err != nil {
    		log.Log.Error(err, "unable to fetch demo", "demo_name", req.Name)
    		// 忽略掉 not-found 错误,通常出现在资源已经被删除时
    		return ctrl.Result{}, client.IgnoreNotFound(err)
    	} else {
    		log.Log.Info("fetch demo success", "demo_name", req.Name)
    	}
    
    	// 2 根据资源对象获取Pod数量
    	var podList corev1.PodList
    	if err := r.List(ctx, &podList, client.InNamespace(req.Namespace), &client.ListOptions{
    		LabelSelector: labels.SelectorFromSet(demo.Spec.Selector),
    	}); err != nil {
    		log.Log.Error(err, "unable to list child Pods", "demo_name", req.Name)
    		return ctrl.Result{}, err
    	} else {
    		log.Log.Info("list child Pods success", "demo_name", req.Name, "pod_cnt", len(podList.Items))
    	}
    
    	// 3 对比当前Pod数量和期望数量
    	var podCnt int64
    	podCnt = int64(len(podList.Items))
    	if podCnt < *demo.Spec.Replicas {
    		// 当前Pod数量少于期望数量,则创建Pod
    		name := fmt.Sprintf("%s-%d", req.Name, time.Now().UnixMicro())
    
    		pod := &corev1.Pod{
    			ObjectMeta: metav1.ObjectMeta{
    				Labels:      make(map[string]string),
    				Annotations: make(map[string]string),
    				Name:        name,
    				Namespace:   req.Namespace,
    			},
    			Spec: *demo.Spec.Template.Spec.DeepCopy(),
    		}
    
                 // 这里本来想用demo.Spec.template.ObjectMeta.Labels,发现是空的
    		for k, v := range demo.Spec.Selector {
    			//pod.Labels[k] = v
    			pod.ObjectMeta.Labels[k] = v
    		}
    
    		if err := r.Create(ctx, pod); err != nil {
    			log.Log.Error(err, "unable to create Pod", "pod_name", name)
    			return ctrl.Result{}, err
    		} else {
    			log.Log.Info("create Pod success", "pod_name", name)
    			podCnt += 1
    		}
    	} else {
    		// 当前Pod数量大于期望数量,则需要删除多余的Pod
    		// 由于可能有正在删除的Pod,因此,在进行数量判断时,需要对正在删除的Pod进行筛选
    		podCnt = 0
    		var pod_tmp corev1.Pod
    		for _, pod_tmp = range podList.Items {
    			annotations := pod_tmp.GetAnnotations()
    			if annotations != nil {
    				flag := false
    				for k, v := range annotations {
    					if k == "demo-deleting" && v == req.Name {
    						flag = true
    						break
    					}
    				}
    				if flag == true {
    					continue
    				}
    			}
    
    			if pod_tmp.Status.Phase == corev1.PodRunning {
    				podCnt += 1
    			}
    		}
    
    		if podCnt > *demo.Spec.Replicas {
    			// 在删除Pod之前,给Pod加一个Annotations,然后再删除Pod,后续就可以用该Annotations判断Pod是否正在删除中
    			annotations := pod_tmp.GetAnnotations()
    			if annotations == nil {
    				annotations = make(map[string]string)
    			}
    			annotations["demo-deleting"] = req.Name
    			pod_tmp.SetAnnotations(annotations)
    
    			if err := r.Update(ctx, &pod_tmp, &client.UpdateOptions{}); err != nil {
    				log.Log.Error(err, "update Pod annotations failed", "pod_name", pod_tmp.Name)
    				return ctrl.Result{RequeueAfter: time.Second}, nil
    			} else {
    				log.Log.Info("update Pod annotations success", "pod_name", pod_tmp.Name)
    			}
    
    			if err := r.Delete(ctx, &pod_tmp); err != nil {
    				log.Log.Error(err, "delete Pod failed", "pod_name", pod_tmp.Name)
    			} else {
    				log.Log.Info("delete Pod success", "pod_name", pod_tmp.Name)
    				podCnt -= 1
    			}
    
    			return ctrl.Result{RequeueAfter: time.Second}, nil
    		}
    	}
    
    	// 4 更新status状态
    	demo.Status.CurrentReplicas = &podCnt
    	if err := r.Status().Update(ctx, &demo); err != nil {
    		log.Log.Error(err, "unable to update Demo status")
    		return ctrl.Result{}, err
    	} else {
    		log.Log.Info("update Demo status success", "demo_name", demo.Name)
    	}
    
    	return ctrl.Result{}, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    3.5 部署测试

    代码开发完成后,就需要进行部署测试,其实正常情况下,在未填充任何代码时也应该先部署测试,保证kubebuilder生成的代码可以在当前版本的k8s中使用。

    首先看下生成的Makefile提供了哪些命令,执行make help可以看到支持的子命令:

    开发:

    • manifests 生成Webhook、ClusterRole和CRD对象,例如config/crd/bases下的CRD文件和config/crd/rbac/下面的文件
    • generate 重新生成zz_generated.deepcopy.go文件
    • fmt 运行go fmt格式化代码
    • vet 运行go vet检查语法错误
    • test 运行项目中的测试代码,主要是控制器目录的test文件

    构建:

    • build 构建控制器二进制
    • run 在本地启动控制器
    • docker-build 构建控制器镜像
    • docker-push 推送控制器镜像
    • docker-buildx 构建跨平台的镜像

    部署:

    • install 在集群中安装CRD
    • uninstall 在集群中卸载CRD
    • deploy 在集群中部署controller
    • undeploy 在集群中卸载controller

    下面就对我们开发的程序部署,跟前面说的一样,Operator主要就是CRD和Controller,部署也是一样:

    • 生成CRD和RBAC:make manifests
    • 为Demo资源重新生成zz_generated.deepcopy.go:make generate
    • 在k8s中创建CRD:make install
    • 本地运行Controller:make run

    与常规的Controller不同,这种方式就会在本地启动Controller,该控制器通过~/.kube/config文件去调用k8s api的接口进行交互。

    此时,可以在k8s中看到Demo资源:

    请添加图片描述

    然后可以修改config/samples/batch_v1_demo.yaml文件:

    apiVersion: batch.tutorial.kubebuilder.io/v1
    kind: Demo
    metadata:
      labels:
        app.kubernetes.io/name: demo
        app.kubernetes.io/instance: demo-sample
        app.kubernetes.io/part-of: demo
        app.kubernetes.io/managed-by: kustomize
        app.kubernetes.io/created-by: demo
      name: demo-sample
    spec:
      replicas: 2
      selector:
        app: demo-kubebuilder
      template:
        metadata:
          labels:
            app: demo-kubebuilder
        spec:
          containers:
          - name: nginx
            image: nginx
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    接下来可以将该yaml提交到k8s创建Demo对象,然后就可以查看Controller的日志:

    请添加图片描述

    从这里面可以看出make run其实就是make manifests + make generate + make fmt + make vet + go run ./cmd/main.go,因此,如果是在本地运行时如果已经将资源对象提交到k8s,后续又没有修改资源对象的字段,只是调整了Controller的代码,可以直接执行make run

    执行kubectl edit demo demo-sample修改replicas字段会发现Pod的数量也会随之变化。

    3.6 两个问题

    对上面的代码测试会发现两个问题:

    • 修改Demo的replicas字段发现Pod的数量会与期望保持一致,但是如果删除Pod,并没有新的Pod重建,这是因为当前Controller只监听了Demo的变化,没有监听Pod的变化
    • Pod的删除是通过先设置Annotations,这种方式不够优雅

    4 总结

    本文用实现类似ReplicaSet的Demo资源讲解了Operator的开发过程,虽然是很简单的功能,还是需要考虑很多问题,实际的Operator开发更加复杂,考虑的问题更多,虽然k8s提供了Operator这种机制用于扩展资源,但是开发的门槛还是有些高,幸运的是,OperatorHub提供了大量可用的Operator,在开发Operator之前可以先在这里查找下是否已经有现成的Operator可以使用。

  • 相关阅读:
    GBase 8c核心特性-分布式事务
    centos 文件分割
    windows下配置maven
    GeoDa与R语言的空间数据回归
    Oracle数据库经纬度坐标查询优化与结果错误原因分析、SQL中WKT超长文本字符串处理
    mbedtls 自带SSL demo调试
    常用的辅助类(必会)
    虹科Pico汽车示波器 | 免拆诊断案例 | 2006 款林肯领航员车发动机怠速抖动
    【Linux】部署单机OA项目及搭建spa前后端分离项目
    SpringBoot | Spring Boot“整合Redis“
  • 原文地址:https://blog.csdn.net/ILOVEYOUXIAOWANGZI/article/details/133956284