Kubernetes ResourceQuotaController内部实现原理及源码分析是怎样的
更新:HHH   时间:2023-1-7


Kubernetes ResourceQuotaController内部实现原理及源码分析是怎样的,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

ResoureQuota介绍

关于ResoureQuota和ResourceController的介绍和使用请参见如下官方文档。这是你理解这篇博客的基础。

  • https://kubernetes.io/docs/admin/resourcequota/

  • https://kubernetes.io/docs/admin/resourcequota/walkthrough/

  • https://kubernetes.io/docs/user-guide/compute-resources/

  • https://kubernetes.io/docs/admin/admission-controllers/

  • https://github.com/kubernetes/community/blob/master/contributors/design-proposals/admission_control_resource_quota.md

ResourceQuota Controller源码目录结构分析

ResourceQuota Controller作为Kubernetes Controller Manager管理的众多Controller中的一员,其主要的源码位于目录k8s.io/kubernetes/pkg/quotak8s.io/kubernetes/pkg/controller/resourcequota,具体分析如下:

k8s.io/kubernetes/pkg/quota
.
├── evaluator    // 负责各种资源使用的统计
│   └── core
│       ├── configmap.go   // ConfigMapEvaluator的实现,负责ConfigMap资源的统计
│       ├── doc.go
│       ├── persistent_volume_claims.go    // PVCEvaluator的实现,负责PVC资源的统计
│       ├── persistent_volume_claims_test.go
│       ├── pods.go    //PodEvaluator的实现,负责Pod资源的统计
│       ├── pods_test.go
│       ├── registry.go    // 创建Registry时注册所有的Evaluators
│       ├── replication_controllers.go    // RCEvaluator的实现,负责ReplicationController资源的统计
│       ├── resource_quotas.go    // ResourceQuotaEvaluator的实现,负责ResourceQuota资源的统计
│       ├── secrets.go    // SecretEvaluator的实现,负责Secret资源的统计
│       ├── services.go    // ServiceEvaluator的实现,负责Service资源的统计
│       └── services_test.go
├── generic    // genericEvaluator的定义和实现
│   ├── evaluator.go    // 实现了genericEvaluator的接口,包括最重要的CalculateUsageStats接口
│   └── registry.go    // 定义GenericRegistry
├── install
│   └── registry.go    // 定义了startResourceQuotaController时会调用创建ResourceQuota Registry的方法
├── interfaces.go    // 定义了Registry和Evaluator Interface
├── resources.go    // 定义Resources的集合操作以及CalculateUsage方法
└── resources_test.go
k8s.io/kubernetes/pkg/controller/resourcequota
.
├── doc.go
├── replenishment_controller.go    // 定义replenishmentControllerFactory,用来创建replenishmentController
├── replenishment_controller_test.go
├── resource_quota_controller.go    // 定义ResourceQuotaController及其Run方法,syncResourceQuota方法等,属于核心文件。
└── resource_quota_controller_test.go

ResourceQuota Controller内部实现原理图

请下载到本地放大查看。

具体各个模块的功能和交互请看下面的源码分析。

ResourceQuota Controller源码分析

上面的内部实现原理图显示,ResourceQuotaController是Kubenetes Controller Manager启动进行初始化众多Controllers的时候,通过调用startResourceQuotaController来完成ResourceQuotaController的启动。

###从kube-controller-manager的startResourceQuotaController开始

cmd/kube-controller-manager/app/core.go:76

func startResourceQuotaController(ctx ControllerContext) (bool, error) {
	resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
	resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.InformerFactory)
	
	// 定义ReplenishmentController需要监控的资源对象
	groupKindsToReplenish := []schema.GroupKind{
		api.Kind("Pod"),
		api.Kind("Service"),
		api.Kind("ReplicationController"),
		api.Kind("PersistentVolumeClaim"),
		api.Kind("Secret"),
		api.Kind("ConfigMap"),
	}
	
	...
	
	go resourcequotacontroller.NewResourceQuotaController(
		resourceQuotaControllerOptions,
	).Run(int(ctx.Options.ConcurrentResourceQuotaSyncs), ctx.Stop)
	return true, nil
}

startResourceQuotaController启动一个goroutine,通过NewResourceQuotaController创建一个ResourceQuotaController并执行其Run方法开始提供ResourceQuotaController。

下面是ResourceQuotaController和ResourceQuotaControllerOptions结构体的定义。ResourceQuotaController中定义了几个关键Entity,分别是rqController、queue、missingUsageQueue、registry、replenishmentControllers,在上一节中的原理图中也能看到它们的身影。

###ResourceQuotaController定义

pkg/controller/resourcequota/resource_quota_controller.go:40

// ResourceQuotaControllerOptions holds options for creating a quota controller
type ResourceQuotaControllerOptions struct {
	// Must have authority to list all quotas, and update quota status
	KubeClient clientset.Interface
	// Controls full recalculation of quota usage
	ResyncPeriod controller.ResyncPeriodFunc
	// Knows how to calculate usage
	Registry quota.Registry
	// Knows how to build controllers that notify replenishment events
	ControllerFactory ReplenishmentControllerFactory
	// Controls full resync of objects monitored for replenihsment.
	ReplenishmentResyncPeriod controller.ResyncPeriodFunc
	// List of GroupKind objects that should be monitored for replenishment at
	// a faster frequency than the quota controller recalculation interval
	GroupKindsToReplenish []schema.GroupKind
}

// ResourceQuotaController is responsible for tracking quota usage status in the system
type ResourceQuotaController struct {
	// Must have authority to list all resources in the system, and update quota status
	kubeClient clientset.Interface
	// An index of resource quota objects by namespace
	rqIndexer cache.Indexer
	// Watches changes to all resource quota
	rqController *cache.Controller
	// ResourceQuota objects that need to be synchronized
	queue workqueue.RateLimitingInterface
	// missingUsageQueue holds objects that are missing the initial usage informatino
	missingUsageQueue workqueue.RateLimitingInterface
	// To allow injection of syncUsage for testing.
	syncHandler func(key string) error
	// function that controls full recalculation of quota usage
	resyncPeriod controller.ResyncPeriodFunc
	// knows how to calculate usage
	registry quota.Registry
	// controllers monitoring to notify for replenishment
	replenishmentControllers []cache.ControllerInterface
}

NewRegistry

接下来,我们看看startResourceQuotaController调用的NewRegistry、NewResourceQuotaController以及ResourceQuotaController的Run方法。

pkg/quota/evaluator/core/registry.go:29

// NewRegistry returns a registry that knows how to deal with core kubernetes resources
// If an informer factory is provided, evaluators will use them.
func NewRegistry(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Registry {
	pod := NewPodEvaluator(kubeClient, f)
	service := NewServiceEvaluator(kubeClient)
	replicationController := NewReplicationControllerEvaluator(kubeClient)
	resourceQuota := NewResourceQuotaEvaluator(kubeClient)
	secret := NewSecretEvaluator(kubeClient)
	configMap := NewConfigMapEvaluator(kubeClient)
	persistentVolumeClaim := NewPersistentVolumeClaimEvaluator(kubeClient, f)
	return &generic.GenericRegistry{
		InternalEvaluators: map[schema.GroupKind]quota.Evaluator{
			pod.GroupKind():                   pod,
			service.GroupKind():               service,
			replicationController.GroupKind(): replicationController,
			secret.GroupKind():                secret,
			configMap.GroupKind():             configMap,
			resourceQuota.GroupKind():         resourceQuota,
			persistentVolumeClaim.GroupKind(): persistentVolumeClaim,
		},
	}
}

可见,NewRegistry负责这些资源对象(pod,service,rc,secret,configMap,resourceQuota,PVC)的的Evaluator的创建和注册,供后面Worker中执行quota.CalculateUsage(...)对这些资源对象进行使用统计。

NewResourceQuotaController

NewRegistry执行完后,开始创建ResourceQuotaController,代码如下。

pkg/controller/resourcequota/resource_quota_controller.go:78

func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController {
	// build the resource quota controller
	rq := &ResourceQuotaController{
		kubeClient:               options.KubeClient,
		queue:                    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"),
		missingUsageQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"),
		resyncPeriod:             options.ResyncPeriod,
		registry:                 options.Registry,
		replenishmentControllers: []cache.ControllerInterface{},
	}
	
	...
	
	// set the synchronization handler
	rq.syncHandler = rq.syncResourceQuotaFromKey

	// build the controller that observes quota
	rq.rqIndexer, rq.rqController = cache.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
				return rq.kubeClient.Core().ResourceQuotas(v1.NamespaceAll).List(options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				return rq.kubeClient.Core().ResourceQuotas(v1.NamespaceAll).Watch(options)
			},
		},
		&v1.ResourceQuota{},
		rq.resyncPeriod(),
		cache.ResourceEventHandlerFuncs{
			AddFunc: rq.addQuota,
			UpdateFunc: func(old, cur interface{}) {
				oldResourceQuota := old.(*v1.ResourceQuota)
				curResourceQuota := cur.(*v1.ResourceQuota)
				if quota.V1Equals(oldResourceQuota.Spec.Hard, curResourceQuota.Spec.Hard) {
					return
				}
				rq.addQuota(curResourceQuota)
			},
			DeleteFunc: rq.enqueueResourceQuota,
		},
		cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
	)

	for _, groupKindToReplenish := range options.GroupKindsToReplenish {
		controllerOptions := &ReplenishmentControllerOptions{
			GroupKind:         groupKindToReplenish,
			ResyncPeriod:      options.ReplenishmentResyncPeriod,
			ReplenishmentFunc: rq.replenishQuota,
		}
		replenishmentController, err := options.ControllerFactory.NewController(controllerOptions)
		if err != nil {
			glog.Warningf("quota controller unable to replenish %s due to %v, changes only accounted during full resync", groupKindToReplenish, err)
		} else {
			rq.replenishmentControllers = append(rq.replenishmentControllers, replenishmentController)
		}
	}
	return rq
}

NewResourceQuotaController负责创建ResourceQuotaController,包括queue, missingUsageQueue, syncHandler,rqIndexer, rqController,replenishmentControllers的Entity填充。重点关注 rq.rqIndexer, rq.rqController = cache.NewIndexerInformer(...) 进行了rqController中注册ResourceEventHandlerFuncs:addQuota和enqueueResourceQuota。另外, replenishmentController, err := options.ControllerFactory.NewController(controllerOptions) 负责replenishmentController的创建,NewRegistry中注册了6种replenishmentSource,所以这里replenishmentControllers会添加对应的6中replenishmentController。

###ResourceQuotaController.Run

创建完ResourceQuotaController之后,就执行Run方法开始进行任务处理了。

pkg/controller/resourcequota/resource_quota_controller.go:227

// Run begins quota controller using the specified number of workers
func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
	...
	// 启动rqController和rq.replenishmentControllers中的6中replenishmentController,开始watch对应的ResourceQuota加入到queue和missingUsageQueue。
	go rq.rqController.Run(stopCh)
	// the controllers that replenish other resources to respond rapidly to state changes
	for _, replenishmentController := range rq.replenishmentControllers {
		go replenishmentController.Run(stopCh)
	}
	
	// 启动workers数量的worker协程,分别对queue和missingUsageQueue中的Item。
	for i := 0; i < workers; i++ {
		go wait.Until(rq.worker(rq.queue), time.Second, stopCh)
		go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh)
	}
	
	// 定期的进行全量的quotas计算。
	go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh)
	<-stopCh
	glog.Infof("Shutting down ResourceQuotaController")
	rq.queue.ShutDown()
}

Worker

接下来的主要处理都交给了workers进行处理了,默认配置是有5个worker对queue进行处理,有5个worker对missingUsageQuota进行处理。下面来看看worker是怎么对Queue中的Item进行处理的。

pkg/controller/resourcequota/resource_quota_controller.go:199

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) func() {
	workFunc := func() bool {
		// 从queue中获取Key
		key, quit := queue.Get()
		if quit {
			return true
		}
		defer queue.Done(key)
		
		// 执行NewResourceQuotaController时注册的syncHandler(流程跳转到syncResourceQuotaFromKey) 
		err := rq.syncHandler(key.(string))
		...
	}

	return func() {
		for {
			if quit := workFunc(); quit {
				glog.Infof("resource quota controller worker shutting down")
				return
			}
		}
	}
}

流程进入到syncResourceQuotaFromKey,下面看看它的实现:

pkg/controller/resourcequota/resource_quota_controller.go:247

// syncResourceQuotaFromKey syncs a quota key
func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err error) {
	...
	obj, exists, err := rq.rqIndexer.GetByKey(key)
	...
	quota := *obj.(*v1.ResourceQuota)
	return rq.syncResourceQuota(quota)
}

syncResourceQuotaFromKey根据从queue中获得的key,从rqIndexer中得到该Object,然后执行rq.syncResourceQuota(quota)。

pkg/controller/resourcequota/resource_quota_controller.go:268

// syncResourceQuota runs a complete sync of resource quota status across all known kinds
func (rq *ResourceQuotaController) syncResourceQuota(v1ResourceQuota v1.ResourceQuota) (err error) {
	...
	newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry)
	...

	// ensure set of used values match those that have hard constraints
	hardResources := quota.ResourceNames(hardLimits)
	used = quota.Mask(used, hardResources)
	usage := api.ResourceQuota{
		ObjectMeta: api.ObjectMeta{
			Name:            resourceQuota.Name,
			Namespace:       resourceQuota.Namespace,
			ResourceVersion: resourceQuota.ResourceVersion,
			Labels:          resourceQuota.Labels,
			Annotations:     resourceQuota.Annotations},
		Status: api.ResourceQuotaStatus{
			Hard: hardLimits,
			Used: used,
		},
	}

	dirty = dirty || !quota.Equals(usage.Status.Used, resourceQuota.Status.Used)

	// there was a change observed by this controller that requires we update quota
	if dirty {
		v1Usage := &v1.ResourceQuota{}
		if err := v1.Convert_api_ResourceQuota_To_v1_ResourceQuota(&usage, v1Usage, nil); err != nil {
			return err
		}
		_, err = rq.kubeClient.Core().ResourceQuotas(usage.Namespace).UpdateStatus(v1Usage)
		return err
	}
	return nil
}

syncResourceQuota中最关键的操作是: newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry) quota.CalculateUsage根据namespace, quota的Scope,hardLimits,registry对该Item(resourceQuota)进行CalculateUsage。

pkg/quota/resources.go:217

// CalculateUsage calculates and returns the requested ResourceList usage
func CalculateUsage(namespaceName string, scopes []api.ResourceQuotaScope, hardLimits api.ResourceList, registry Registry) (api.ResourceList, error) {
	// find the intersection between the hard resources on the quota
	// and the resources this controller can track to know what we can
	// look to measure updated usage stats for
	hardResources := ResourceNames(hardLimits)
	potentialResources := []api.ResourceName{}
	evaluators := registry.Evaluators()
	for _, evaluator := range evaluators {
		potentialResources = append(potentialResources, evaluator.MatchingResources(hardResources)...)
	}
	// NOTE: the intersection just removes duplicates since the evaluator match intersects wtih hard
	matchedResources := Intersection(hardResources, potentialResources)

	// sum the observed usage from each evaluator
	newUsage := api.ResourceList{}
	for _, evaluator := range evaluators {
		// only trigger the evaluator if it matches a resource in the quota, otherwise, skip calculating anything
		intersection := evaluator.MatchingResources(matchedResources)
		if len(intersection) == 0 {
			continue
		}

		usageStatsOptions := UsageStatsOptions{Namespace: namespaceName, Scopes: scopes, Resources: intersection}
		stats, err := evaluator.UsageStats(usageStatsOptions)
		if err != nil {
			return nil, err
		}
		newUsage = Add(newUsage, stats.Used)
	}

	// mask the observed usage to only the set of resources tracked by this quota
	// merge our observed usage with the quota usage status
	// if the new usage is different than the last usage, we will need to do an update
	newUsage = Mask(newUsage, matchedResources)
	return newUsage, nil
}

CalculateUsage中最重要的一步是循环registry中注册的所有Evaluators,执行对应Evaluator的UsageStats方法进资源使用统计。看到这里,你也许懵逼了,Evaluators又是个什么东西?

我们先来看看Registry和Evaluator的关系,以及Evaluator的定义。

pkg/quota/interfaces.go:62

// Registry holds the list of evaluators associated to a particular group kind
type Registry interface {
	// Evaluators returns the set Evaluator objects registered to a groupKind
	Evaluators() map[schema.GroupKind]Evaluator
}


pkg/quota/interfaces.go:43

// Evaluator knows how to evaluate quota usage for a particular group kind
type Evaluator interface {
	// Constraints ensures that each required resource is present on item
	Constraints(required []api.ResourceName, item runtime.Object) error
	// GroupKind returns the groupKind that this object knows how to evaluate
	GroupKind() schema.GroupKind
	// Handles determines if quota could be impacted by the specified operation.
	// If true, admission control must perform quota processing for the operation, otherwise it is safe to ignore quota.
	Handles(operation admission.Operation) bool
	// Matches returns true if the specified quota matches the input item
	Matches(resourceQuota *api.ResourceQuota, item runtime.Object) (bool, error)
	// MatchingResources takes the input specified list of resources and returns the set of resources evaluator matches.
	MatchingResources(input []api.ResourceName) []api.ResourceName
	// Usage returns the resource usage for the specified object
	Usage(item runtime.Object) (api.ResourceList, error)
	// UsageStats calculates latest observed usage stats for all objects
	UsageStats(options UsageStatsOptions) (UsageStats, error)
}

可见Evaluator就是一系列操作的集合,是一个Interface,而Registry就是资源类型到Evaluator的一个Map。

Kubernetes中定义了7种资源的Evaluator,都在pkg/quota/evaluator/core/*目录下,比如pods.go就是PodEvaluator的实现,里面实现了关键的UsageStats方法。除了PodEvaluator之外,其他的Evaluator的UsageStats实现,都是genericEvaluator来完成的,其代码在pkg/quota/generic/evaluator.go:177

具体Evaluator的代码分析,请读者自行完成。

下面我给出Worker的内部流程图,供大家参考:

###ReplenishmentController

rqController负责watch待sync的ResourceQuota,并将其加入到queue和missingUsageQueue中,而上面分析NewResourceQuotaController代码时提到: replenishmentController, err := options.ControllerFactory.NewController(controllerOptions) 负责replenishmentController的创建,那replenishmentController又是啥呢?我们有必要来看看replenishmentController的创建。

pkg/controller/resourcequota/replenishment_controller.go:131

func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (result cache.ControllerInterface, err error) {
	...
	switch options.GroupKind {
	case api.Kind("Pod"):
		if r.sharedInformerFactory != nil {
			result, err = controllerFor(api.Resource("pods"), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{
				UpdateFunc: PodReplenishmentUpdateFunc(options),
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			})
			break
		}
		result = informers.NewPodInformer(r.kubeClient, options.ResyncPeriod())
	case api.Kind("Service"):
		// TODO move to informer when defined
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().Services(v1.NamespaceAll).List(options)
				},
				WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().Services(v1.NamespaceAll).Watch(options)
				},
			},
			&v1.Service{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				UpdateFunc: ServiceReplenishmentUpdateFunc(options),
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	case api.Kind("ReplicationController"):
		// TODO move to informer when defined
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).List(options)
				},
				WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).Watch(options)
				},
			},
			&v1.ReplicationController{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	case api.Kind("PersistentVolumeClaim"):
		if r.sharedInformerFactory != nil {
			result, err = controllerFor(api.Resource("persistentvolumeclaims"), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			})
			break
		}
		// TODO (derekwaynecarr) remove me when we can require a sharedInformerFactory in all code paths...
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().PersistentVolumeClaims(v1.NamespaceAll).List(options)
				},
				WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().PersistentVolumeClaims(v1.NamespaceAll).Watch(options)
				},
			},
			&v1.PersistentVolumeClaim{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	case api.Kind("Secret"):
		// TODO move to informer when defined
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().Secrets(v1.NamespaceAll).List(options)
				},
				WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().Secrets(v1.NamespaceAll).Watch(options)
				},
			},
			&v1.Secret{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	case api.Kind("ConfigMap"):
		// TODO move to informer when defined
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().ConfigMaps(v1.NamespaceAll).List(options)
				},
				WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().ConfigMaps(v1.NamespaceAll).Watch(options)
				},
			},
			&v1.ConfigMap{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	default:
		return nil, NewUnhandledGroupKindError(options.GroupKind)
	}
	return result, err
}

整个代码结构非常清晰,就是根据不同的资源类型,返回对应的Controller。而每种资源的Controller的定义都是通过创建一个对应的Informer完成。Informer中注册对应的ResourceEventHandlerFuncs:UpdateFunc和DeleteFunc用来出watch的对象发生对应的change时需要调用的方法。

以Pod为例,看看Pod注册的UpdateFunc:PodReplenishmentUpdateFunc和DeleteFunc:ObjectReplenishmentDeleteFunc,你就知道replenishmentController是用来干啥的了。

pkg/controller/resourcequota/replenishment_controller.go:56

// PodReplenishmentUpdateFunc will replenish if the old pod was quota tracked but the new is not
func PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions) func(oldObj, newObj interface{}) {
	return func(oldObj, newObj interface{}) {
		oldPod := oldObj.(*v1.Pod)
		newPod := newObj.(*v1.Pod)
		if core.QuotaV1Pod(oldPod) && !core.QuotaV1Pod(newPod) {
			options.ReplenishmentFunc(options.GroupKind, newPod.Namespace, oldPod)
		}
	}
}

// ObjectReplenenishmentDeleteFunc will replenish on every delete
func ObjectReplenishmentDeleteFunc(options *ReplenishmentControllerOptions) func(obj interface{}) {
	return func(obj interface{}) {
		metaObject, err := meta.Accessor(obj)
		if err != nil {
			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
			if !ok {
				glog.Errorf("replenishment controller could not get object from tombstone %+v, could take up to %v before quota is replenished", obj, options.ResyncPeriod())
				utilruntime.HandleError(err)
				return
			}
			metaObject, err = meta.Accessor(tombstone.Obj)
			if err != nil {
				glog.Errorf("replenishment controller tombstone contained object that is not a meta %+v, could take up to %v before quota is replenished", tombstone.Obj, options.ResyncPeriod())
				utilruntime.HandleError(err)
				return
			}
		}
		options.ReplenishmentFunc(options.GroupKind, metaObject.GetNamespace(), nil)
	}
}

在NewResourceQuotaController中创建replenishmentController时,已经指定了对应的ReplenishmentFunc为rq.replenishQuota,PodReplenishmentUpdateFunc和ObjectReplenishmentDeleteFunc最终都是调用ReplenishmentFunc(rq.replenishQuota)来进行quota recalculated。

pkg/controller/resourcequota/resource_quota_controller.go:330

// replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
func (rq *ResourceQuotaController) replenishQuota(groupKind schema.GroupKind, namespace string, object runtime.Object) {
	...
	for i := range resourceQuotas {
		resourceQuota := resourceQuotas[i].(*v1.ResourceQuota)
		internalResourceQuota := &api.ResourceQuota{}
		if err := v1.Convert_v1_ResourceQuota_To_api_ResourceQuota(resourceQuota, internalResourceQuota, nil); err != nil {
			glog.Error(err)
			continue
		}
		resourceQuotaResources := quota.ResourceNames(internalResourceQuota.Status.Hard)
		if intersection := evaluator.MatchingResources(resourceQuotaResources); len(intersection) > 0 {
			// 将该resourceQuota加入到队列queue
			rq.enqueueResourceQuota(resourceQuota)
		}
	}
}

因此replenishmentController就是用来捕获对应资源的Update/Delete事件,将其对应的ResourceQuota加入到queue中,然后worker再对其进行重新计算Usage。

总结

  • Kubernetes Controller Manager在初始化Controllers时执行startResourceQuotaController启动创建ResourceQuotaController并执行其Run方法来启动ResourceQuotaController。

  • ResourceQuotaController中包括两个队列:

    • queue:用来存放待sync和recalculate的ResourceQuota

    • missingUsageQueue:用来存放那些丢失Usage信息的ResourceQuota

  • ResourceQuotaController中有两种Controller:

    • rqController:通过List/Watch对应的资源及变化,根据情况,将ResourceQuota加入到queue和missingUsageQueue。

    • replenishmentControllers:通过监控资源的Update/Delete操作,将ResourceQuota加入到queue。

  • ResourceQuotaController中存在一个Registry对象,用来存放各种资源的Evaluator,包括:

    • PodEvaluator

    • ConfigMapEvaluator

    • PersistentVolumeClaimEvaluator

    • ResourceQuotaEvaluator

    • ReplicationControllerEvaluator

    • ServiceEvaluator

    • SecretEvaluator

  • ResourceQuotaController中的replenishmentControllers包含以下replenishmentController:

    • PodReplenishController

    • ConfigMapReplenishController

    • PersistentVolumeClaimReplenishController

    • ReplicationControllerReplenishController

    • ServiceReplenishController

    • SecretReplenishController

  • ResourceQuotaController中默认存在5个worker对queue中的ResourceQuota Item进行处理。可通过kube-controller-manager的--concurrent-resource-quota-syncs配置。

  • ResourceQuotaController中默认存在5个worker对missingUsageQueue中的ResourceQuota Item进行处理。可通过kube-controller-manager的--concurrent-resource-quota-syncs配置。

  • ResourceQuotaController默认5min会做一次全量的quota usage同步。可通过kube-controller-manager的--resource-quota-sync-period

关于Kubernetes ResourceQuotaController内部实现原理及源码分析是怎样的问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注天达云行业资讯频道了解更多相关知识。

返回云计算教程...