Table of Contents
  1. Architecture Diagram
  2. Components
    2.1. Control Plane
      2.1.1. Internal Components
    2.2. Compute Machines
      2.2.1. Worker Node Internal Components
  3. Source Code Analysis
    3.1. kube-controller-manager and the control loop
      3.1.1. Source Files Involved
  4. References

Architecture Diagram

kubernetes architecture

kubernetes official - components

Components

Control Plane

Also known as the master node; it is the control core of the Kubernetes cluster.

Internal Components

  1. kube-apiserver
    The API service of the Kubernetes cluster. All interaction with the cluster goes through this API; the API server validates each request and processes it if it is valid. It can be reached via (1) direct REST API calls, (2) the kubectl command line, or (3) other command-line tools such as kubeadm (a minimal client sketch follows this list).
  2. kube-scheduler
    The Kubernetes scheduler. It places containers (Pods) onto suitable nodes according to their resource requirements and the current state of the cluster.
  3. kube-controller-manager
    The component that runs the various Kubernetes controllers (Deployment, ReplicaSet, and so on).
  4. etcd
    The store for cluster configuration data and cluster state.
  5. cloud-controller-manager (optional)
    A control plane component that embeds cloud-provider-specific control logic.
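
As a rough illustration of the first access path, the sketch below uses client-go to list Pods through kube-apiserver. It is not part of the original article; the kubeconfig path and the namespace are placeholder assumptions.

// Minimal client-go sketch (illustrative only): every call below becomes an
// authenticated REST request to kube-apiserver.
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build client configuration from a local kubeconfig (the path is an assumption).
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	// GET /api/v1/namespaces/default/pods, validated and served by kube-apiserver.
	pods, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("found %d pods in namespace default\n", len(pods.Items))
}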

Compute Machines

Also known as nodes, or worker nodes.

Worker Node Internal Components

  1. kubelet
    Communicates with the control plane (the master node) and makes sure the containers of the Pods scheduled onto this node are running properly.
  2. kube-proxy
    Handles network communication inside and outside the cluster by maintaining the network rules on each node.
  3. Container runtime
    Runs the containers on the node. Kubernetes supports OCI-compliant container runtimes such as rkt and CRI-O, as well as Docker (via dockershim); the sketch after this list shows how these component versions are reported in the node status.
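
To tie these components back to the API, the hedged fragment below (continuing from the clientset built in the previous sketch, not from the original article) prints each node's kubelet, kube-proxy, and container runtime versions as exposed in NodeStatus.

// Illustrative only: print the worker-node component versions reported via NodeStatus.
nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
	panic(err)
}
for _, n := range nodes.Items {
	info := n.Status.NodeInfo
	fmt.Printf("%s: kubelet=%s kube-proxy=%s runtime=%s\n",
		n.Name, info.KubeletVersion, info.KubeProxyVersion, info.ContainerRuntimeVersion)
}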

Source Code Analysis

kube-controller-manager and the control loop

Source Files Involved

  • kubernetes/cmd/kube-controller-manager/controller-manager.go
func main() {

	// ......

	command := app.NewControllerManagerCommand()

	// ......

	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}
  • kubernetes/cmd/kube-controller-manager/app/controllermanager.go
func NewControllerManagerCommand() *cobra.Command {
	// ......

	cmd := &cobra.Command{
		Use: "kube-controller-manager",
		Long: `The Kubernetes controller manager is a daemon that embeds
the core control loops shipped with Kubernetes. In applications of robotics and
automation, a control loop is a non-terminating loop that regulates the state of
the system. In Kubernetes, a controller is a control loop that watches the shared
state of the cluster through the apiserver and makes changes attempting to move the
current state towards the desired state. Examples of controllers that ship with
Kubernetes today are the replication controller, endpoints controller, namespace
controller, and serviceaccounts controller.`,
		PersistentPreRunE: func(*cobra.Command, []string) error {
			// silence client-go warnings.
			// kube-controller-manager generically watches APIs (including deprecated ones),
			// and CI ensures it works properly against matching kube-apiserver versions.
			restclient.SetDefaultWarningHandler(restclient.NoWarnings{})
			return nil
		},
		Run: func(cmd *cobra.Command, args []string) {
			// ......

			if err := Run(c.Complete(), wait.NeverStop); err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}
		},
		Args: func(cmd *cobra.Command, args []string) error {
			for _, arg := range args {
				if len(arg) > 0 {
					return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
				}
			}
			return nil
		},
	}

	//......

	return cmd
}
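
The Long text above is a good summary of what a control loop is: watch the shared state of the cluster through the apiserver and keep nudging the current state towards the desired state. A toy, hand-written sketch of that idea (not the actual controller framework; the state readers are made-up placeholders, and real controllers are event-driven via informers rather than polling) might look like this:

// Toy reconcile loop, illustrative only: observe, compare with the desired state, act, repeat.
package main

import (
	"fmt"
	"time"
)

// Placeholder state readers; a real controller watches the apiserver via informers.
func observeCurrentState() int { return 2 } // e.g. replicas that actually exist
func readDesiredState() int    { return 3 } // e.g. spec.replicas

func controlLoop(stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			return
		default:
		}
		current, desired := observeCurrentState(), readDesiredState()
		if current != desired {
			// A real controller would create/update/delete API objects here.
			fmt.Printf("reconciling: current=%d desired=%d\n", current, desired)
		}
		time.Sleep(time.Second)
	}
}

func main() {
	stop := make(chan struct{})
	go controlLoop(stop)
	time.Sleep(3 * time.Second)
	close(stop)
}
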
  • kubernetes/cmd/kube-controller-manager/app/controllermanager.go
// Run runs the KubeControllerManagerOptions.  This should never exit.
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {

	//......

	run := func(ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) {

		controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
		if err != nil {
			klog.Fatalf("error building controller context: %v", err)
		}

		// initialize all controllers
		controllerInitializers := initializersFunc(controllerContext.LoopMode)
		if err := StartControllers(controllerContext, startSATokenController, controllerInitializers, unsecuredMux); err != nil {
			klog.Fatalf("error starting controllers: %v", err)
		}

		controllerContext.InformerFactory.Start(controllerContext.Stop)
		controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop)
		close(controllerContext.InformersStarted)

		select {}
	}

	// .......
}
  • kubernetes/cmd/kube-controller-manager/app/controllermanager.go
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["endpointslice"] = startEndpointSliceController
	controllers["endpointslicemirroring"] = startEndpointSliceMirroringController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefulset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	controllers["csrsigning"] = startCSRSigningController
	controllers["csrapproving"] = startCSRApprovingController
	controllers["csrcleaner"] = startCSRCleanerController
	controllers["ttl"] = startTTLController
	controllers["bootstrapsigner"] = startBootstrapSignerController
	controllers["tokencleaner"] = startTokenCleanerController
	controllers["nodeipam"] = startNodeIpamController
	controllers["nodelifecycle"] = startNodeLifecycleController
	if loopMode == IncludeCloudLoops {
		controllers["service"] = startServiceController
		controllers["route"] = startRouteController
		controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
		// TODO: volume controller into the IncludeCloudLoops only set.
	}
	controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
	controllers["attachdetach"] = startAttachDetachController
	controllers["persistentvolume-expander"] = startVolumeExpandController
	controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
	controllers["pvc-protection"] = startPVCProtectionController
	controllers["pv-protection"] = startPVProtectionController
	controllers["ttl-after-finished"] = startTTLAfterFinishedController
	controllers["root-ca-cert-publisher"] = startRootCACertPublisher
	controllers["ephemeral-volume"] = startEphemeralVolumeController
	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) &&
		utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
		controllers["storage-version-gc"] = startStorageVersionGCController
	}

	return controllers
}
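
Each entry above maps a controller name to an InitFunc. StartControllers then iterates this map and invokes each InitFunc with the shared ControllerContext. The following is only a stripped-down sketch of that pattern (the real StartControllers also handles the service-account token controller, debug handlers, and more):

// Simplified sketch (not the real StartControllers) of how the InitFunc map is consumed.
func startAll(ctx ControllerContext, controllers map[string]InitFunc) error {
	for name, initFn := range controllers {
		if !ctx.IsControllerEnabled(name) {
			continue // respects the --controllers=... command-line selection
		}
		debugHandler, started, err := initFn(ctx)
		if err != nil {
			return fmt.Errorf("error starting %q: %v", name, err)
		}
		if !started {
			continue // the controller chose not to run (e.g. missing configuration)
		}
		_ = debugHandler // the real code mounts this handler on the debug mux
	}
	return nil
}
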
  • kubernetes/cmd/kube-controller-manager/app/apps.go
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
	dc, err := deployment.NewDeploymentController(
		ctx.InformerFactory.Apps().V1().Deployments(),
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("deployment-controller"),
	)
	if err != nil {
		return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
	}
	go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
	return nil, true, nil
}
  • kubernetes/pkg/controller/deployment/deployment_controller.go
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
	// ......

	for i := 0; i < workers; i++ {
		go wait.Until(dc.worker, time.Second, stopCh)
	}

	<-stopCh
}
  • kubernetes/pkg/controller/deployment/deployment_controller.go

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (dc *DeploymentController) worker() {
	for dc.processNextWorkItem() {
	}
}

func (dc *DeploymentController) processNextWorkItem() bool {
	key, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(key)

	err := dc.syncHandler(key.(string))
	dc.handleErr(err, key)

	return true
}
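
worker/processNextWorkItem is the standard client-go workqueue consumer: Get blocks until a key is available, Done must be called when processing finishes, and handleErr decides whether to re-queue the key with rate limiting. Below is a self-contained toy version of the same pattern, assuming only the client-go workqueue package; the key and the sync function are made up for illustration.

// Toy consumer of a client-go rate-limited workqueue (illustrative only).
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	queue.Add("default/nginx") // keys are usually "namespace/name"

	syncHandler := func(key string) error {
		fmt.Println("syncing", key)
		return nil
	}

	processNextWorkItem := func() bool {
		key, quit := queue.Get()
		if quit {
			return false
		}
		defer queue.Done(key)

		if err := syncHandler(key.(string)); err != nil {
			queue.AddRateLimited(key) // retry later with backoff
			return true
		}
		queue.Forget(key) // clear retry history on success
		return true
	}

	processNextWorkItem()
	queue.ShutDown()
}
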
// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	// ......

	dc.syncHandler = dc.syncDeployment
	// ......
}
// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (dc *DeploymentController) syncDeployment(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
		return err
	}

	startTime := time.Now()
	klog.V(4).InfoS("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
	defer func() {
		klog.V(4).InfoS("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
	}()

	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(2).InfoS("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
		return nil
	}
	if err != nil {
		return err
	}

	// Deep-copy otherwise we are mutating our cache.
	// TODO: Deep-copy only when needed.
	d := deployment.DeepCopy()

	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		if d.Status.ObservedGeneration < d.Generation {
			d.Status.ObservedGeneration = d.Generation
			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
		}
		return nil
	}

	// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
	// through adoption/orphaning.
	rsList, err := dc.getReplicaSetsForDeployment(d)
	if err != nil {
		return err
	}
	// List all Pods owned by this Deployment, grouped by their ReplicaSet.
	// Current uses of the podMap are:
	//
	// * check if a Pod is labeled correctly with the pod-template-hash label.
	// * check that no old Pods are running in the middle of Recreate Deployments.
	podMap, err := dc.getPodMapForDeployment(d, rsList)
	if err != nil {
		return err
	}

	if d.DeletionTimestamp != nil {
		return dc.syncStatusOnly(d, rsList)
	}

	// Update deployment conditions with an Unknown condition when pausing/resuming
	// a deployment. In this way, we can be sure that we won't timeout when a user
	// resumes a Deployment with a set progressDeadlineSeconds.
	if err = dc.checkPausedConditions(d); err != nil {
		return err
	}

	if d.Spec.Paused {
		return dc.sync(d, rsList)
	}

	// rollback is not re-entrant in case the underlying replica sets are updated with a new
	// revision so we should ensure that we won't proceed to update replica sets until we
	// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
	if getRollbackTo(d) != nil {
		return dc.rollback(d, rsList)
	}

	scalingEvent, err := dc.isScalingEvent(d, rsList)
	if err != nil {
		return err
	}
	if scalingEvent {
		return dc.sync(d, rsList)
	}

	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(d, rsList)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}

  • kubernetes/pkg/controller/deployment/rolling.go
func (dc *DeploymentController) rolloutRolling(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// Scale up, if we can.
	//
	// -----------------------------------------
	// Reconcile new ReplicaSet
	// -----------------------------------------
	scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
	if err != nil {
		return err
	}
	if scaledUp {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// Scale down, if we can.
	//
	// -----------------------------------------
	// Reconcile old ReplicaSet
	// -----------------------------------------
	scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	if deploymentutil.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status
	return dc.syncRolloutStatus(allRSs, newRS, d)
}

func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) {
		// Scaling not required.
		return false, nil
	}
	if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) {
		// Scale down.
		scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment)
		return scaled, err
	}
	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
	if err != nil {
		return false, err
	}
	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment)
	return scaled, err
}

func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
	if oldPodsCount == 0 {
		// Can't scale down further
		return false, nil
	}

	allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
	klog.V(4).Infof("New replica set %s/%s has %d available pods.", newRS.Namespace, newRS.Name, newRS.Status.AvailableReplicas)
	maxUnavailable := deploymentutil.MaxUnavailable(*deployment)

	// Check if we can scale down. We can scale down in the following 2 cases:
	// * Some old replica sets have unhealthy replicas, we could safely scale down those unhealthy replicas since that won't further
	//   increase unavailability.
	// * New replica set has scaled up and it's replicas becomes ready, then we can scale down old replica sets in a further step.
	//
	// maxScaledDown := allPodsCount - minAvailable - newReplicaSetPodsUnavailable
	// take into account not only maxUnavailable and any surge pods that have been created, but also unavailable pods from
	// the newRS, so that the unavailable pods from the newRS would not make us scale down old replica sets in a further
	// step(that will increase unavailability).
	//
	// Concrete example:
	//
	// * 10 replicas
	// * 2 maxUnavailable (absolute number, not percent)
	// * 3 maxSurge (absolute number, not percent)
	//
	// case 1:
	// * Deployment is updated, newRS is created with 3 replicas, oldRS is scaled down to 8, and newRS is scaled up to 5.
	// * The new replica set pods crashloop and never become available.
	// * allPodsCount is 13. minAvailable is 8. newRSPodsUnavailable is 5.
	// * A node fails and causes one of the oldRS pods to become unavailable. However, 13 - 8 - 5 = 0, so the oldRS won't be scaled down.
	// * The user notices the crashloop and does kubectl rollout undo to rollback.
	// * newRSPodsUnavailable is 1, since we rolled back to the good replica set, so maxScaledDown = 13 - 8 - 1 = 4. 4 of the crashlooping pods will be scaled down.
	// * The total number of pods will then be 9 and the newRS can be scaled up to 10.
	//
	// case 2:
	// Same example, but pushing a new pod template instead of rolling back (aka "roll over"):
	// * The new replica set created must start with 0 replicas because allPodsCount is already at 13.
	// * However, newRSPodsUnavailable would also be 0, so the 2 old replica sets could be scaled down by 5 (13 - 8 - 0), which would then
	//   allow the new replica set to be scaled up by 5.
	minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
	newRSUnavailablePodCount := *(newRS.Spec.Replicas) - newRS.Status.AvailableReplicas
	maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
	if maxScaledDown <= 0 {
		return false, nil
	}

	// Clean up unhealthy replicas first, otherwise unhealthy replicas will block deployment
	// and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
	oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(oldRSs, deployment, maxScaledDown)
	if err != nil {
		return false, nil
	}
	klog.V(4).Infof("Cleaned up unhealthy replicas from old RSes by %d", cleanupCount)

	// Scale down old replica sets, need check maxUnavailable to ensure we can scale down
	allRSs = append(oldRSs, newRS)
	scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment)
	if err != nil {
		return false, nil
	}
	klog.V(4).Infof("Scaled down old RSes of deployment %s by %d", deployment.Name, scaledDownCount)

	totalScaledDown := cleanupCount + scaledDownCount
	return totalScaledDown > 0, nil
}
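
The scale-down bound in reconcileOldReplicaSets is plain arithmetic. The short sketch below simply restates "case 1" from the comment above with concrete numbers (10 replicas, maxUnavailable=2, 13 total pods, 5 unavailable newRS pods before the rollback and 1 after); it is illustrative only.

// Worked numbers for maxScaledDown, taken from "case 1" in the comment above.
package main

import "fmt"

func main() {
	var (
		replicas       int32 = 10 // deployment.Spec.Replicas
		maxUnavailable int32 = 2
		allPodsCount   int32 = 13 // 8 old + 5 new (maxSurge = 3)
	)
	minAvailable := replicas - maxUnavailable // 8

	// Before the rollback: all 5 newRS pods are crashlooping (unavailable).
	fmt.Println(allPodsCount - minAvailable - 5) // 0 -> the old RS cannot be scaled down

	// After `kubectl rollout undo`: only 1 newRS pod is still unavailable.
	fmt.Println(allPodsCount - minAvailable - 1) // 4 -> 4 crashlooping pods can be removed
}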

References
