Loading...
墨滴

一介刁民

2021/05/15  阅读:63  主题:橙心

【源码解析】Knative Serving核心逻辑实现(上篇)

本文会主要介绍 Serving 中的核心组件:ConfigurationRouteRevisionService 的具体代码实现。AutoscalerActivator 会在下篇进行介绍。

概念介绍

Knative Serving 的工作流程图如上图所示,其中包括了四个主要资源:ServiceRouteConfigurationRevision

  1. Service: 自动管理工作负载整个生命周期。负责创建 route,configuration。通过 Service可以指定路由流量使用最新的revision,或者历史版本的 revision
  2. Route:负责映射网络端点到一个或多个revision。可以通过多种方式管理流量。包括灰度流量和重命名路由。
  3. Configuration:负责保持工作负载的期望状态,包含了代码与配置相关的信息,并遵循应用开发的12要素原则 https://12factor.net/zh_cn/。修改一次 Configuration 产生一个 revision
  4. RevisionRevision 资源是对工作负载进行的每个修改的代码和配置的快照。Revision 是不可变对象,可以长期保留。

上述主要是针对四种主要资源的介绍,关于 Knative Serving 所有资源的介绍,请看本公众号之前发的一篇微信文章 【超详细】深入探究 Knative 扩缩容的奥秘

复习完基本概念后,我们可以开始阅读源码了。

注意

Knative 项目下有一个 repositorypkg 。这个 repo 里面包含着 Knative common packages,其中K8s相关的 Custom Controller 的逻辑都在里面。

serving 里面也有 pkg 这个路径,为避免混淆,在写文件路径时第一个路径是具体的 repo 名字,所以/serving/pkg/pkg/是两个不同的repo中的不同的路径。

Knative 和其下repo 的结构比较分散。我们需要在意的代码主要在如下几个路径

/serving/cmd/controller: 函数入口,创建 serving 中相关资源的 controller。 结构如下:

    controler
    |
    | --- kodata
    | --- main.go

/serving/pkg/injection/reconciler/serving/v1/: 不同的 Knative 资源具体的 Reconciler 工作逻辑的实现。 结构如下:

    v1
    |
    | --- configuration
  | --- controller.go
  | --- reconciler.go
  | --- state.go
 | --- revision
  | --- controller.go
  | --- reconciler.go
  | --- state.go
 | --- route
  | --- controller.go
  | --- reconciler.go
  | --- state.go
 | --- service
  | --- controller.go
  | --- reconciler.go
  | --- state.go

/serving/pkg/reconciler/: 暴露出 controller constructor : func NewController(...) *controller.Impl { ... }`` 这些constructors`:

  1. 构建 Reconciler,
  2. 使用 Reconciler构建 controller.Impl,
  3. 配置当前 Reconciler 相关的 informers 来调用 controller中合适的 enqueue method。 结构如下:
 reconciler
 | 
 | --- configuration
  | --- configuration.go
  | --- controller.go
 | --- revision
  | --- controller.go
  | --- revision.go
 | --- route
  | --- controller.go
  | --- route.go
 | --- service
  | --- controller.go
  | --- service.go

如上所示,这些组件都有几乎一致的结构,所以我们下面描述的是通用的工作流程,为方便以及突出核心流程,这里用xxx表示组件的名字,拿Route举例:/xxx/xxx.go 表示 /route/route.go

源码分析

1. 启动Controller

cmd/controller/main.go 中,启动 Configuration、Revision、Route、Service的Controller

  var ctors = []injection.ControllerConstructor{
  configuration.NewController,
  labeler.NewController,
  revision.NewController,
  route.NewController,
  serverlessservice.NewController,
  service.NewController,
  gc.NewController,
 }

 func main() {
  sharedmain.Main("controller", ctors...)
 }

其中,sharedmain.Main()/pkg/injection/sharedmain 中,作用就是为 injected controller 提供一个通用的函数入口。

同时我们在这里还启动了 labeler、serverlesservice、gc。本文档主要分析 Configuration,Revision,Route和Service。我们会在后续文档中继续介绍其他的几种。

Configuration、Revision、RouteService 启动过程都是一样的,都是实现在 injection.ControllerConstructor{} 时调用 xxx.NewController 在创建相应的 controller

xxx.NewController的定义在/serving/pkg/reconciler/xxx/controller.go中,以 route为例:在/serving/pkg/reconciler/route/controller.go中,

    func NewController(
  ctx context.Context,
  cmw configmap.Watcher,
 )
 *controller.Impl
 {
  return newController(ctx, cmw, clock.RealClock{})
 }

    func newController(
  ctx context.Context,
  cmw configmap.Watcher,
  clock clock.Clock,
  opts ...reconcilerOption,
 )
 *controller.Impl
 {
  ...
  // 通过创建并连接相关的Lister来实现Reconciler
  c := &Reconciler{
   kubeclient:          kubeclient.Get(ctx),
   client:              servingclient.Get(ctx),
   netclient:           netclient.Get(ctx),
   configurationLister: configInformer.Lister(),
   revisionLister:      revisionInformer.Lister(),
   serviceLister:       serviceInformer.Lister(),
   ingressLister:       ingressInformer.Lister(),
   certificateLister:   certificateInformer.Lister(),
   clock:               clock,
  }
  // 在此实现route的Reconciler 
        // 命名格式都是xxxreconciler
  impl := routereconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {
   configsToResync := []interface{}{
    &network.Config{},
    &config.Domain{},
   }
   resync := configmap.TypeFilter(configsToResync...)(func(stringinterface{}) {
    impl.GlobalResync(routeInformer.Informer())
   })
   configStore := config.NewStore(logging.WithLogger(ctx, logger.Named("config-store")), resync)
   configStore.WatchConfigs(cmw)
   return controller.Options{ConfigStore: configStore}
  })
  ...
  return impl
 }

其中Reconciler的定义在/serving/pkg/reconciler/xxx/xxx.go中:

 type Reconciler struct {
	kubeclient kubernetes.Interface
	client     clientset.Interface
	netclient  netclientset.Interface

	// Listers index properties about resources
	configurationLister listers.ConfigurationLister
	revisionLister      listers.RevisionLister
	serviceLister       corev1listers.ServiceLister
	ingressLister       networkinglisters.IngressLister
	certificateLister   networkinglisters.CertificateLister
	tracker             tracker.Interface

	clock        clock.PassiveClock
	enqueueAfter func(interface{}, time.Duration)
}

Reconciler 结构体实现了接口 Interface(这个接口的名字就叫Interface),定义在/serving/pkg/client/injection/reconciler/serving/v1/xxx/reconciler.go

    type Interface interface {
  // ReconcileKind implements custom logic to reconcile v1.Service. Any changes
  // to the objects .Status or .Finalizers will be propagated to the stored
  // object. It is recommended that implementors do not call any update calls
  // for the Kind inside of ReconcileKind, it is the responsibility of the calling
  // controller to propagate those properties. The resource passed to ReconcileKind
  // will always have an empty deletion timestamp.
  ReconcileKind(ctx context.Context, o *v1.Service) reconciler.Event
 }

这里的ReconcileKind()就是Route、Configuration、Service、Revision Controller各自的组件工作逻辑。他们在/serving/pkg/reconciler/xxx/xxx.go中被实现。我们在第3部分详细介绍。

回到func newController()中:

其中xxxreconciler.NewImpl() 的定义在/serving/pkg/client/injection/reconciler/serving/v1/xxx/controller.go

    // NewImpl returns a controller.Impl that handles queuing and feeding work from
 // the queue through an implementation of controller.Reconciler, delegating to
 // the provided Interface and optional Finalizer methods. OptionsFn is used to return
 // controller.Options to be used by the internal reconciler.
 func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsFn) *controller.Impl {
  // reconcilerImpl implements controller.Reconciler for v1.Service resources.
  rec := &reconcilerImpl{
   LeaderAwareFuncs: reconciler.LeaderAwareFuncs{
   PromoteFunc: func(bkt reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)error {
     all, err := lister.List(labels.Everything())
     if err != nil {
      return err
     }
     for _, elt := range all {
      // TODO: Consider letting users specify a filter in options.
      enq(bkt, types.NamespacedName{
       Namespace: elt.GetNamespace(),
       Name:      elt.GetName(),
      })
     }
     return nil
    },
   },
   Client:        client.Get(ctx),
   Lister:        lister,
   reconciler:    r,
   finalizerName: defaultFinalizerName,
  }
  ...
  // NewImpl instantiates an instance of our controller that will feed work to the provided Reconciler as it is enqueued.
         impl := controller.NewImpl(rec, logger, ctrTypeName)
            ...
  return impl
 }

至此,Router、Configuration、RevisionService 等资源的 controller 就启动了。

2. 调用Controller工作逻辑

工作流程为大家喜闻乐见的 K8sCustom Controller 工作流程。所以略过 K8s 相关的直接快进到 serving 中各组件的 Controller 的工作逻辑启动: 在pkg/controller/controller.go

 func (c *Impl) processNextWorkItem() bool {
  ...
  if err = c.Reconciler.Reconcile(ctx, keyStr); err != nil {
   return true
  }
  ...
  return true
 }

这里c.Reconciler.Reconcile(ctx, keyStr) 就是上面讲的/serving/pkg/client/injection/reconciler/serving/v1/xxx/reconciler.go中各个组件自己相关的 reconcilerImplReconcile()。以 route 为例(流程一致,只有在获取资源时 Lister 调用的方法不一样):

    func (r *reconcilerImpl) Reconcile(ctx context.Context, key string) error {

  s, err := newState(key, r)
  ...
  // Get the resource with this namespace/name.
  getter := r.Lister.Routes(s.namespace)
  original, err := getter.Get(s.name)
  // Don't modify the informers copy.
  resource := original.DeepCopy()

  // 此处是revision、configuration、route 和 service controller 的不同的地方
            // 在这里我们调用route controller自己的工作逻辑
  name, do := s.reconcileMethodFor(resource)

  // 选择reconcile策略
  switch name {
   case reconciler.DoReconcileKind: ... 
   case reconciler.DoFinalizeKind: ...
   case reconciler.DoObserveKind, reconciler.DoObserveFinalizeKind: ...
  }
 
  // 同步status
  switch {
  case r.skipStatusUpdates:
  // This reconciler implementation is configured to skip resource updates.
  // This may mean this reconciler does not observe spec, but reconciles external changes.
  case equality.Semantic.DeepEqual(original.Status, resource.Status):
  // If we didn't change anything then don't call updateStatus.
  // This is important because the copy we loaded from the injectionInformer's
  // cache may be stale and we don't want to overwrite a prior update
  // to status with this stale state.
  case !s.isLeader:
  // High-availability reconcilers may have many replicas watching the resource, but only
  // the elected leader is expected to write modifications.
  logger.Warn("Saw status changes when we aren't the leader!")
  default:
   if err = r.updateStatus(ctx, original, resource); err != nil {
    logger.Warnw("Failed to update resource status", zap.Error(err))
    r.Recorder.Eventf(resource, corev1.EventTypeWarning, "UpdateFailed",
    "Failed to update status for %q: %v", resource.Name, err)
    return err
   }
  }
  return nil
 }

reconcileMethodFor(resource)中,serving中的组件Controller具体的工作逻辑被调用。它的定义在/serving/pkg/client/injection/reconciler/serving/v1/xxx/state.go

 func newState(key string, r *reconcilerImpl) (*state, error) {
  // Convert the namespace/name string into a distinct namespace and name.
  namespace, name, err := cache.SplitMetaNamespaceKey(key)
  if err != nil {
   return nil, fmt.Errorf("invalid resource key: %s", key)
  }

  roi, isROI := r.reconciler.(ReadOnlyInterface)
  rof, isROF := r.reconciler.(ReadOnlyFinalizer)

  isLeader := r.IsLeaderFor(types.NamespacedName{
   Namespace: namespace,
   Name:      name,
  })

  return &state{
   key:        key,
   namespace:  namespace,
   name:       name,
   reconciler: r.reconciler,
   roi:        roi,
   isROI:      isROI,
   rof:        rof,
   isROF:      isROF,
   isLeader:   isLeader,
  }, nil
 }

 func (s *state) reconcileMethodFor(o *v1.Route) (string, doReconcile) {
  if o.GetDeletionTimestamp().IsZero() {
   if s.isLeader {
    return reconciler.DoReconcileKind, s.reconciler.ReconcileKind
   }
  }
        ...
 }

然后在/serving/pkg/reconciler/xxx/xxx.go中,我们才终于正式开始具体的工作逻辑。。。

 // ReconcileKind implements Interface.ReconcileKind.
 func (c *Reconciler) ReconcileKind(ctx context.Context, r *v1.Route) pkgreconciler.Event {}

3.组件具体的工作逻辑

Service

具体实现在/serving/pkg/reconciler/service/service.go

Service: 根据 Service 创建 ConfigurationRoute 资源。

 func (c *Reconciler) ReconcileKind(ctx context.Context, service *v1.Service) pkgreconciler.Event {

  // 创建configuration
  config, err := c.config(ctx, service)

  // 检查新的configuration是否reconcile到我们需要的配置并更新状态
  if config.Generation != config.Status.ObservedGeneration {
   // The Configuration hasn't yet reconciled our latest changes to
   // its desired state, so its conditions are outdated.
   service.Status.MarkConfigurationNotReconciled()
   // If BYO-Revision name is used we must serialize reconciling the Configuration
   // and Route. Wait for observed generation to match before continuing.
   if config.Spec.GetTemplate().Name != "" {
    return nil
   }
  } else {
   logger.Debugf("Configuration Conditions = %#v", config.Status.Conditions)
   // Update our Status based on the state of our underlying Configuration.
   service.Status.PropagateConfigurationStatus(&config.Status)
  }

  ...

  // 创建route
  route, err := c.route(ctx, service)

  ss := &service.Status
  // 检查新的route是否reconcile到我们需要的配置并更新状态
  if route.Generation != route.Status.ObservedGeneration {
   // The Route hasn't yet reconciled our latest changes to
   // its desired state, so its conditions are outdated.
   ss.MarkRouteNotReconciled()
  } else {
   // Update our Status based on the state of our underlying Route.
   ss.PropagateRouteStatus(&route.Status)
  }
  return nil
 }

Route

具体实现在/serving/pkg/reconciler/route/route.go

  1. 判断是否有 ReadyRevision 可进行 traffic
  2. 创建 k8s service:这个 Service 主要为 Istio 路由提供域名访问。
  3. 根据 traffic 规则创建/更新 kingress 。这个 kingress 的作用就是从 revision l历史版本中 选择合适的版本
  4. 根据 kingress 更新 k8s service

 func (c *Reconciler) ReconcileKind(ctx context.Context, r *v1.Route) pkgreconciler.Event {
  // Configure traffic based on the RouteSpec.
  traffic, err := c.configureTraffic(ctx, r)
  if traffic == nil || err != nil {
   // Traffic targets aren't ready, no need to configure child resources.
   return err
  }

  // Update the information that makes us Addressable.
  r.Status.Address = &duckv1alpha1.Addressable{
         Hostname: resourcenames.K8sServiceFullname(r),
      }
 
  logger.Info("Creating placeholder k8s services")
  services, err := c.reconcilePlaceholderServices(ctx, r, traffic.Targets)
  if err != nil {
   return err
  }

  // config tls
  tls, acmeChallenges, err := c.tls(ctx, r.Status.URL.Host, r, traffic)
  if err != nil {
   return err
  }

  // Reconcile ingress and its children resources.
  ingress, effectiveRO, err := c.reconcileIngress(ctx, r, traffic, tls, ingressClassForRoute(ctx, r), acmeChallenges...)
  if err != nil {
   return err
  }
 
  // 查看rollout状态 
  roInProgress := !effectiveRO.Done()
  if ingress.GetObjectMeta().GetGeneration() != ingress.Status.ObservedGeneration {
   r.Status.MarkIngressNotConfigured()
  } else if !roInProgress {
   r.Status.PropagateIngressStatus(ingress.Status)
  }

  logger.Info("Updating placeholder k8s services with ingress information")
  if err := c.updatePlaceholderServices(ctx, r, services, ingress); err != nil {
   return err
  }

  if roInProgress {
   logger.Info("Rollout is in progress")
   // Rollout in progress, so mark the status as such.
   r.Status.MarkIngressRolloutInProgress()
   // Update the route.Status.Traffic to contain correct traffic
   // distribution based on rollout status.
   r.Status.Traffic, err = traffic.GetRevisionTrafficTargets(ctx, r, effectiveRO)
   if err != nil {
    return err
   }
   return nil
  }

  logger.Info("Route successfully synced")
  return nil
 }

Configuration

具体实现在/serving/pkg/reconciler/configuration/configuration.go

  1. 获取当前 Configuration 对应的 Revision , 若不存在则创建。
  2. Configuration 设置最新的 Revision
  3. 根据 Revision 是否 readiness,设置 Configuration 的状态 LatestReadyRevisionName
  4. 找到最后准备好的 revision 并设置 LatestReadyRevisionName

 func (c *Reconciler) ReconcileKind(ctx context.Context, config *v1.Configuration) pkgreconciler.Event {
  // First, fetch the revision that should exist for the current generation.
  lcr, err := c.latestCreatedRevision(ctx, config)
  if errors.IsNotFound(err) {
   lcr, err = c.createRevision(ctx, config)
   if errors.IsAlreadyExists(err) {
    // Newer revisions with a consistent naming scheme can theoretically hit this
    // path during normal operation so we don't actually report any failures to
    // the user.
    // We fail reconciliation anyway to make sure we get the correct revision for
    // further processing.
    return fmt.Errorf("failed to create Revision: %w", err)
   } else if err != nil {
    recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed""Failed to create Revision: %v", err)
    config.Status.MarkRevisionCreationFailed(err.Error())

    return fmt.Errorf("failed to create Revision: %w", err)
   }
  } else if errors.IsAlreadyExists(err) {
   // If we get an already-exists error from latestCreatedRevision it means
   // that the Revision name already exists for another Configuration or at
   // the wrong generation of this configuration.
   config.Status.MarkRevisionCreationFailed(err.Error())
   return nil
  } else if err != nil {
   return fmt.Errorf("failed to get Revision: %w", err)
  }

  // Second, set this to be the latest revision that we have created.
  config.Status.SetLatestCreatedRevisionName(revName)

  // Last, determine whether we should set LatestReadyRevisionName to our
  // LatestCreatedRevision based on its readiness.
  rc := lcr.Status.GetCondition(v1.RevisionConditionReady)
  switch {
  case rc.IsUnknown():
   logger.Infof("Revision %q of configuration is not ready", revName)

  case rc.IsTrue():
   logger.Infof("Revision %q of configuration is ready", revName)
   if config.Status.LatestReadyRevisionName == "" {
    // Surface an event for the first revision becoming ready.
    recorder.Event(config, corev1.EventTypeNormal, "ConfigurationReady",
     "Configuration becomes ready")
   }

  case rc.IsFalse():
   logger.Infof("Revision %q of configuration has failed: Reason=%s Message=%q", revName, rc.Reason, rc.Message)
   beforeReady := config.Status.GetCondition(v1.ConfigurationConditionReady)
   config.Status.MarkLatestCreatedFailed(lcr.Name, rc.GetMessage())

   if !equality.Semantic.DeepEqual(beforeReady, config.Status.GetCondition(v1.ConfigurationConditionReady)) {
    recorder.Eventf(config, corev1.EventTypeWarning, "LatestCreatedFailed",
     "Latest created revision %q has failed", lcr.Name)
   }

  default:
   return fmt.Errorf("unrecognized condition status: %v on revision %q", rc.Status, revName)
  }

  if err = c.findAndSetLatestReadyRevision(ctx, config); err != nil {
   return fmt.Errorf("failed to find and set latest ready revision: %w", err)
  }
  return nil
 }

Revision

具体实现在/serving/pkg/reconciler/revision/revision.go

  1. 检查 Revision 状态
  2. 设置镜像摘要
  3. 创建 deployment
  4. 创建 image 缓存
  5. 创建 PA
 
 func (c *Reconciler) ReconcileKind(ctx context.Context, rev *v1.Revision) pkgreconciler.Event {
  // check if the Status condition RevisionConditionReady is true and the latest spec has been observed.
  readyBeforeReconcile := rev.IsReady()
  
  // 获取镜像摘要
  reconciled, err := c.reconcileDigest(ctx, rev)
  if err != nil {
   return err
  }
 if !reconciled {
  // Digest not resolved yet, wait for background resolution to re-enqueue the revision.
  rev.Status.MarkResourcesAvailableUnknown(v1.ReasonResolvingDigests, "")
  return nil
 }

  // 创建Deployment, ImageCache, 和 PA
  for _, phase := range []func(context.Context, *v1.Revision) error{
   c.reconcileDeployment,
   c.reconcileImageCache,
   c.reconcilePA,
  } {
   if err := phase(ctx, rev); err != nil {
    return err
   }
  }
  // check if the revision is starting to materialize runtime resources, and becomes true when those resources are ready
  readyAfterReconcile := rev.Status.GetCondition(v1.RevisionConditionReady).IsTrue()
  if !readyBeforeReconcile && readyAfterReconcile {
   logger.Info("Revision became ready")
   controller.GetEventRecorder(ctx).Event(
    rev, corev1.EventTypeNormal, "RevisionReady",
   "Revision becomes ready upon all resources being ready")
  } else if readyBeforeReconcile && !readyAfterReconcile {
   logger.Info("Revision stopped being ready")
  }

  return nil
 }

最终的效果如图所示

AutoscalerActivator 相关内容会在 Knative Serving 核心逻辑实现(下篇)中更新。欢迎大家在评论里提出意见建议和问题并讨论。叫我一沫Github id: cheimu

关注公众号: Knative,了解更多 Serverless 、Knative,云原生相关资讯

在这里插入图片描述
在这里插入图片描述

参考

  1. 《从HelloWorld看Knative Serving代码实现》,https://developer.aliyun.com/article/701985
  2. 《Getting Started with Knative》,https://www.oreilly.com/library/view/getting-started-with/9781492047025/
  3. 《Knative Cookbook Building Effective Serverless Applications with Kubernetes and OpenShift》,https://developers.redhat.com/books/knative-cookbook

一介刁民

2021/05/15  阅读:63  主题:橙心

作者介绍

一介刁民