kubelet源码分析(四)之 syncLoopIteration

以下代码分析基于 kubernetes v1.12.0 版本。


1. syncLoop

syncLoop是处理变更的循环。 它监听来自三种channel(file,apiserver和http)的更改。 对于看到的任何新更改,将针对所需状态和运行状态运行同步。 如果没有看到配置的变化,将在每个同步频率秒同步最后已知的所需状态。


// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    glog.Info("Starting kubelet main sync loop.")
    // The resyncTicker wakes up kubelet to checks if there are any pod workers
    // that need to be sync'd. A one-second period is sufficient because the
    // sync interval is defaulted to 10s.
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    housekeepingTicker := time.NewTicker(housekeepingPeriod)
    defer housekeepingTicker.Stop()
    plegCh := kl.pleg.Watch()
    const (
        base   = 100 * time.Millisecond
        max    = 5 * time.Second
        factor = 2
    duration := base
    for {
        if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
            glog.Infof("skipping pod synchronization - %v", rs)
            // exponential backoff
            duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
        // reset backoff if we have a success
        duration = base

        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {


2. syncLoopIteration



  • configCh:将配置更改的pod分派给事件类型的相应处理程序回调。
  • plegCh:更新runtime缓存,同步pod。
  • syncCh:同步所有等待同步的pod。
  • houseKeepingCh:触发清理pod。
  • livenessManager.Updates():对失败的pod或者liveness检查失败的pod进行sync操作。


2.1. configCh


func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            glog.Errorf("Update channel is closed. Exiting the sync loop.")
            return false

        switch u.Op {
        case kubetypes.ADD:
            glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
            // After restarting, kubelet will get all existing pods through
            // ADD as if they are new pods. These pods will then go through the
            // admission process and *may* be rejected. This can be resolved
            // once we have checkpointing.
        case kubetypes.UPDATE:
            glog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
        case kubetypes.REMOVE:
            glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
        case kubetypes.RECONCILE:
            glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
        case kubetypes.DELETE:
            glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
            // DELETE is treated as a UPDATE because of graceful deletion.
        case kubetypes.RESTORE:
            glog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
            // These are pods restored from the checkpoint. Treat them as new
            // pods.
        case kubetypes.SET:
            // TODO: Do we want to support this?
            glog.Errorf("Kubelet does not support snapshot update")


  • ADD:HandlePodAdditions
  • UPDATE:HandlePodUpdates
  • REMOVE:HandlePodRemoves
  • RECONCILE:HandlePodReconcile
  • DELETE:HandlePodUpdates
  • RESTORE:HandlePodAdditions
  • podsToSync:HandlePodSyncs


2.2. plegCh


case e := <-plegCh:
    if isSyncPodWorthy(e) {
        // PLEG event for a pod; sync it.
        if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
            glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
        } else {
            // If the pod no longer exists, ignore the event.
            glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)

    if e.Type == pleg.ContainerDied {
        if containerID, ok := e.Data.(string); ok {
            kl.cleanUpContainersInPod(e.ID, containerID)

2.3. syncCh


case <-syncCh:
    // Sync pods waiting for sync
    podsToSync := kl.getPodsToSync()
    if len(podsToSync) == 0 {
    glog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))

2.4. livenessManager.Update


case update := <-kl.livenessManager.Updates():
    if update.Result == proberesults.Failure {
        // The liveness manager detected a failure; sync the pod.

        // We should not use the pod from livenessManager, because it is never updated after
        // initialization.
        pod, ok := kl.podManager.GetPodByUID(update.PodUID)
        if !ok {
            // If the pod no longer exists, ignore the update.
            glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
        glog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))

2.5. housekeepingCh


case <-housekeepingCh:
    if !kl.sourcesReady.AllReady() {
        // If the sources aren't ready or volume manager has not yet synced the states,
        // skip housekeeping, as we may accidentally delete pods from unready sources.
        glog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
    } else {
        glog.V(4).Infof("SyncLoop (housekeeping)")
        if err := handler.HandlePodCleanups(); err != nil {
            glog.Errorf("Failed cleaning pods: %v", err)

3. SyncHandler


// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
    HandlePodAdditions(pods []*v1.Pod)
    HandlePodUpdates(pods []*v1.Pod)
    HandlePodRemoves(pods []*v1.Pod)
    HandlePodReconcile(pods []*v1.Pod)
    HandlePodSyncs(pods []*v1.Pod)
    HandlePodCleanups() error


3.1. HandlePodAdditions


// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    start := kl.clock.Now()
    for _, pod := range pods {

将pod添加到pod manager中。

for _, pod := range pods {
    // Responsible for checking limits in resolv.conf
    if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
    existingPods := kl.podManager.GetPods()
    // Always add the pod to the pod manager. Kubelet relies on the pod
    // manager as the source of truth for the desired state. If a pod does
    // not exist in the pod manager, it means that it has been deleted in
    // the apiserver and no action (other than cleanup) is required.

如果是mirror pod,则对mirror pod进行处理。

if kubepod.IsMirrorPod(pod) {
    kl.handleMirrorPod(pod, start)


if !kl.podIsTerminated(pod) {
    // Only go through the admission process if the pod is not
    // terminated.

    // We failed pods that we rejected, so activePods include all admitted
    // pods that are alive.
    activePods := kl.filterOutTerminatedPods(existingPods)

    // Check if we can admit the pod; if not, reject it.
    if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
        kl.rejectPod(pod, reason, message)

执行dispatchWork函数,该函数是syncHandler中调用到的核心函数,该函数在pod worker中启动一个异步循环,来分派pod的相关操作。该函数的具体操作待后续分析。

mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)

最后加pod添加到probe manager中。


3.2. HandlePodUpdates


// HandlePodUpdates is the callback in the SyncHandler interface for pods
// being updated from a config source.
func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
    start := kl.clock.Now()
    for _, pod := range pods {

将pod更新到pod manager中。

for _, pod := range pods {
    // Responsible for checking limits in resolv.conf
    if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {

如果是mirror pod,则对mirror pod进行处理。

if kubepod.IsMirrorPod(pod) {
    kl.handleMirrorPod(pod, start)


// TODO: Evaluate if we need to validate and reject updates.

mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)

3.3. HandlePodRemoves


// HandlePodRemoves is the callback in the SyncHandler interface for pods
// being removed from a config source.
func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
    start := kl.clock.Now()
    for _, pod := range pods {

从pod manager中删除pod。

for _, pod := range pods {

如果是mirror pod,则对mirror pod进行处理。

if kubepod.IsMirrorPod(pod) {
    kl.handleMirrorPod(pod, start)


// Deletion is allowed to fail because the periodic cleanup routine
// will trigger deletion again.
if err := kl.deletePod(pod); err != nil {
    glog.V(2).Infof("Failed to delete pod %q, err: %v", format.Pod(pod), err)

deletePod 函数将需要删除的pod加入podKillingCh的channel中,有podKiller监听这个channel去执行删除任务,实现如下:

// deletePod deletes the pod from the internal state of the kubelet by:
// 1.  stopping the associated pod worker asynchronously
// 2.  signaling to kill the pod by sending on the podKillingCh channel
// deletePod returns an error if not all sources are ready or the pod is not
// found in the runtime cache.
func (kl *Kubelet) deletePod(pod *v1.Pod) error {
    if pod == nil {
        return fmt.Errorf("deletePod does not allow nil pod")
    if !kl.sourcesReady.AllReady() {
        // If the sources aren't ready, skip deletion, as we may accidentally delete pods
        // for sources that haven't reported yet.
        return fmt.Errorf("skipping delete because sources aren't ready yet")

    // Runtime cache may not have been updated to with the pod, but it's okay
    // because the periodic cleanup routine will attempt to delete again later.
    runningPods, err := kl.runtimeCache.GetPods()
    if err != nil {
        return fmt.Errorf("error listing containers: %v", err)
    runningPod := kubecontainer.Pods(runningPods).FindPod("", pod.UID)
    if runningPod.IsEmpty() {
        return fmt.Errorf("pod not found")
    podPair := kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod}

    kl.podKillingCh <- &podPair
    // TODO: delete the mirror pod here?

    // We leave the volume/directory cleanup to the periodic cleanup routine.
    return nil

从probe manager中移除pod。


3.4. HandlePodReconcile


// HandlePodReconcile is the callback in the SyncHandler interface for pods
// that should be reconciled.
func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
    start := kl.clock.Now()
    for _, pod := range pods {

将pod更新到pod manager中。

for _, pod := range pods {
    // Update the pod in pod manager, status manager will do periodically reconcile according
    // to the pod manager.


// Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
if status.NeedToReconcilePodReadiness(pod) {
    mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
    kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)


// After an evicted pod is synced, all dead containers in the pod can be removed.
if eviction.PodIsEvicted(pod.Status) {
    if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
        kl.containerDeletor.deleteContainersInPod("", podStatus, true)

3.5. HandlePodSyncs

HandlePodSyncssyncHandler接口回调函数,调用dispatchWork,通过pod worker来执行任务。

// HandlePodSyncs is the callback in the syncHandler interface for pods
// that should be dispatched to pod workers for sync.
func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
    start := kl.clock.Now()
    for _, pod := range pods {
        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
        kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)

3.6. HandlePodCleanups



// HandlePodCleanups performs a series of cleanup work, including terminating
// pod workers, killing unwanted pods, and removing orphaned volumes/pod
// directories.
// NOTE: This function is executed by the main sync loop, so it
// should not contain any blocking calls.
func (kl *Kubelet) HandlePodCleanups() error {
    // The kubelet lacks checkpointing, so we need to introspect the set of pods
    // in the cgroup tree prior to inspecting the set of pods in our pod manager.
    // this ensures our view of the cgroup tree does not mistakenly observe pods
    // that are added after the fact...
    var (
        cgroupPods map[types.UID]cm.CgroupName
        err        error
    if kl.cgroupsPerQOS {
        pcm := kl.containerManager.NewPodContainerManager()
        cgroupPods, err = pcm.GetAllPodsFromCgroups()
        if err != nil {
            return fmt.Errorf("failed to get list of pods that still exist on cgroup mounts: %v", err)

列出所有pod包括mirror pod。

allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods()
// Pod phase progresses monotonically. Once a pod has reached a final state,
// it should never leave regardless of the restart policy. The statuses
// of such pods should not be changed, and there is no need to sync them.
// TODO: the logic here does not handle two cases:
//   1. If the containers were removed immediately after they died, kubelet
//      may fail to generate correct statuses, let alone filtering correctly.
//   2. If kubelet restarted before writing the terminated status for a pod
//      to the apiserver, it could still restart the terminated pod (even
//      though the pod was not considered terminated by the apiserver).
// These two conditions could be alleviated by checkpointing kubelet.
activePods := kl.filterOutTerminatedPods(allPods)

desiredPods := make(map[types.UID]empty)
for _, pod := range activePods {
    desiredPods[pod.UID] = empty{}

pod worker停止不再存在的pod的任务,并从probe manager中清除pod。

// Stop the workers for no-longer existing pods.
// TODO: is here the best place to forget pod workers?


runningPods, err := kl.runtimeCache.GetPods()
if err != nil {
    glog.Errorf("Error listing containers: %#v", err)
    return err
for _, pod := range runningPods {
    if _, found := desiredPods[pod.ID]; !found {
        kl.podKillingCh <- &kubecontainer.PodPair{APIPod: nil, RunningPod: pod}


kl.removeOrphanedPodStatuses(allPods, mirrorPods)

移除所有的orphaned volume。

// Remove any orphaned volumes.
// Note that we pass all pods (including terminated pods) to the function,
// so that we don't remove volumes associated with terminated but not yet
// deleted pods.
err = kl.cleanupOrphanedPodDirs(allPods, runningPods)
if err != nil {
    // We want all cleanup tasks to be run even if one of them failed. So
    // we just log an error here and continue other cleanup tasks.
    // This also applies to the other clean up tasks.
    glog.Errorf("Failed cleaning up orphaned pod directories: %v", err)

移除mirror pod。

// Remove any orphaned mirror pods.


// Remove any cgroups in the hierarchy for pods that are no longer running.
if kl.cgroupsPerQOS {
    kl.cleanupOrphanedPodCgroups(cgroupPods, activePods)



4. dispatchWork

dispatchWork通过pod worker启动一个异步的循环。


// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod is terminated, dispatchWork
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    if kl.podIsTerminated(pod) {
        if pod.DeletionTimestamp != nil {
            // If the pod is in a terminated state, there is no pod worker to
            // handle the work item. Check if the DeletionTimestamp has been
            // set, and force a status update to trigger a pod deletion request
            // to the apiserver.
    // Run the sync in an async worker.
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        OnCompleteFunc: func(err error) {
            if err != nil {
    // Note the number of containers for new pods.
    if syncType == kubetypes.SyncPodCreate {



执行pod worker的UpdatePod函数,该函数是pod worker的核心函数,来执行pod相关操作。具体逻辑待下文分析。

// Run the sync in an async worker.
    Pod:        pod,
    MirrorPod:  mirrorPod,
    UpdateType: syncType,
    OnCompleteFunc: func(err error) {
        if err != nil {


// Note the number of containers for new pods.
if syncType == kubetypes.SyncPodCreate {

5. PodWorkers.UpdatePod


// PodWorkers is an abstract interface for testability.
type PodWorkers interface {
    UpdatePod(options *UpdatePodOptions)
    ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
    ForgetWorker(uid types.UID)



// Apply the new setting to the specified pod.
// If the options provide an OnCompleteFunc, the function is invoked if the update is accepted.
// Update requests are ignored if a kill pod request is pending.
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
    pod := options.Pod
    uid := pod.UID
    var podUpdates chan UpdatePodOptions
    var exists bool

    defer p.podLock.Unlock()
    if podUpdates, exists = p.podUpdates[uid]; !exists {
        // We need to have a buffer here, because checkForUpdates() method that
        // puts an update into channel is called from the same goroutine where
        // the channel is consumed. However, it is guaranteed that in such case
        // the channel is empty, so buffer of size 1 is enough.
        podUpdates = make(chan UpdatePodOptions, 1)
        p.podUpdates[uid] = podUpdates

        // Creating a new pod worker either means this is a new pod, or that the
        // kubelet just restarted. In either case the kubelet is willing to believe
        // the status of the pod for the first pod worker sync. See corresponding
        // comment in syncPod.
        go func() {
            defer runtime.HandleCrash()
    if !p.isWorking[pod.UID] {
        p.isWorking[pod.UID] = true
        podUpdates <- *options
    } else {
        // if a request to kill a pod is pending, we do not let anything overwrite that request.
        update, found := p.lastUndeliveredWorkUpdate[pod.UID]
        if !found || update.UpdateType != kubetypes.SyncPodKill {
            p.lastUndeliveredWorkUpdate[pod.UID] = *options

6. managePodLoop


// newPodWorkers传入syncPod函数
klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)


func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQueue queue.WorkQueue,
    resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
    return &podWorkers{
        podUpdates:                map[types.UID]chan UpdatePodOptions{},
        isWorking:                 map[types.UID]bool{},
        lastUndeliveredWorkUpdate: map[types.UID]UpdatePodOptions{},
        syncPodFn:                 syncPodFn,  // 构造传入klet.syncPod函数
        recorder:                  recorder,
        workQueue:                 workQueue,
        resyncInterval:            resyncInterval,
        backOffPeriod:             backOffPeriod,
        podCache:                  podCache,



func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
    var lastSyncTime time.Time
    for update := range podUpdates {
        err := func() error {
            podUID := update.Pod.UID
            // This is a blocking call that would return only if the cache
            // has an entry for the pod that is newer than minRuntimeCache
            // Time. This ensures the worker doesn't start syncing until
            // after the cache is at least newer than the finished time of
            // the previous sync.
            status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
            if err != nil {
                // This is the legacy event thrown by manage pod loop
                // all other events are now dispatched from syncPodFn
                p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
                return err
            err = p.syncPodFn(syncPodOptions{
                mirrorPod:      update.MirrorPod,
                pod:            update.Pod,
                podStatus:      status,
                killPodOptions: update.KillPodOptions,
                updateType:     update.UpdateType,
            lastSyncTime = time.Now()
            return err
        // notify the call-back function if the operation succeeded or not
        if update.OnCompleteFunc != nil {
        if err != nil {
            // IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
            glog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
        p.wrapUp(update.Pod.UID, err)

7. 总结


  1. 通过几种channel来对不同类型的事件进行监听并处理。其中channel包括:configChplegChsyncChhouseKeepingChlivenessManager.Updates()
  2. 不同的SyncHandler执行不同的增删改查操作。
  3. 其中HandlePodAdditionsHandlePodUpdatesHandlePodReconcileHandlePodSyncs都调用到了dispatchWork来执行pod的相关操作。HandlePodCleanups的pod清理任务,通过channel的方式加需要清理的pod给podKiller来清理。
  4. dispatchWork调用podWorkers.UpdatePod执行异步操作。
  5. podWorkers.UpdatePod中调用managePodLoop来执行pod相关操作循环。


