1. Environment

Kubernetes source version: remotes/origin/release-1.25
Kubelet version built from this source: Kubernetes v1.24.0-beta.0.2463+ee7799bab469d7
Kubernetes cluster environment: a single-node cluster set up from the Kubernetes v1.25.4 binaries

Golang version: go1.19.3 linux/amd64
IDEA version: 2022.2.3
Delve version: 1.9.1

[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# dlv version
Delve Debugger
Version: 1.9.1
Build: $Id: d81b9fd12bfa603f3cf7a4bc842398bd61c42940 $
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# go version
go version go1.19.3 linux/amd64
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# kubectl version
WARNING: This version information is deprecated and will be replaced with the output from kubectl version --short.  Use --output=yaml|json to get the full version.
Client Version: version.Info{Major:"1", Minor:"25", GitVersion:"v1.25.4", GitCommit:"872a965c6c6526caa949f0c6ac028ef7aff3fb78", GitTreeState:"clean", BuildDate:"2022-11-09T13:36:36Z", GoVersion:"go1.19.3", Compiler:"gc", Platform:"linux/amd64"}
Kustomize Version: v4.5.7
Server Version: version.Info{Major:"1", Minor:"25", GitVersion:"v1.25.4", GitCommit:"872a965c6c6526caa949f0c6ac028ef7aff3fb78", GitTreeState:"clean", BuildDate:"2022-11-09T13:29:58Z", GoVersion:"go1.19.3", Compiler:"gc", Platform:"linux/amd64"}
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# kubectl get nodes -owide
NAME          STATUS   ROLES    AGE   VERSION   INTERNAL-IP     EXTERNAL-IP   OS-IMAGE                KERNEL-VERSION                CONTAINER-RUNTIME
k8s-master1   Ready    <none>   31h   v1.25.4   192.168.11.71   <none>        CentOS Linux 7 (Core)   3.10.0-1160.80.1.el7.x86_64   containerd://1.6.10
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# kubectl get componentstatus
Warning: v1 ComponentStatus is deprecated in v1.19+
NAME                 STATUS    MESSAGE                         ERROR
etcd-0               Healthy   {"health":"true","reason":""}
controller-manager   Healthy   ok
scheduler            Healthy   ok
[root@k8s-master1 kubernetes]#

The kubelet startup arguments are as follows:

[root@k8s-master1 kubernetes]# ps -ef|grep "/usr/local/bin/kubelet"
root       7972      1  6 07:06 ?        00:00:06 /usr/local/bin/kubelet --bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.kubeconfig --kubeconfig=/etc/kubernetes/kubelet.kubeconfig --config=/etc/kubernetes/kubelet-conf.yml --container-runtime-endpoint=unix:///run/containerd/containerd.sock --node-labels=node.kubernetes.io/node= --v=8
root       9549   6424  0 07:07 pts/0    00:00:00 grep --color=auto /usr/local/bin/kubelet
[root@k8s-master1 kubernetes]#

The kubelet configuration file is as follows:

apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
address: 0.0.0.0
port: 10250
readOnlyPort: 10255
authentication:
  anonymous:
    enabled: false
  webhook:
    cacheTTL: 2m0s
    enabled: true
  x509:
    clientCAFile: /etc/kubernetes/pki/ca.pem
authorization:
  mode: Webhook
  webhook:
    cacheAuthorizedTTL: 5m0s
    cacheUnauthorizedTTL: 30s
cgroupDriver: systemd
cgroupsPerQOS: true
clusterDNS:
- 10.96.0.10
clusterDomain: cluster.local
containerLogMaxFiles: 5
containerLogMaxSize: 10Mi
contentType: application/vnd.kubernetes.protobuf
cpuCFSQuota: true
cpuManagerPolicy: none
cpuManagerReconcilePeriod: 10s
enableControllerAttachDetach: true
enableDebuggingHandlers: true
enforceNodeAllocatable:
- pods
eventBurst: 10
eventRecordQPS: 5
evictionHard:
  imagefs.available: 15%
  memory.available: 100Mi
  nodefs.available: 10%
  nodefs.inodesFree: 5%
evictionPressureTransitionPeriod: 5m0s
failSwapOn: true
fileCheckFrequency: 20s
hairpinMode: promiscuous-bridge
healthzBindAddress: 127.0.0.1
healthzPort: 10248
httpCheckFrequency: 20s
imageGCHighThresholdPercent: 85
imageGCLowThresholdPercent: 80
imageMinimumGCAge: 2m0s
iptablesDropBit: 15
iptablesMasqueradeBit: 14
kubeAPIBurst: 10
kubeAPIQPS: 5
makeIPTablesUtilChains: true
maxOpenFiles: 1000000
maxPods: 110
nodeStatusUpdateFrequency: 10s
oomScoreAdj: -999
podPidsLimit: -1
registryBurst: 10
registryPullQPS: 5
resolvConf: /etc/resolv.conf
rotateCertificates: true
runtimeRequestTimeout: 2m0s
serializeImagePulls: true
staticPodPath: /etc/kubernetes/manifests
streamingConnectionIdleTimeout: 4h0m0s
syncFrequency: 1m0s
volumeStatsAggPeriod: 1m0s

2. Source Code Analysis

The syncLoop function is short and does very little itself; the heavy lifting all happens in syncLoopIteration.

// pkg/kubelet/kubelet.go


func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.InfoS("Starting kubelet main sync loop")
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	// the housekeeping ticker drives the periodic cleanup ("housekeeping") work
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()

	// Here is the "aha" moment: the PodLifecycleEvents produced by PLEG are consumed right here in syncLoop
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base

	if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
		kl.dnsConfigurer.CheckLimitsForResolvConf()
	}

	// From here on it is an infinite loop, living up to the "Loop" in syncLoop
	for {
		// If the container runtime reports an error, skip this round of pod synchronization
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.ErrorS(err, "Skipping pod synchronization")
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base

		kl.syncLoopMonitor.Store(kl.clock.Now())
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}

2.1. syncLoopIteration

The full logic of syncLoopIteration is shown below. At first glance it looks intimidating: a single select listens on no fewer than 7 channels. Let's go through them one by one.

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
			return false
		}

		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjs(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjs(u.Pods))
			// DELETE is treated as a UPDATE because of graceful deletion.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			klog.ErrorS(nil, "Kubelet does not support snapshot update")
		default:
			klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
		}

		kl.sourcesReady.AddSource(u.Source)

	case e := <-plegCh:
		if e.Type == pleg.ContainerStarted {
			kl.lastContainerStartedTime.Add(e.ID, time.Now())
		}
		if isSyncPodWorthy(e) {
			// PLEG event for a pod; sync it.
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
				handler.HandlePodSyncs([]*v1.Pod{pod})
			} else {
				// If the pod no longer exists, ignore the event.
				klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
			}
		}

		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
	case <-syncCh:
		// Sync pods waiting for sync
		podsToSync := kl.getPodsToSync()
		if len(podsToSync) == 0 {
			break
		}
		klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjs(podsToSync))
		handler.HandlePodSyncs(podsToSync)
	case update := <-kl.livenessManager.Updates():
		if update.Result == proberesults.Failure {
			handleProbeSync(kl, update, handler, "liveness", "unhealthy")
		}
	case update := <-kl.readinessManager.Updates():
		ready := update.Result == proberesults.Success
		kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)

		status := ""
		if ready {
			status = "ready"
		}
		handleProbeSync(kl, update, handler, "readiness", status)
	case update := <-kl.startupManager.Updates():
		started := update.Result == proberesults.Success
		kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)

		status := "unhealthy"
		if started {
			status = "started"
		}
		handleProbeSync(kl, update, handler, "startup", status)
	case <-housekeepingCh:
		if !kl.sourcesReady.AllReady() {
			// If the sources aren't ready or volume manager has not yet synced the states,
			// skip housekeeping, as we may accidentally delete pods from unready sources.
			klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
		} else {
			start := time.Now()
			klog.V(4).InfoS("SyncLoop (housekeeping)")
			if err := handler.HandlePodCleanups(); err != nil {
				klog.ErrorS(err, "Failed cleaning pods")
			}
			duration := time.Since(start)
			if duration > housekeepingWarningDuration {
				klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
			}
			klog.V(4).InfoS("SyncLoop (housekeeping) end")
		}
	}
	return true
}

2.1.1. configCh

First up is configCh. It is in fact PodConfig.updates, which is ultimately written to by the two siblings PodStorage.Merge() and PodStorage.Sync().

The Pods held in PodStorage come from three sources: Http, File, and Api. As the name suggests, Http comes from the network; in the kubelet configuration it is set via StaticPodURL (plus the optional StaticPodURLHeader), which we have not configured. File comes from local files, i.e. the source of static Pods; it is configured via StaticPodPath, which we set to /etc/kubernetes/manifests, although the directory is currently empty. Api comes from kube-apiserver: every Pod change we make via kubectl or client-go shows up through this source.

Therefore, whenever we create, modify, or delete a Pod with kubectl, configCh receives an update.
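
What flows through configCh is kubetypes.PodUpdate. As a reference, here is a minimal sketch of its shape plus an example ADD update (simplified from pkg/kubelet/types/pod_update.go; field details are abridged and newPod is a hypothetical *v1.Pod):

// Sketch only: simplified from pkg/kubelet/types/pod_update.go.
type PodUpdate struct {
	Pods   []*v1.Pod    // the pods affected by this update
	Op     PodOperation // ADD, UPDATE, REMOVE, RECONCILE, DELETE or SET
	Source string       // "file", "http", "api" or "*"
}

// An ADD coming from the apiserver source would look roughly like this:
update := kubetypes.PodUpdate{
	Op:     kubetypes.ADD,
	Pods:   []*v1.Pod{newPod}, // newPod is a hypothetical *v1.Pod
	Source: kubetypes.ApiserverSource,
}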

Depending on the operation type carried by the PodUpdate, it is dispatched to the corresponding SyncHandler method; digging into the SyncHandler implementation shows that the actual work is in turn delegated to the PodWorker.

// pkg/kubelet/kubelet.go

switch u.Op {
case kubetypes.ADD:
	klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
	handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
	klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
	handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
	klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjs(u.Pods))
	handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
	klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
	handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
	klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjs(u.Pods))
	// DELETE is treated as a UPDATE because of graceful deletion.
	handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
	// TODO: Do we want to support this?
	klog.ErrorS(nil, "Kubelet does not support snapshot update")
default:
	klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
}

2.1.2. plegCh

From the post Kubelet 1.25.x源码——PLEG we know that PLEG keeps comparing a Pod's state between two points in time; whenever the state of the same Pod differs between the two, it sends a PodLifecycleEvent to its eventChannel. The plegCh here is exactly that eventChannel, just old wine in a new bottle.

// pkg/kubelet/kubelet.go

if e.Type == pleg.ContainerStarted {
	kl.lastContainerStartedTime.Add(e.ID, time.Now())
}
if isSyncPodWorthy(e) {
	// PLEG event for a pod; sync it.
	if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
		klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
		handler.HandlePodSyncs([]*v1.Pod{pod})
	} else {
		// If the pod no longer exists, ignore the event.
		klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
	}
}

if e.Type == pleg.ContainerDied {
	if containerID, ok := e.Data.(string); ok {
		kl.cleanUpContainersInPod(e.ID, containerID)
	}
}
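
As a reminder, the events read from plegCh look roughly like this (a simplified sketch of pkg/kubelet/pleg/pleg.go), which explains the e.ID, e.Type, and e.Data.(string) usages above:

// Sketch only: simplified from pkg/kubelet/pleg/pleg.go.
type PodLifecycleEvent struct {
	ID   types.UID             // UID of the pod the event belongs to
	Type PodLifeCycleEventType // e.g. ContainerStarted, ContainerDied, ContainerRemoved
	Data interface{}           // for container events this carries the container ID as a string
}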

2.1.3. syncCh

syncCh is the one-second syncTicker created in syncLoop: on every tick, kl.getPodsToSync() collects the Pods that are due for a periodic sync and hands them to handler.HandlePodSyncs.

2.1.4. livenessManager.Updates()

When a liveness probe result arrives and the result is Failure, handleProbeSync is called to sync the affected Pod.

2.1.5. readinessManager.Updates()

A readiness probe result first updates the container's readiness in the statusManager via SetContainerReadiness, then triggers a sync through handleProbeSync.

2.1.6. startupManager.Updates()

A startup probe result likewise updates the container's startup state via SetContainerStartup and then triggers a sync through handleProbeSync.

2.1.7. housekeepingCh

housekeepingCh is the housekeepingTicker created in syncLoop. When it fires and all Pod sources are ready, handler.HandlePodCleanups() is invoked; if the cleanup takes too long, an error is logged.

2.2. PodUpdate

Now let's look at PodUpdate. You may have noticed that the first thing catching the eye on entering syncLoop is its parameter list; intuition says these two parameters are fairly important.

Tracing upward, we find that the PodUpdate channel is passed in via podCfg.Updates():

// cmd/kubelet/app/server.go

func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
	// start the kubelet
	go k.Run(podCfg.Updates())

	// other logic omitted here
}

Jumping in, the PodConfig.Updates() method simply hands out its internal updates channel:

// pkg/kubelet/config/config.go

func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
	return c.updates
}

So where, when, and by whom is this field written? With that question in mind, we keep digging: in IDEA (Ctrl + G), find every place that writes to the updates field.

The search result is gratifying: there is exactly one writer, namely where PodConfig is created. But at that point it is just an empty channel created with updates := make(chan kubetypes.PodUpdate, 50).

[Screenshot: IDEA search results for writes to the updates field]

Clearly this channel must be consumed somewhere. Looking closer at storage := newPodStorage(updates, mode, recorder), the same updates channel is also passed in when the PodStorage is created. In other words, PodConfig.updates must be written through PodStorage, so let's search again in IDEA to see how PodStorage uses PodConfig.updates.
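
For context, here is a simplified sketch of how NewPodConfig wires that channel into podStorage (paraphrased from pkg/kubelet/config/config.go; fields are abridged and details may differ slightly from the actual source):

// Sketch only: paraphrased from pkg/kubelet/config/config.go, fields abridged.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
	// buffered channel that syncLoop will eventually read from
	updates := make(chan kubetypes.PodUpdate, 50)
	// podStorage keeps a reference to the very same channel
	storage := newPodStorage(updates, mode, recorder)
	return &PodConfig{
		pods:    storage,
		mux:     config.NewMux(storage),
		updates: updates,
		sources: sets.String{},
	}
}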

It turns out quite a few places touch PodConfig.updates, but inside PodStorage they boil down to two spots: Merge and Sync. Analyzing PodStorage is therefore unavoidable, so let's see what it does.

[Screenshot: IDEA search results for writes to podStorage.updates]

2.2.1. PodStorage

As the name suggests, PodStorage must be a cache of Pods. Before seeing the definition, I guessed it would simply be a map with the Pod UID as the key and the Pod as the value. The real PodStorage was a surprise: my guessed map is only the value, and the actual key is, per the comments, the Source. That hints that Pods are grouped by where they come from.

So what sources are there, and how do Pods from different sources differ? Let's keep searching upward with IDEA. (A quick aside: choosing a comfortable tool really matters. Reading code in IDEA is smooth, while VS Code falls a bit short, especially for tracing usages upward; then again, maybe I just haven't mastered it.)

// pkg/kubelet/config/config.go

type podStorage struct {
	podLock sync.RWMutex
	// True to the name Storage, a map holds the Pods here. My guess was a plain map from Pod UID to Pod, but there is one extra level of nesting
	pods map[string]map[types.UID]*v1.Pod
	mode PodConfigNotificationMode

	updateLock sync.Mutex
	// This updates field is the very same channel as PodConfig.updates; the two hold the same reference
	updates    chan<- kubetypes.PodUpdate

	sourcesSeenLock sync.RWMutex
	sourcesSeen     sets.String
	recorder record.EventRecorder
}

Finally, after a few more searches, I found where the different Sources are defined, shown below. Pods come from three sources: File, Http, and Api (plus the aggregate AllSource).

// pkg/kubelet/types/pod_update.go

const (
	// FileSource identifies updates from a file.
	FileSource = "file"
	// HTTPSource identifies updates from querying a web page.
	HTTPSource = "http"
	// ApiserverSource identifies updates from Kubernetes API Server.
	ApiserverSource = "api"
	// AllSource identifies updates from all sources.
	AllSource = "*"
)
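
Putting the two together, podStorage.pods is indexed first by source and then by Pod UID. A purely illustrative lookup (not code from the kubelet):

// Illustrative only: how podStorage.pods is indexed (source first, then UID).
staticPod := s.pods[kubetypes.FileSource][uid]   // a static pod read from /etc/kubernetes/manifests
apiPod := s.pods[kubetypes.ApiserverSource][uid] // a pod created through kube-apiserver
_, _ = staticPod, apiPod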

In the post Kubelet v1.25.x源码——总体概览 we already caught a glimpse of PodSourceConfig being created, though at the time we did not know what it was for and paid little attention. If you have forgotten, it is worth walking through the kubelet startup flow again. Let's look at the concrete logic of makePodSourceConfig.

The logic here is not complicated and matches the name: it merely registers the different Pod sources. Two of them are driven by configuration, StaticPodURL (with the optional StaticPodURLHeader) and StaticPodPath. The environment section above showed our kubelet configuration, which sets StaticPodPath to /etc/kubernetes/manifests. Anyone familiar with Kubernetes will recognize this as the static Pod mechanism: once the kubelet is up, it creates a Pod for every manifest under the StaticPodPath directory. StaticPodURL belongs to the HTTP source; I am not yet sure what its typical use case is.

As for the Api source, the code makes it clear enough: these are Pods coming from kube-apiserver, and they are the ones we mainly care about.

func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
	manifestURLHeader := make(http.Header)
	if len(kubeCfg.StaticPodURLHeader) > 0 {
		for k, v := range kubeCfg.StaticPodURLHeader {
			for i := range v {
				manifestURLHeader.Add(k, v[i])
			}
		}
	}

	// source of all configuration
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

	// TODO:  it needs to be replaced by a proper context in the future
	ctx := context.TODO()

	// define file config source
	if kubeCfg.StaticPodPath != "" {
		klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
		config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.FileSource))
	}

	// define url config source
	if kubeCfg.StaticPodURL != "" {
		klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
		config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.HTTPSource))
	}

	if kubeDeps.KubeClient != nil {
		klog.InfoS("Adding apiserver pod source")
		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource))
	}
	return cfg, nil
}
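
For the Api source, NewSourceApiserver essentially watches only the Pods bound to the current node. Here is a minimal, illustrative sketch of that idea using client-go (the real code lives in pkg/kubelet/config/apiserver.go and differs in detail):

// Illustrative sketch: list/watch only pods scheduled to this node,
// similar in spirit to what NewSourceApiserver sets up.
import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

func newNodePodListWatch(client kubernetes.Interface, nodeName string) *cache.ListWatch {
	// only pods whose spec.nodeName equals this node are of interest to the kubelet
	selector := fields.OneTermEqualSelector("spec.nodeName", nodeName)
	return cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, selector)
}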

2.2.1.1. Merge

// pkg/kubelet/config/config.go

func (s *podStorage) Merge(source string, change interface{}) error {
	s.updateLock.Lock()
	defer s.updateLock.Unlock()

	seenBefore := s.sourcesSeen.Has(source)
	adds, updates, deletes, removes, reconciles := s.merge(source, change)
	firstSet := !seenBefore && s.sourcesSeen.Has(source)

	// deliver update notifications
	switch s.mode {
	case PodConfigNotificationIncremental:
		if len(removes.Pods) > 0 {
			s.updates <- *removes
		}
		if len(adds.Pods) > 0 {
			s.updates <- *adds
		}
		if len(updates.Pods) > 0 {
			s.updates <- *updates
		}
		if len(deletes.Pods) > 0 {
			s.updates <- *deletes
		}
		if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
			// Send an empty update when first seeing the source and there are
			// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
			// the source is ready.
			s.updates <- *adds
		}
		// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
		if len(reconciles.Pods) > 0 {
			s.updates <- *reconciles
		}

	case PodConfigNotificationSnapshotAndUpdates:
		if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
			s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
		}
		if len(updates.Pods) > 0 {
			s.updates <- *updates
		}
		if len(deletes.Pods) > 0 {
			s.updates <- *deletes
		}

	case PodConfigNotificationSnapshot:
		if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
			s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
		}

	case PodConfigNotificationUnknown:
		fallthrough
	default:
		panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode))
	}

	return nil
}

// pkg/kubelet/config/config.go

func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
	s.podLock.Lock()
	defer s.podLock.Unlock()

	addPods := []*v1.Pod{}
	updatePods := []*v1.Pod{}
	deletePods := []*v1.Pod{}
	removePods := []*v1.Pod{}
	reconcilePods := []*v1.Pod{}

	pods := s.pods[source]
	if pods == nil {
		pods = make(map[types.UID]*v1.Pod)
	}

	updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
		filtered := filterInvalidPods(newPods, source, s.recorder)
		for _, ref := range filtered {
			// Annotate the pod with the source before any comparison.
			if ref.Annotations == nil {
				ref.Annotations = make(map[string]string)
			}
			ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
			if existing, found := oldPods[ref.UID]; found {
				pods[ref.UID] = existing
				needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
				if needUpdate {
					updatePods = append(updatePods, existing)
				} else if needReconcile {
					reconcilePods = append(reconcilePods, existing)
				} else if needGracefulDelete {
					deletePods = append(deletePods, existing)
				}
				continue
			}
			recordFirstSeenTime(ref)
			pods[ref.UID] = ref
			addPods = append(addPods, ref)
		}
	}

	update := change.(kubetypes.PodUpdate)
	switch update.Op {
	case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
		if update.Op == kubetypes.ADD {
			klog.V(4).InfoS("Adding new pods from source", "source", source, "pods", klog.KObjs(update.Pods))
		} else if update.Op == kubetypes.DELETE {
			klog.V(4).InfoS("Gracefully deleting pods from source", "source", source, "pods", klog.KObjs(update.Pods))
		} else {
			klog.V(4).InfoS("Updating pods from source", "source", source, "pods", klog.KObjs(update.Pods))
		}
		updatePodsFunc(update.Pods, pods, pods)

	case kubetypes.REMOVE:
		klog.V(4).InfoS("Removing pods from source", "source", source, "pods", klog.KObjs(update.Pods))
		for _, value := range update.Pods {
			if existing, found := pods[value.UID]; found {
				// this is a delete
				delete(pods, value.UID)
				removePods = append(removePods, existing)
				continue
			}
			// this is a no-op
		}

	case kubetypes.SET:
		klog.V(4).InfoS("Setting pods for source", "source", source)
		s.markSourceSet(source)
		// Clear the old map entries by just creating a new map
		oldPods := pods
		pods = make(map[types.UID]*v1.Pod)
		updatePodsFunc(update.Pods, oldPods, pods)
		for uid, existing := range oldPods {
			if _, found := pods[uid]; !found {
				// this is a delete
				removePods = append(removePods, existing)
			}
		}

	default:
		klog.InfoS("Received invalid update type", "type", update)

	}

	s.pods[source] = pods

	adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
	updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
	deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
	removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
	reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}

	return adds, updates, deletes, removes, reconciles
}

2.2.1.2. Sync

// pkg/kubelet/config/config.go

// Sync sends a copy of the current state through the update channel.
func (s *podStorage) Sync() {
	s.updateLock.Lock()
	defer s.updateLock.Unlock()
	s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: kubetypes.AllSource}
}

2.3. SyncHandler

Now let's take a look at SyncHandler. This interface is extremely important: the Pod changes coming out of all 7 channels that syncLoop listens on are ultimately applied through SyncHandler.

SyncHandler defines six methods. The first three are easy to understand: adding, updating, and removing Pods. What the last three do is not obvious from their names alone.

// pkg/kubelet/kubelet.go

type SyncHandler interface {
	HandlePodAdditions(pods []*v1.Pod)
	HandlePodUpdates(pods []*v1.Pod)
	HandlePodRemoves(pods []*v1.Pod)
	HandlePodReconcile(pods []*v1.Pod)
	HandlePodSyncs(pods []*v1.Pod)
	HandlePodCleanups() error
}
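
Worth noting: the handler passed into syncLoop is the Kubelet itself, i.e. *Kubelet implements SyncHandler. A simplified sketch of the call site in Kubelet.Run (most of the surrounding startup logic is omitted):

// Sketch only: the handler argument of syncLoop is the Kubelet itself.
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	// ... managers, loops and servers started here are omitted ...
	kl.syncLoop(updates, kl) // kl satisfies the SyncHandler interface
}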

2.3.1. HandlePodAdditions

The logic is as follows:

  • 1. Sort the newly added Pods by creation time, so Pods created earlier are handled first. Reasonable enough.
  • 2. Fetch all existing Pods from the PodManager.
  • 3. Add the Pod being created to the PodManager.
  • 4. Check whether the Pod is a MirrorPod; if so, handle it through the MirrorPod path.
    • So what exactly is a MirrorPod, what is it for, and how does it differ from an ordinary Pod? (See the sketch right after this list for how one is recognized.)
  • 5. If it is not a MirrorPod, ask the PodWorker whether the Pod is currently being asked to terminate; if not, check whether it can be admitted and reject it otherwise.
  • 6. Hand the Pod over to the PodWorker; how the PodWorker processes it from there will be covered in a later post.
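
A quick pointer on that MirrorPod question: kubetypes.IsMirrorPod simply checks for the mirror-pod annotation that the kubelet attaches to the apiserver copy of a static Pod. A simplified sketch (paraphrased from pkg/kubelet/types; a proper look at mirror Pods is left for a later post):

// Sketch only: a mirror pod is recognized purely by this annotation,
// which the kubelet sets on the apiserver representation of a static pod.
const ConfigMirrorAnnotationKey = "kubernetes.io/config.mirror"

func IsMirrorPod(pod *v1.Pod) bool {
	_, ok := pod.Annotations[ConfigMirrorAnnotationKey]
	return ok
}
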
// pkg/kubelet/kubelet.go

func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
	sort.Sort(sliceutils.PodsByCreationTime(pods))
	for _, pod := range pods {
		existingPods := kl.podManager.GetPods()

		kl.podManager.AddPod(pod)

		if kubetypes.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}

		// we simply avoid doing any work.
		if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
			activePods := kl.filterOutInactivePods(existingPods)

			// Check if we can admit the pod; if not, reject it.
			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
				kl.rejectPod(pod, reason, message)
				continue
			}
		}
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
	}
}

// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		StartTime:  start,
	})
	// Note the number of containers for new pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}

2.3.2. HandlePodUpdates

HandlerPodAdditions类似,首先更新了PodManager,然后判断当前Pod是否是MirrorPod,如果是MirrorPod,那么就按照MirrorPod的处理流程处理。如果不是同样交给PodWorker来处理

// pkg/kubelet/kubelet.go

func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		kl.podManager.UpdatePod(pod)
		if kubetypes.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
	}
}

func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		StartTime:  start,
	})
	// Note the number of containers for new pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}

2.3.3. HandlePodRemoves

Same recipe, same flavor: first delete the Pod from the PodManager, then check for a MirrorPod, and finally hand it to the PodWorker.

// pkg/kubelet/kubelet.go

func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		kl.podManager.DeletePod(pod)
		if kubetypes.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		// Deletion is allowed to fail because the periodic cleanup routine
		// will trigger deletion again.
		if err := kl.deletePod(pod); err != nil {
			klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
		}
	}
}

func (kl *Kubelet) deletePod(pod *v1.Pod) error {
	if pod == nil {
		return fmt.Errorf("deletePod does not allow nil pod")
	}
	if !kl.sourcesReady.AllReady() {
		// If the sources aren't ready, skip deletion, as we may accidentally delete pods
		// for sources that haven't reported yet.
		return fmt.Errorf("skipping delete because sources aren't ready yet")
	}
	klog.V(3).InfoS("Pod has been deleted and must be killed", "pod", klog.KObj(pod), "podUID", pod.UID)
	kl.podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        pod,
		UpdateType: kubetypes.SyncPodKill,
	})
	// We leave the volume/directory cleanup to the periodic cleanup routine.
	return nil
}

2.3.4. HandlePodReconcile

First update the PodManager, then check whether the Pod's Ready condition needs reconciling and, if so, hand it to the PodWorker; in addition, for Pods that have been evicted, the dead containers in the Pod are cleaned up.

// pkg/kubelet/kubelet.go

func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		// Update the pod in pod manager, status manager will do periodically reconcile according
		// to the pod manager.
		kl.podManager.UpdatePod(pod)

		// Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
		if status.NeedToReconcilePodReadiness(pod) {
			mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
			kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
		}

		// After an evicted pod is synced, all dead containers in the pod can be removed.
		if eviction.PodIsEvicted(pod.Status) {
			if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
				kl.containerDeletor.deleteContainersInPod("", podStatus, true)
			}
		}
	}
}

func NeedToReconcilePodReadiness(pod *v1.Pod) bool {
	if len(pod.Spec.ReadinessGates) == 0 {
		return false
	}
	podReadyCondition := GeneratePodReadyCondition(&pod.Spec, pod.Status.Conditions, pod.Status.ContainerStatuses, pod.Status.Phase)
	i, curCondition := podutil.GetPodConditionFromList(pod.Status.Conditions, v1.PodReady)
	// Only reconcile if "Ready" condition is present and Status or Message is not expected
	if i >= 0 && (curCondition.Status != podReadyCondition.Status || curCondition.Message != podReadyCondition.Message) {
		return true
	}
	return false
}


func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		StartTime:  start,
	})
	// Note the number of containers for new pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}
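
Note that NeedToReconcilePodReadiness returns false unless the Pod declares readinessGates, so only such Pods get re-dispatched on a RECONCILE. A purely hypothetical Pod spec fragment in Go (the condition type below is made up):

// Illustrative only: a pod with a readiness gate, the only kind of pod for
// which HandlePodReconcile will trigger an extra sync of the Ready condition.
pod := &v1.Pod{
	Spec: v1.PodSpec{
		ReadinessGates: []v1.PodReadinessGate{
			{ConditionType: "example.com/feature-ready"}, // hypothetical condition type
		},
	},
}
_ = pod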

2.3.5. HandlePodSyncs

HandlePodSyncs simply dispatches each Pod to the PodWorker with the SyncPodSync update type.

// pkg/kubelet/kubelet.go

func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
	}
}

func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		StartTime:  start,
	})
	// Note the number of containers for new pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}

2.3.6. HandlePodCleanups

There is quite a bit of logic in here; I am getting tired of reading, so a detailed analysis will have to wait for a later post.

// pkg/kubelet/kubelet.go

func (kl *Kubelet) HandlePodCleanups() error {
	var (
		cgroupPods map[types.UID]cm.CgroupName
		err        error
	)
	if kl.cgroupsPerQOS {
		pcm := kl.containerManager.NewPodContainerManager()
		cgroupPods, err = pcm.GetAllPodsFromCgroups()
		if err != nil {
			return fmt.Errorf("failed to get list of pods that still exist on cgroup mounts: %v", err)
		}
	}

	allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods()
	klog.V(3).InfoS("Clean up pod workers for terminated pods")
	workingPods := kl.podWorkers.SyncKnownPods(allPods)

	allPodsByUID := make(map[types.UID]*v1.Pod)
	for _, pod := range allPods {
		allPodsByUID[pod.UID] = pod
	}

	runningPods := make(map[types.UID]sets.Empty)
	possiblyRunningPods := make(map[types.UID]sets.Empty)
	restartablePods := make(map[types.UID]sets.Empty)
	for uid, sync := range workingPods {
		switch sync {
		case SyncPod:
			runningPods[uid] = struct{}{}
			possiblyRunningPods[uid] = struct{}{}
		case TerminatingPod:
			possiblyRunningPods[uid] = struct{}{}
		case TerminatedAndRecreatedPod:
			restartablePods[uid] = struct{}{}
		}
	}

	// Stop probing pods that are not running
	klog.V(3).InfoS("Clean up probes for terminated pods")
	kl.probeManager.CleanupPods(possiblyRunningPods)

	// Terminate any pods that are observed in the runtime but not
	// present in the list of known running pods from config.
	runningRuntimePods, err := kl.runtimeCache.GetPods()
	if err != nil {
		klog.ErrorS(err, "Error listing containers")
		return err
	}
	for _, runningPod := range runningRuntimePods {
		switch workerState, ok := workingPods[runningPod.ID]; {
		case ok && workerState == SyncPod, ok && workerState == TerminatingPod:
			// if the pod worker is already in charge of this pod, we don't need to do anything
			continue
		default:
			if _, ok := allPodsByUID[runningPod.ID]; !ok {
				klog.V(3).InfoS("Clean up orphaned pod containers", "podUID", runningPod.ID)
				one := int64(1)
				kl.podWorkers.UpdatePod(UpdatePodOptions{
					UpdateType: kubetypes.SyncPodKill,
					RunningPod: runningPod,
					KillPodOptions: &KillPodOptions{
						PodTerminationGracePeriodSecondsOverride: &one,
					},
				})
			}
		}
	}

	// Remove orphaned pod statuses not in the total list of known config pods
	klog.V(3).InfoS("Clean up orphaned pod statuses")
	kl.removeOrphanedPodStatuses(allPods, mirrorPods)
	runningRuntimePods, err = kl.containerRuntime.GetPods(false)
	if err != nil {
		klog.ErrorS(err, "Error listing containers")
		return err
	}

	// Remove orphaned pod user namespace allocations (if any).
	klog.V(3).InfoS("Clean up orphaned pod user namespace allocations")
	if err = kl.usernsManager.CleanupOrphanedPodUsernsAllocations(allPods, runningRuntimePods); err != nil {
		klog.ErrorS(err, "Failed cleaning up orphaned pod user namespaces allocations")
	}

	klog.V(3).InfoS("Clean up orphaned pod directories")
	err = kl.cleanupOrphanedPodDirs(allPods, runningRuntimePods)
	if err != nil {
		klog.ErrorS(err, "Failed cleaning up orphaned pod directories")
	}

	// Remove any orphaned mirror pods (mirror pods are tracked by name via the
	// pod worker)
	klog.V(3).InfoS("Clean up orphaned mirror pods")
	kl.deleteOrphanedMirrorPods()

	if kl.cgroupsPerQOS {
		pcm := kl.containerManager.NewPodContainerManager()
		klog.V(3).InfoS("Clean up orphaned pod cgroups")
		kl.cleanupOrphanedPodCgroups(pcm, cgroupPods, possiblyRunningPods)
	}

	kl.backOff.GC()

	for uid := range restartablePods {
		pod, ok := allPodsByUID[uid]
		if !ok {
			continue
		}
		if kl.isAdmittedPodTerminal(pod) {
			klog.V(3).InfoS("Pod is restartable after termination due to UID reuse, but pod phase is terminal", "pod", klog.KObj(pod), "podUID", pod.UID)
			continue
		}
		start := kl.clock.Now()
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		klog.V(3).InfoS("Pod is restartable after termination due to UID reuse", "pod", klog.KObj(pod), "podUID", pod.UID)
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
	}

	return nil
}

func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		StartTime:  start,
	})
	// Note the number of containers for new pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}