K8S中Operator的开发实现

六月 03, 2025 / 1900 / 8阅读 / 0评论/ 分类: 软件技术

前言

最近在看k8s相关的资料,觉得实现一个k8s的Operator将Diva项目更方便的部署到k8s里面去,Diva是一个基于ansible的传统的自动运维管理系统,对接了公司的版本发布系统。
本文并不是合格的教程,只是记录自己一些实现的过程,供自己回顾。

开始

Operator 是一种封装、部署和管理 kubernetes 应用的方法。
简单来说,Operator 是由 kubernetes 自定义资源(CRD, Custom Resource Definition)和控制器(Controller)构成的云原生扩展服务。Operator 把部署应用的过程或业务运行过程中需要频繁操作的命令封装起来,运维人员只需要关注应用的配置和应用的期望运行状态即可,无需花费大量的精力在部署和容易出错的命令上面。
这里是通过java-operator-sdk实现的功能
首先看一下diva的部署图:
OsUkmB.md.png
根据部署图,定义了Diva自定义资源
Diva的K8S资源共有三个镜像,一个Deployment,一个Dtatefulset

  • diva本体服务的镜像,作为运维管理服务
  • diva的客户端的镜像,作为应用载体
  • diva的额外资源的镜像,作为本体服务的初始化容器,运输d运维管理的额外资源
  • ssh公私钥,通过ssh协议连接各个client
  • 存储资源,这里diva和client 都需要存储挂载

这里是手搓了脚本,也可以通过java类的注解生成:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  # 名字必需与下面的 spec 字段匹配,并且格式为 '<名称的复数形式>.<组名>'
  name: divas.ops.xx.com
spec:   
   # 组名称,用于 REST API: /apis/<组>/<版本>
  group: ops.xx.com
  names:
    # 名称的复数形式,用于 URL:/apis/<组>/<版本>/<名称的复数形式>
    plural: divas
    # 名称的单数形式,作为命令行使用时和显示时的别名
    singular: diva
    # kind 通常是单数形式的驼峰命名(CamelCased)形式。你的资源清单会使用这一形式。
    kind: Diva
    # shortNames 允许你在命令行使用较短的字符串来匹配资源
    shortNames:
      - dv
  # 可以是 Namespaced 或 Cluster   
  scope: Namespaced 
  versions:
    - name: v1
      additionalPrinterColumns:
        - name: "manager"
          type: string
          jsonPath: .status.manager
        - name: "clients"
          type: string
          jsonPath: .status.clients
        - name: Age
          type: date
          jsonPath: .metadata.creationTimestamp
        - name: message
          type: string
          jsonPath: .status.message
      subresources:
        status: {}
      # 每个版本都可以通过服务标志启用/禁用。
      served: true
      # 必须将一个且只有一个版本标记为存储版本。
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            status:
              type: object
              properties:
                manager:
                  type: string
                clients:
                  type: string
                message:
                  type: string
            spec:
              type: object
              properties:
                client:
                  type: object
                  required: [replicas, image, storage]
                  properties:
                    replicas:
                      type: integer
                      minimum: 1
                    image:
                      type: string
                      pattern: "^.+:.+$"  # 强制镜像标签格式
                    storage:
                      type: object
                      required: [size, className]
                      properties:
                        size: 
                          type: string
                          pattern: "^[1-9]\\d*Gi$"
                        className: 
                          type: string
                manager:
                  type: object
                  required: [image, storage]
                  properties:
                    replicas:
                      type: integer
                      minimum: 1
                    image:
                      type: string
                      pattern: "^.+:.+$"  # 强制镜像标签格式
                    storage:
                      type: object
                      required: [size, className]
                      properties:
                        size: 
                          type: string
                          pattern: "^[1-9]\\d*Gi$"
                        className: 
                          type: string
                extra:
                  type: object
                  required: [image, storage]
                  properties:
                    replicas:
                      type: integer
                      minimum: 1
                    image:
                      type: string
                      pattern: "^.+:.+$"  # 强制镜像标签格式
                    storage:
                      type: object
                      required: [size, className]
                      properties:
                        size: 
                          type: string
                          pattern: "^[1-9]\\d*Gi$"
                        className: 
                          type: string
                privateKey:
                  type: string
                publicKey:
                  type: string                            

最后形成的diva部署清单如下

apiVersion: ops.xx.com/v1
kind: Diva
metadata:
  name: diva-test
  namespace: my-diva
spec:
  client:
    replicas: 2
    image: diva-client:v0.0.1
    storage:
      size: 2Gi
      className: longhorn
  manager:
    image: diva:v1.1
    storage:
      size: 2Gi
      className: longhorn
  extra:
    image: diva-extra:0.0.1
    storage:
      size: 2Gi
      className: longhorn
  privateKey: 
  publicKey: ssh-rsa AAAAB3NzaC1yc2EAAAA...

工程初始化

本项目使用spring-boot-starter配置,预留了webflux模块,方便后期通过http提供相关接口。

   <properties>
        <java.version>17</java.version>
        <operator-framework-spring-boot.version>6.0.1</operator-framework-spring-boot.version>
        <fabric8-client.version>7.1.0</fabric8-client.version>
        <bouncycastle.version>1.80</bouncycastle.version>
    </properties>
    <dependencies>

        <dependency>
                <groupId>io.javaoperatorsdk</groupId>
            <artifactId>operator-framework-spring-boot-starter</artifactId>
            <version>${operator-framework-spring-boot.version}</version>
        </dependency>
        <dependency>
            <groupId>io.javaoperatorsdk</groupId>
            <artifactId>operator-framework-spring-boot-starter-test</artifactId>
            <version>${operator-framework-spring-boot.version}</version>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j2-impl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.fabric8</groupId>
            <artifactId>crd-generator-apt</artifactId>
            <version>${fabric8-client.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk18on</artifactId>
            <version>${bouncycastle.version}</version>
        </dependency>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcpkix-jdk18on</artifactId>
            <version>${bouncycastle.version}</version>
        </dependency>
        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

主要配置类,这里实例化k8s客户端,在开发时可用证书私钥等连接k8s集群,我这直接用的admin的连接,在集群内部无需连接配置,但是需要给定pod的sa绑定适当的权限。否则会有无权限的报错而无法启动。

@Configuration
public class CommonConfig {
    private String caCertData = "LS0tLS1...";
    private String clientCertData = "LS0tLS1...";
    private String clientKeyData = "LS0tL...";
	   private String host = "https://192.11.111.91:6443";
    @Bean
    public KubernetesClientBuilder kubernetesClientBuilder() {
        return new KubernetesClientBuilder();
    }

    @Bean
    public KubernetesClient kubernetesClient() {
        return kubernetesClientBuilder().build();
    }
    //@Bean()
    public KubernetesClient kubernetesClientWithConfig() {

        Config config = new ConfigBuilder().withMasterUrl(host)
                .withCaCertData(caCertData)
                .withClientCertData(clientCertData)
                .withClientKeyData(clientKeyData).build();
        //集群外创建client 需要指定 host 证书 私钥等
        return kubernetesClientBuilder().withConfig(config).build();
    }
}

定义类

@Group("ops.xx.com")
@Version("v1")
@ShortNames("dv")
public class Diva extends CustomResource<Diva.Spec,Diva.Status> implements Namespaced {
    public static class Spec {
        private DivaImage client;
        private DivaImage manager;
        private DivaImage extra;
        private String privateKey;
        private String publicKey;
		     // Getter & Setter ...
    }
    public static class DivaImage {
        private int replicas;
        private String image;
        private Storage storage;
					// Getter & Setter ...
    }
    public static class Storage {
        private String size;
        private String className;
					// Getter & Setter ...
    }
    public static class Status {
        private String manager;
        private String clients;
        private String message;
					// Getter & Setter ...
    }
}

核心代码

@Component
public class DivaReconciler implements Reconciler<Diva> {

    private static final Logger log = LoggerFactory.getLogger(DivaReconciler.class); 
	
    private final KubernetesClient client;
	
    public DivaReconciler(KubernetesClient kubernetesClient) {
        this.client = kubernetesClient;);
    }
	
	@Override
    public UpdateControl<Diva> reconcile(Diva diva, Context<Diva> context) throws Exception {
        String namespace = diva.getMetadata().getNamespace();
        // 创建或更新 ssh
        reconcileSecret(diva);
        // 创建或更新 SVC
        reconcileService(diva);
        // 创建或更新 PVC
        PersistentVolumeClaim clientPvc = reconcilePVC(namespace,CLIENT_PVC_NAME(diva),diva.getSpec().getClient().getStorage());
        PersistentVolumeClaim managerPvc = reconcilePVC(namespace,MANAGER_PVC_NAME(diva),diva.getSpec().getClient().getStorage());

        // 创建或更新 clients
        StatefulSet sts = buildClients(diva,clientPvc);

        // 创建或更新 manager
        Deployment deploy = buildManager(diva,managerPvc);
		// 更新状态
        updateStatus(diva);
        return UpdateControl.patchStatus(diva);
	}
}

问题1、关于创建更新k8s资源的代码

网上一些教程都使用了一些过时写法去判断资源是否需要更新或当资源不存在时需要创建,可采用serverSideApply() 方法,写法如下:

    private PersistentVolumeClaim reconcilePVC(String ns,String name, Diva.Storage storage) {

        PersistentVolumeClaim pvc  = new PersistentVolumeClaimBuilder()
                    .withNewMetadata()
                    .withName(name)
                    .withNamespace(ns)
                    .endMetadata()
                    .withNewSpec()
                    .withAccessModes("ReadWriteOnce")
                    .withStorageClassName(storage.getClassName())
                    .withNewResources()
                    .withRequests(Map.of("storage", new Quantity(storage.getSize())))
                    .endResources()
                    .endSpec()
                    .build();
            //return client.persistentVolumeClaims().inNamespace(ns).resource(pvc).create();

        return pvc;
    }

    private StatefulSet buildClients(Diva diva,PersistentVolumeClaim pvc ) {
        Diva.DivaImage divaClient = diva.getSpec().getClient();
        String name = diva.getMetadata().getName() ;
        String namespace = diva.getMetadata().getNamespace() ;
        String stsName = CLIENT_STS_NAME(diva);
        StatefulSet desiredSts = new StatefulSetBuilder()
                .withNewMetadata()
                .withName(stsName)
                .withNamespace(namespace)
                .addToLabels("app", "diva-client")
                .addToLabels("diva-client", name)
                .endMetadata()
                .withNewSpec()
                .withReplicas(divaClient.getReplicas())
                .withSelector(new LabelSelectorBuilder().addToMatchLabels("diva-client", name).build())
                .withServiceName(CLIENT_SVC_NAME(diva))  // Headless Service 名称
                .withTemplate(new PodTemplateSpecBuilder()
                        .withNewMetadata()
                        .addToLabels("diva-client", name)
                        .endMetadata()
                        .withNewSpec()
                        .withVolumes(
                                new VolumeBuilder()
                                        .withName("diva-client-ssh")
                                        .withSecret(new SecretVolumeSourceBuilder()
                                                .withSecretName(SSH_SECRET_NAME(diva))
                                                .withDefaultMode(384)
                                                .withItems(
                                                        new KeyToPathBuilder()
                                                                .withKey("privateKey")
                                                                .withPath("id_rsa")
                                                                .build(),
                                                        new KeyToPathBuilder()
                                                                .withKey("publicKey")
                                                                .withPath("id_rsa.pub")
                                                                .build())
                                                .build())
                                        .build()

                        )
                        .addNewContainer()
                        .withName(name)
                        .withImage(divaClient.getImage())
                        .addNewVolumeMount()
                        .withName(pvc.getMetadata().getName())
                        .withMountPath("/wi")
                        .endVolumeMount()
                        .addNewVolumeMount()
                        .withName("diva-client-ssh")
                        .withMountPath("/root/.ssh/id_rsa")
                        .withSubPath("id_rsa")
                        .endVolumeMount()
                        .addNewVolumeMount()
                        .withName("diva-client-ssh")
                        .withMountPath("/root/.ssh/id_rsa.pub")
                        .withSubPath("id_rsa.pub")
                        .endVolumeMount()
                        .endContainer()
                        .endSpec()
                        .build())
                .withVolumeClaimTemplates(pvc)
                .endSpec()
                .build();
        StatefulSet existingSts = client.apps().statefulSets()
                .inNamespace(namespace)
                .withName(stsName)
                .get();
        client.resource(desiredSts).serverSideApply();
        return existingSts;
    }

问题2:在变更Diva资源的状态信息时,资源冲突

在Operator创建Diva相关资源,同时变更Diva的状态时,比如我需要实时更新客户端的ready个数,manger是否ready。就需要去watch相关的sts和deployment,以订阅到相关的事件从而更新状态,如果直接交给DivaReconciler 硬编写去更新时,时长报出资源冲突。解决方法是:在一定时间段内的变更,通过Map汇总到一起,启一个更新状态的线程,读取Map的Diva的变更状态需求,然后统一进行一次状态变更,如此减少变更次数,从而减少资源冲突。如下:

public class DivaStatusUpdateThread extends Thread {
    private final KubernetesClient client;
    private long lastUpdateTime = 0L;
    private final Map<DivaRef, Long> needUpdates = new ConcurrentHashMap<>();

    public void addDivaRef(DivaRef divaRef) {
        this.needUpdates.put(divaRef, System.currentTimeMillis());
    }

    public DivaStatusUpdateThread(KubernetesClient client) {
        this.client = client;
    }

    @Override
    public void run() {
        try {
            while (true) {
                if (needUpdates != null && !needUpdates.isEmpty()) {

                    List<DivaRef> refs = needUpdates.entrySet().stream()
                        .filter(entry -> entry.getValue() != null && entry.getValue() > lastUpdateTime)
                        .map(Map.Entry::getKey).collect(Collectors.toList());
                    if(!refs.isEmpty()) {
                        lastUpdateTime = System.currentTimeMillis();
                        refs.forEach(this::updateStatus);
                    }
                }
                Thread.sleep(5000); 
            }
        } catch (Exception ignored) {
        }
    }
}

监控K8S资源变更事件:

 	client.resource(sts).watch(new Watcher<StatefulSet>() {
            @Override
            public void eventReceived(Action action, StatefulSet resource) {
                log.info("diva client {} {} in namespace {}", action, resource.getMetadata().getName(), resource.getMetadata().getNamespace());
                updateStatus(diva);
            }

            @Override
            public void onClose(WatcherException cause) {
            }
        });

最后可以通过 kubectl get diva 观察到状态变化,如下图中,operator帮助我们创建了两个diva的相关资源:
测试

总结

这是一次比较综合运用k8s资源的开发尝试,让我更深入的了解到K8S运作原理及运维,operator确实是通过自己的资源的编排从而达到简化原本繁琐的部署资源申请,更直观的观察到自己部署资源的状态。
diva的存在只是为了适应原本公司的版本发布过程。现在看来所有的功能都可以通过operator来实现。让operator来连接外部资源,并更新到k8s容器中都是比较简单。但是diva从另一方面简化了CI的负担,例如开发人员构建出的成果可直接通过diva部署到k8s中去,无需打包成镜像及容器编排,更方面的进行应用的运维,从而和k8s的运维很好的隔离开来。一想到这些,发现diva还是有存在下去的价值的,因为作为开发构建者了解k8s环境、打通客户的k8s网络可能都比较难,至少在我们公司是这样,diva存在价值,那运维人员就存在价值。

#K8S(1)#运维(1)

文章作者:1900

文章链接:https://zhuty.cn/archives/k8s-operator-impl

版权声明:本博客所有文章除特别声明外,均采用CC BY-NC-SA 4.0 许可协议,转载请注明出处!


评论