你好,我是孔令飞。
Kubernetes 的基石是声明式编程,模式如下:

开发者通过调用 kube-apiserver 向 etcd 中添加/修改/删除资源,kube-apiserver 会保存资源的定义,资源定义中会声明资源期望的状态。kube-controller-mananger 通过 List-Watch 机制感知到 kube-apiserver 中资源的变更,会根据资源的定义进行(期望的状态)调谐。所谓的调谐,就是运行程序将资源的状态维持在期望的状态。
课程的前半部分,我详细介绍了 kube-apiserver 中的核心内容。接下来将介绍声明式编程中另外一个核心角色:控制器 kube-controller-manager。
kube-controller-manager 的功能
kube-controller-manager 是 Kubernetes 主控节点(Master Node)上的一个关键组件,它负责运行核心控制循环,这些控制循环处理集群范围内的各种功能。kube-controller-manager 通过监听 API Server 的变化来触发相应的控制器逻辑,从而实现对整个集群的管理和自动化操作。
kube-controller-manager 中聚合了多个控制器,这些控制器可以在 names 包中找到。控制器的具体实现位于 pkg/controller/ 目录下。
kube-controller-manager 启动源码解析
kube-controller-manager 应用的启动框架前面已经详细介绍过,这节课我们来重点看看跟 kube-controller-manager 功能关系比较紧密的核心启动代码。
kube-controller-manager 的启动代码位于 Run 方法中。在 Run 方法中,首先会启动事件记录器:
// Run runs the KubeControllerManagerOptions.
func Run(ctx context.Context, c *config.CompletedConfig) error {
...
// Start events processing pipeline.
// 初始化事件广播器,并开始以结构化的格式记录事件。参数 0 表示不添加额外的信息(可以调整日志级别)
c.EventBroadcaster.StartStructuredLogging(0)
// 启动事件录制,将事件发送到 Kubernetes API,特别是 Events 接口
// &v1core.EventSinkImpl{...} 创建了一个用于接收事件的 EventSinkImpl 对象,从而使事件可以通过 API 客户端发送到 Kubernetes
c.EventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.Client.CoreV1().Events("")})
// 使用 defer 语句,在当前函数运行结束时触发 Shutdown() 方法,以确保在程序退出时干净地关闭事件广播器
defer c.EventBroadcaster.Shutdown()
...
}
接下来会创建 2 个 ClientBuilder:
-
rootClientBuilder:提供一种基础方式去创建具有根级权限的 Kubernetes 客户端。 -
clientBuilder:根据是否启用了服务账户凭证有不同的行为。如果启用了,它会尝试使用服务账户的身份进行更安全的操作;如果没有启用,则直接复用rootClientBuilder的行为模式。
之后定义一个 run 方法,run 方法代码如下:
run := func(ctx context.Context, controllerDescriptors map[string]*ControllerDescriptor) {
controllerContext, err := CreateControllerContext(ctx, c, rootClientBuilder, clientBuilder)
if err != nil {
logger.Error(err, "Error building controller context")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
if err := StartControllers(ctx, controllerContext, controllerDescriptors, unsecuredMux, healthzHandler); err != nil {
logger.Error(err, "Error starting controllers")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
controllerContext.InformerFactory.Start(stopCh)
controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh)
close(controllerContext.InformersStarted)
<-ctx.Done()
}
run 方法用来创建一个控制器上下文,控制器上下文中包括了控制器启动时需要的各种信息。之后,基于该控制器上下文及 controllerDescriptors 变量来启动注册的控制器。
创建控制器上下文
通过 CreateControllerContext 方法来创建一个控制器上下文实例。实例类型为 ControllerContext,定义如下:
type ControllerContext struct { // ClientBuilder will provide a client for this controller to use
// 提供一个用于与 Kubernetes API 交互的客户端。控制器可以使用此客户端执行 // 与集群中各种资源相关的操作,例如创建、更新、查询等。这允许控制器灵活地与资源交互
ClientBuilder clientbuilder.ControllerClientBuilder
// 提供访问共享的 informer。informer 用于监控 Kubernets API 中的资源变化。通过 informer,
// 控制器可以获得资源状态的更新,并对其作出反应,保持其内部状态与集群状态的一致性
InformerFactory informers.SharedInformerFactory
// 提供对某些特定资源的 informer 访问。这些资源可以是类型化的(具有确定类型)或动态的(无需在编译时知道类型,但需要提供资源的 metadata 信息)。
ObjectOrMetadataInformerFactory informerfactory.InformerFactory
// 提供控制器的初始化选项访问。这个配置一般包含控制器的运行参数,控制器行为的各种布置。
// 例如,它可能包含控制器的同步周期、并发工作线程数等参数
ComponentConfig kubectrlmgrconfig.KubeControllerManagerConfiguration
// 这是一个延迟初始化的 RESTMapper,用于在需要时确定资源的 REST 端点。它只有在第一次请求时才初始化 REST 映射, // 减少了启动时对 API Server 的依赖,提高了性能
RESTMapper *restmapper.DeferredDiscoveryRESTMapper
// 这是一个通道,当所有控制器被初始化并开始运行时将被关闭。使用此通道,控制器可以在信号到达时开始使用共享的 informer。
// 这管理了控制器之间的启动顺序,保证在开始使用 informer 之前所有控制器都已就绪
InformersStarted chan struct{}
// ResyncPeriod 每次被调用时生成一个持续时间;这样可以避免多个控制器在同步时步调一致,
// 同时向 API Server 发送大量的列表请求。
ResyncPeriod func() time.Duration
// 提供一个代理,用于设置与控制器特定的指标
ControllerManagerMetrics *controllersmetrics.ControllerManagerMetrics
// 提供对垃圾收集器依赖图构建的访问。它负责跟踪集群中的资源,以及它们之间的依赖关系。在 Kubernetes 中, // 垃圾收集器用于管理资源的生命周期,确保不再需要的资源得到清理
GraphBuilder *garbagecollector.GraphBuilder
}
CreateControllerContext 函数实现代码如下:
func CreateControllerContext(ctx context.Context, s *config.CompletedConfig, rootClientBuilder, clientBuilder clientbuilder.ControllerClientBuilder) (ControllerContext, error) {
// Informer transform to trim ManagedFields for memory efficiency.
trim := func(obj interface{}) (interface{}, error) {
if accessor, err := meta.Accessor(obj); err == nil {
if accessor.GetManagedFields() != nil {
accessor.SetManagedFields(nil)
}
}
return obj, nil
}
versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
sharedInformers := informers.NewSharedInformerFactoryWithOptions(versionedClient, ResyncPeriod(s)(), informers.WithTransform(trim))
metadataClient := metadata.NewForConfigOrDie(rootClientBuilder.ConfigOrDie("metadata-informers"))
metadataInformers := metadatainformer.NewSharedInformerFactoryWithOptions(metadataClient, ResyncPeriod(s)(), metadatainformer.WithTransform(trim))
// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
}
// Use a discovery client capable of being refreshed.
discoveryClient := rootClientBuilder.DiscoveryClientOrDie("controller-discovery")
cachedClient := cacheddiscovery.NewMemCacheClient(discoveryClient)
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient)
go wait.Until(func() {
restMapper.Reset()
}, 30*time.Second, ctx.Done())
controllerContext := ControllerContext{
ClientBuilder: clientBuilder,
InformerFactory: sharedInformers,
ObjectOrMetadataInformerFactory: informerfactory.NewInformerFactory(sharedInformers, metadataInformers),
ComponentConfig: s.ComponentConfig,
RESTMapper: restMapper,
InformersStarted: make(chan struct{}),
ResyncPeriod: ResyncPeriod(s),
ControllerManagerMetrics: controllersmetrics.NewControllerManagerMetrics(kubeControllerManager),
}
if controllerContext.ComponentConfig.GarbageCollectorController.EnableGarbageCollector &&
controllerContext.IsControllerEnabled(NewControllerDescriptors()[names.GarbageCollectorController]) {
ignoredResources := make(map[schema.GroupResource]struct{})
for _, r := range controllerContext.ComponentConfig.GarbageCollectorController.GCIgnoredResources {
ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{}
}
controllerContext.GraphBuilder = garbagecollector.NewDependencyGraphBuilder(
ctx,
metadataClient,
controllerContext.RESTMapper,
ignoredResources,
controllerContext.ObjectOrMetadataInformerFactory,
controllerContext.InformersStarted,
)
}
controllersmetrics.Register()
return controllerContext, nil
}
上述代码首先定义了一个 trim 函数,从资源对象中获取资源的 metav1.ObjectMeta 信息,返回的是 metav1.Object 接口类型。然后,调用 metav1.Object 接口的 SetManagedFields 方法将 ManagedFields 设为 nil,这可以减少内存占用,因为 ManagedFields 信息是不必要的。
接下来,分别创建版本化客户端、共享 informer、元数据客户端和元数据 informer,并调用 WaitForAPIServer 函数,等待 kube-apiserver 成功运行。WaitForAPIServer 函数中通过轮询的方式调用 kube-apiserver 的 /healthz 接口,来判断 kube-apiserver 是否健康。
接着创建动态 RESTMapper,用来构建动态资源的 REST 请求路径。
最后,判断 kube-controller-manager 是否开启了 GC。如果配置启用了垃圾收集器,并且 garbage-collector-controller 控制器被启用,则创建用于构建资源依赖图的 GraphBuilder,并将被忽略的资源映射到一个集合中,传递给依赖图构建器以进行垃圾收集的管理。
CreateControllerContext 函数最后返回 ControllerContext 类型的结构体实例。结构体实例中的各个字段,用来在后面的程序中启动各个控制器。
创建控制器集合
要想启动控制器,首先需要获取需要启动的控制器列表。在 kube-controller-manager 中是通过 NewControllerDescriptors 函数来构建一个控制器集合的。控制器集合是一个 map 类型的实例,其中 key 是控制器的名称,value 中包含了控制器的一些核心参数。这些参数用来初始化和启动控制器。
NewControllerDescriptors 函数代码如下:
func NewControllerDescriptors() map[string]*ControllerDescriptor {
controllers := map[string]*ControllerDescriptor{}
aliases := sets.NewString()
// All the controllers must fulfil common constraints, or else we will explode.
register := func(controllerDesc *ControllerDescriptor) {
if controllerDesc == nil {
panic("received nil controller for a registration")
}
name := controllerDesc.Name()
if len(name) == 0 {
panic("received controller without a name for a registration")
}
if _, found := controllers[name]; found {
panic(fmt.Sprintf("controller name %q was registered twice", name))
}
if controllerDesc.GetInitFunc() == nil {
panic(fmt.Sprintf("controller %q does not have an init function", name))
}
for _, alias := range controllerDesc.GetAliases() {
if aliases.Has(alias) {
panic(fmt.Sprintf("controller %q has a duplicate alias %q", name, alias))
}
aliases.Insert(alias)
}
controllers[name] = controllerDesc
}
// First add "special" controllers that aren't initialized normally. These controllers cannot be initialized
// in the main controller loop initialization, so we add them here only for the metadata and duplication detection.
// app.ControllerDescriptor#RequiresSpecialHandling should return true for such controllers
// The only known special case is the ServiceAccountTokenController which *must* be started
// first to ensure that the SA tokens for future controllers will exist. Think very carefully before adding new
// special controllers.
register(newServiceAccountTokenControllerDescriptor(nil))
...
for _, alias := range aliases.UnsortedList() {
if _, ok := controllers[alias]; ok {
panic(fmt.Sprintf("alias %q conflicts with a controller name", alias))
}
}
return controllers
}
上述代码先定义了一个 register 函数,用来注册一系列控制器。该函数接收一个 *ControllerDescriptor 类型的参数,ControllerDescriptor 结构体定义如下:
type ControllerDescriptor struct {
name string
initFunc InitFunc
requiredFeatureGates []featuregate.Feature
aliases []string
isDisabledByDefault bool
isCloudProviderController bool
requiresSpecialHandling bool
}
接下来对 *ControllerDescriptor 结构体中的字段进行一些校验:
-
name、initFunc字段不能为空。如果为空,控制器无法正常启动。 -
NewControllerDescriptors函数中会注册多个控制器,这些控制器不能重名,也不能有重复的 alias。
register 方法中校验 ControllerDescriptor 通过后,会将控制器注册到 map[string]*ControllerDescriptor{} 类型的变量 controllers 中。
NewControllerDescriptors 方法中会多次调用 register 函数,用来注册多个控制器。这些控制器都会以 key-value 的形式保存在 controllers map 类型的变量中并返回。
注册的控制器有很多,详细请参考 register(newServiceAccountTokenControllerDescriptor(nil)) 部分代码块。
我们可以看到,在调用 register 函数注册控制器时,*ControllerDescriptor 类型的实例是通过 newXXXControllerDescriptor 函数来创建的。我们来看其中一个 newXXXControllerDescriptor 实现:
func newDaemonSetControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.DaemonSetController,
aliases: []string{"daemonset"},
initFunc: startDaemonSetController,
}
}
在 newDaemonSetControllerDescriptor 函数中,指定了控制器的名称 names.DaemonSetController。kube-controller-manager 中有很多控制器,为了方便统一维护,将所有的控制器名称统一定义在 k8s.io/kubernetes/cmd/kube-controller-manager/names 包中。所以,如果我们想知道 kube-controller-manager 支持哪些控制器,直接查看 names 包即可。
newDaemonSetControllerDescriptor 函数通过 startDaemonSetController 函数来创建并启动一个控制器。startDaemonSetController 代码如下:
func startDaemonSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
dsc, err := daemon.NewDaemonSetsController(
ctx,
controllerContext.InformerFactory.Apps().V1().DaemonSets(),
controllerContext.InformerFactory.Apps().V1().ControllerRevisions(),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Core().V1().Nodes(),
controllerContext.ClientBuilder.ClientOrDie("daemon-set-controller"),
flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
)
if err != nil {
return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
}
go dsc.Run(ctx, int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs))
return nil, true, nil
}
在 startDaemonSetController 函数中,通过 daemon.NewDaemonSetsController 函数来创建一个具体的控制器实例。daemon 包的导入路径为 k8s.io/kubernetes/pkg/controller/daemon。由此可见,kube-controller-manager 将控制器的具体实现放在了 pkg/controller/ 目录下。
创建的 DaemonSet 控制器实例为 dsc,之后通过调用 go dsc.Run 来启动 DaemonSet 控制器。至于 kube-controller-manager 控制器的具体实现,我们下一节课会讲到。
启动控制器
在有了控制器上下文和控制器集合之后,就可以启动这些控制器了。run 函数中是通过 StartControllers 函数来启动这些控制器的。
StartControllers 函数是 kube-controller-manager 中一个非常重要的函数,其代码实现如下:
func StartControllers(ctx context.Context, controllerCtx ControllerContext, controllerDescriptors map[string]*ControllerDescriptor,
unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
var controllerChecks []healthz.HealthChecker
// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
// If this fails, just return here and fail since other controllers won't be able to get credentials.
if serviceAccountTokenControllerDescriptor, ok := controllerDescriptors[names.ServiceAccountTokenController]; ok {
check, err := StartController(ctx, controllerCtx, serviceAccountTokenControllerDescriptor, unsecuredMux)
if err != nil {
return err
}
if check != nil {
// HealthChecker should be present when controller has started
controllerChecks = append(controllerChecks, check)
}
}
// Each controller is passed a context where the logger has the name of
// the controller set through WithName. That name then becomes the prefix of
// of all log messages emitted by that controller.
//
// In StartController, an explicit "controller" key is used instead, for two reasons:
// - while contextual logging is alpha, klog.LoggerWithName is still a no-op,
// so we cannot rely on it yet to add the name
// - it allows distinguishing between log entries emitted by the controller
// and those emitted for it - this is a bit debatable and could be revised.
for _, controllerDesc := range controllerDescriptors {
if controllerDesc.RequiresSpecialHandling() {
continue
}
check, err := StartController(ctx, controllerCtx, controllerDesc, unsecuredMux)
if err != nil {
return err
}
if check != nil {
// HealthChecker should be present when controller has started
controllerChecks = append(controllerChecks, check)
}
}
healthzHandler.AddHealthChecker(controllerChecks...)
return nil
}
上述代码其实是遍历控制器集合 controllerDescriptors 获取其中的每个控制器,并根据该控制器 ControllerDescriptor 中的字段值来启动控制器。
启动控制器是通过 StartController 函数实现的,StartController 函数中会执行以下核心操作:
-
检查控制器是否通过 FeatureGate 开启,如果禁止启动,则跳过该控制器的启动。
-
判断该控制器是否是云提供商相关的控制器。在 Kubernetes 中,云提供商控制器负责与底层云基础设施进行交互,例如管理负载均衡器、路由规则等。如果是,则忽略。
-
判断控制器是否开启,如果没有开启,则跳过启动(这里的控制器不是通过 FeatureGate)。
-
通过
time.Sleep等待一段时间,再启动控制器。通过这个时间间隔,防止同时启动控制器,给 kube-apiserver 带来压力。 -
使用
ControllerDescriptor中的initFunc来启动控制器。
启动 Informer
kube-controller-manager 在 CreateControllerContext 函数中分别创建了共享 informer 和元数据 informer。接下来还需要通过以下代码来启动上述 2 个 informer:
controllerContext.InformerFactory.Start(stopCh)
controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh)
close(controllerContext.InformersStarted)
启动完成之后,通过 close(controllerContext.InformersStarted) 来告知 Informer 已经启动成功。
启动领导者选举
kube-controller-manager 为了实现多副本容灾,会同时启动多个实例。但是 kube-controller-manager 的执行是有状态的。为了确保执行的正确性,kube-controller-manager 通过领导者选举,来确保同一时间只有一个 kube-controller-manager 实例在运行。
在 Run 方法中,创建了控制器启动函数 run 之后,就可以基于 run 函数来启动领导者选举的主逻辑。kube-controller-manager 的领导者选举源码实现,我们不再深入讲解,如果你感兴趣可以课后自行阅读、学习。
课程总结
本节课我们介绍了 kube-controller-manager 的核心启动逻辑。
首先,创建控制器启动上下文,用来启动 kube-controller-manager 中注册的各种控制器。接着创建了一个控制器集合,在创建控制器集合的过程中,可以指定控制器的初始化和启动方法。然后运行控制器,这些控制器会通过 List-Watch 的方式跟 kube-apiserver 保持连接,持续获取关注事件的变更,并触发调谐动作。这些调谐动作会根据资源的定义参数执行预定的逻辑。最后,启动共享 Informer,从 kube-apiserver 中获取事件用于调谐。
课后练习
-
阅读 kube-controller-manager 源码,了解 kube-controller-manager 是如何启用领导者选举的。
-
请你介绍下,kube-controller-manager 在多副本启动时,副本的实例 ID 如何生成的?
欢迎你在留言区与我交流讨论,如果今天的内容让你有所收获,也欢迎转发给有需要的朋友,我们下节课再见!
精选留言
2025-06-30 14:59:58
2. Hostname + UUID