Atlantis
GitHub Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Back to homepage

2. kubectl输出格式化

之前在给一线的驻场同学做工具时,需要格式化输出一些 Pod 关键信息,比如 Pod 状态、最近启动时间等等:

alt text

当时想参考 kubectl 的代码实现,结果没找到,仔细检查后才发现格式化输出与 Pod 状态转换逻辑都是在 kube-apiserver 里。

以获取 pod 列表为例,kubectl get pod 命令输出的 STATUS 字段实际上是在服务端计算完成的:当请求的 Accept 头为 application/json;as=Table;v=v1;g=meta.k8s.io,application/json;as=Table;v=v1beta1;g=meta.k8s.io,application/json 时,kube-apiserver 就会以表格(Table)的形式返回响应数据:

alt text

我们可以在 Kubernetes 的 pkg/printers 中找到展示数据的逻辑:

alt text

printPod 函数中就有我们需要的状态转换逻辑:

alt text

搭配 controller-runtime 的 client 包,我们就可以做一些与 kubectl 输出结果一致的命令行工具。

下面是获取集群中所有租户的 deployment 关联的 Pod 信息的例子:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"github.com/jedib0t/go-pretty/v6/table"
	"github.com/pkg/errors"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/clientcmd"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// main lists Deployments, keeps those whose namespace starts with "tenant"
// and whose name contains "cks", and then, for each of them, lists the Pods
// in the Deployment's namespace that match its selector labels. Every Pod is
// run through checkPod; Pods that fail the check are logged as "error" and
// rendered with printPod, the others are logged as "normal".
//
// Any client or List error is logged and ends the run early.
func main() {
	// A single 60-second deadline bounds every API call made below.
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	// Build the client used to talk to kube-apiserver.
	kubeClient, err := getClient(ctx)
	if err != nil {
		log.Printf("get client error: %s\n", err)
		return
	}

	// Fetch Deployments; no namespace option is passed to List here.
	var allDeploys appsv1.DeploymentList
	if err := kubeClient.List(ctx, &allDeploys); err != nil {
		log.Printf("list deployment error: %s\n", err)
		return
	}

	// Keep pointers into the list for the Deployments that pass the
	// namespace-prefix and name-substring filter.
	var targets []*appsv1.Deployment
	for idx := range allDeploys.Items {
		candidate := &allDeploys.Items[idx]
		if !strings.HasPrefix(candidate.Namespace, "tenant") {
			continue
		}
		if !strings.Contains(candidate.Name, "cks") {
			continue
		}
		targets = append(targets, candidate)
	}

	// Order by namespace first, then by name.
	sort.Slice(targets, func(a, b int) bool {
		left, right := targets[a], targets[b]
		if left.Namespace == right.Namespace {
			return left.Name < right.Name
		}
		return left.Namespace < right.Namespace
	})

	log.Printf("***打印Pod启动10分钟后出现过重启的容器信息***\n")
	fmt.Println()

	for _, target := range targets {
		log.Printf("deployment: %s/%s \n", target.Namespace, target.Name)

		// Select the Deployment's Pods by label within its namespace.
		// NOTE(review): only Selector.MatchLabels is consulted; any
		// MatchExpressions on the selector are not applied.
		var pods corev1.PodList
		listErr := kubeClient.List(
			ctx,
			&pods,
			client.InNamespace(target.Namespace),
			client.MatchingLabels(target.Spec.Selector.MatchLabels),
		)
		if listErr != nil {
			log.Printf("list pod error: %s\n", listErr)
			return
		}

		// Sort Pods by name so the output order is stable.
		sort.Slice(pods.Items, func(a, b int) bool {
			return pods.Items[a].Name < pods.Items[b].Name
		})

		// Report each Pod; only those rejected by checkPod get a table.
		for idx := range pods.Items {
			current := &pods.Items[idx]
			if !checkPod(current) {
				log.Printf("pod: %s/%s status: error\n", current.Namespace, current.Name)
				printPod(current)
				continue
			}
			log.Printf("pod: %s/%s status: normal\n", current.Namespace, current.Name)
		}
		fmt.Println()
	}
}

// getClient builds a controller-runtime client from the kubeconfig file at
// ".kube/config" under the current user's home directory.
//
// The ctx parameter is accepted but not used by the body. Each failing step
// (home-directory lookup, file read, kubeconfig parsing, client construction)
// returns a nil client together with the underlying error wrapped with a
// short description of that step.
func getClient(ctx context.Context) (client.Client, error) {
	homeDir, err := os.UserHomeDir()
	if err != nil {
		return nil, errors.Wrap(err, "get home dir")
	}

	// Load the raw kubeconfig bytes from <home>/.kube/config.
	rawConfig, err := os.ReadFile(filepath.Join(homeDir, ".kube", "config"))
	if err != nil {
		return nil, errors.Wrap(err, "read kubeconfig")
	}

	// Turn the kubeconfig bytes into a REST configuration.
	restConfig, err := clientcmd.RESTConfigFromKubeConfig(rawConfig)
	if err != nil {
		return nil, errors.Wrap(err, "parse kubeconfig")
	}

	// Default client options are used.
	kubeClient, err := client.New(restConfig, client.Options{})
	if err != nil {
		return nil, errors.Wrap(err, "initialize kubeClient")
	}

	return kubeClient, nil
}

// checkPod reports whether pod is considered healthy.
//
// It returns false when the pod has no PodReady condition with status True,
// when any container status is in the Waiting or Terminated state, or when
// any running container started more than ten minutes after the pod's
// creation timestamp. Otherwise it returns true.
func checkPod(pod *corev1.Pod) bool {
	// Look for a PodReady condition whose status is True.
	isReady := false
	for i := range pod.Status.Conditions {
		cond := &pod.Status.Conditions[i]
		if cond.Type != corev1.PodReady || cond.Status != corev1.ConditionTrue {
			continue
		}
		isReady = true
		break
	}
	if !isReady {
		return false
	}

	// A container whose current run began after this instant is treated
	// as having restarted late.
	deadline := pod.CreationTimestamp.Add(10 * time.Minute)
	for i := range pod.Status.ContainerStatuses {
		state := pod.Status.ContainerStatuses[i].State
		switch {
		case state.Waiting != nil, state.Terminated != nil:
			return false
		case state.Running != nil && state.Running.StartedAt.After(deadline):
			return false
		}
	}
	return true
}

// printPod writes a table describing the restarted containers of pod to
// standard output.
//
// The pod's container statuses are sorted by name in place. Containers with
// a restart count of zero are skipped entirely. Containers that are running
// and whose current start time is not later than ten minutes after the pod's
// creation timestamp are left out of the table, although their restart count
// is still added to the total shown in the title. Each remaining container
// gets one row with its name, readiness, current start time, and the finish
// time, reason and exit code of its last termination; values that are not
// available are shown as "null".
//
// The title shows the pod creation time, the accumulated restart count, and
// the listed container with the latest last-termination time. When no such
// termination was recorded, both the container name and the time are shown
// as "null" rather than formatting the zero time.
func printPod(pod *corev1.Pod) {
	const (
		timeLayout  = "2006-01-02 15:04:05"
		placeholder = "null"
	)

	sort.Slice(pod.Status.ContainerStatuses, func(i, j int) bool {
		return pod.Status.ContainerStatuses[i].Name < pod.Status.ContainerStatuses[j].Name
	})

	total := 0
	t := table.NewWriter()
	t.AppendHeader(table.Row{"名称", "ready", "最近启动时间", "上次退出时间", "上次退出原因", "上次退出码", "重启次数"})

	recentCntrName := placeholder
	var recentCntrTime time.Time
	timeoutTimestamp := pod.CreationTimestamp.Add(time.Minute * 10)

	for _, v := range pod.Status.ContainerStatuses {
		if v.RestartCount == 0 {
			continue
		}
		// The total is accumulated before the start-time filter below, so
		// it also covers restarted containers that get no row.
		total += int(v.RestartCount)

		startedAt := placeholder
		if v.State.Running != nil {
			// Only containers whose current run began after the
			// ten-minute window are listed.
			if !v.State.Running.StartedAt.After(timeoutTimestamp) {
				continue
			}
			startedAt = v.State.Running.StartedAt.Format(timeLayout)
		}

		lastStateFinishedAt := placeholder
		lastStateReason := placeholder
		lastStateExitCode := placeholder
		if last := v.LastTerminationState.Terminated; last != nil {
			lastStateFinishedAt = last.FinishedAt.Format(timeLayout)
			lastStateReason = last.Reason
			lastStateExitCode = fmt.Sprintf("%d", last.ExitCode)
			// Track the container with the latest recorded termination.
			if last.FinishedAt.After(recentCntrTime) {
				recentCntrName = v.Name
				recentCntrTime = last.FinishedAt.Time
			}
		}

		t.AppendRow(table.Row{
			v.Name,
			v.Ready,
			startedAt,
			lastStateFinishedAt,
			lastStateReason,
			lastStateExitCode,
			v.RestartCount,
		})
		t.AppendSeparator()
	}

	// Avoid rendering the zero time ("0001-01-01 00:00:00") when no
	// termination was recorded; use the same placeholder as the table cells.
	recentCntrAt := placeholder
	if !recentCntrTime.IsZero() {
		recentCntrAt = recentCntrTime.Format(timeLayout)
	}

	t.SetTitle("创建时间: %s => 重启次数: %d => 最近重启容器: %s @ %s", pod.CreationTimestamp.Format(timeLayout), total, recentCntrName, recentCntrAt)
	fmt.Println(t.Render())
}