Skywalking Swck Agent注入实现分析
项目地址:
GitHub – apache/skywalking-swck: Apache SkyWalking Cloud on Kubernetes
项目简介:
A bridge project between Apache SkyWalking and Kubernetes.
SWCK is a platform for the SkyWalking user that provisions, upgrades, maintains SkyWalking relevant components, and makes them work natively on Kubernetes.
skywalking-swck是一个在skywalking和kubernetes之间架起一座桥梁性质的项目。可以给用户提供skywalking相关组件及后期升级、维护。让他们使用起来更加云原生。
项目特性
- Java Agent Injector: Inject the java agent into the application pod natively.
- Inject the java agent into the application pod.
- Leverage a global configuration to simplify the agent and injector setup.
- Use the annotation to customize specific workloads.
- Synchronize injecting status to JavaAgent CR for monitoring purposes.
- Operator: Provision and maintain SkyWalking backend components.
- Custom Metrics Adapter: Provides custom metrics coming from SkyWalking OAP cluster for autoscaling by Kubernetes HPA
- 注入Java Agent: 以更加云原生的试注入java agent
- 将java agent注入到应用pod
- 可以全局化配置,简化agent注入操作
- 使用annotation自定义一些配置
- 将注入的状态同步到JavaAgent这个CR对象,便于监控
- Operator: 提供和维护SkyWalking后端的组件
- 自定义指标适配: 能够提供来自于SkyWalking OAP的自定义指标给kubernetes HPA,以便自动扩缩容
使用
skywalking-swck/java-agent-injector.md at master · apache/skywalking-swck
- 下载并安装Operator
- 创建ConfigMap/SwAgent等全局配置
Annotations > SwAgent > Configmap (Deprecated) > Default Configmap (Deprecated) - 接入应用配置label/annotations
skywalking agent inject on kubernetes
原理
概述
当kubectl apply 一个 deployment资源后,k8s会创建pod,此时k8s根据mutatingwebhookconfigurations资源配置(配置了监控的资源以及webhook server信息),调用相应的webhook server,webhook server会进行处理,在pod yaml中注入initContainer配置,使业务容器与initContainer容器共享skywalking agent目录,并且配置JAVA_TOOL_OPTIONS环境变量值为”-javaagent:/sky/agent/skywalking-agent.jar=agent.service_name=xxxx”,这样JVM启动时,会附加上javaagent,以达到目的。
详述
- 首先我们来看一下MutatingWebhookConfiguration和 ValidatingWebhookConfiguration资源
查看kubectl explain 对这两个资源的描述
MutatingWebhookConfiguration describes the configuration of and admission
webhook that accept or reject and may change the object.
ValidatingWebhookConfiguration describes the configuration of and admission
webhook that accept or reject and object without changing it.
简而言之,这两种资源都是准入控制器(Admission Controller)的两种实现,都能控制是否接受还是拒绝对资源对象的变化,但不同的是,MutatingWebhookConfiguration可以改变资源对象,而ValidatingWebhookConfiguration不可以,可以参看搞懂 Kubernetes 准入控制(Admission Controller)详细了解。
2. swck就是利用MutatingWebhookConfiguration实现了对pod的修改,我们来看下swck中的定义
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
annotations:
cert-manager.io/inject-ca-from: skywalking-swck-system/skywalking-swck-serving-cert
name: skywalking-swck-mutating-webhook-configuration
webhooks:
- admissionReviewVersions:
- v1
clientConfig:
service:
name: skywalking-swck-webhook-service
namespace: skywalking-swck-system
path: /mutate-v1-pod
failurePolicy: Fail
name: mpod.kb.io
namespaceSelector:
matchLabels:
swck-injection: enabled
rules:
- apiGroups:
- ""
apiVersions:
- v1
operations:
- CREATE
- UPDATE
resources:
- pods
sideEffects: None
从这段定义中可以看出,当带有标签swck-injection: enabled的Namespace下的POD资源有CREATE或者UPDATE操作时,将会调用path: /mutate-v1-pod。
3. 在swck项目中的operator/man.go中找到此URL
// register a webhook to enable the java agent injector
setupLog.Info("registering /mutate-v1-pod webhook")
mgr.GetWebhookServer().Register("/mutate-v1-pod",
&webhook.Admission{
Handler: &injector.JavaagentInjector{Client: mgr.GetClient()}})
setupLog.Info("/mutate-v1-pod webhook is registered")
swck向k8s注册了/mutate-v1-pod以及对应的handler。我们可以想到,当create或update pod时,k8s将会调用path对应的handler处理。
4. 查看Handler: injector.JavaagentInjector
// Handle will process every coming pod under the
// specified namespace which labeled "swck-injection=enabled"
func (r *JavaagentInjector) Handle(ctx context.Context, req admission.Request) admission.Response {
pod := &corev1.Pod{}
if err := r.decoder.Decode(req, pod); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
// set Annotations to avoid repeated judgments
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
// 查找所有匹配的swAgent
swAgentL := r.findMatchedSwAgentL(ctx, req, pod)
//初始化所有annotations(加载所有annotations)
anno, err := NewAnnotations()
if err != nil {
javaagentInjectorLog.Error(err, "get NewAnnotations error")
}
//创建AnnotationOverlay对象,它是一个map,用于保存被overlaied的annotation
ao := NewAnnotationOverlay()
//创建SidecarInjectField对象
s := NewSidecarInjectField()
//构建inject链对象
ip := NewInjectProcess(ctx, s, anno, ao, swAgentL, pod, req, javaagentInjectorLog, r.Client)
//开始inject
return ip.Run()
}
- 创建inject chain对象
Inject chain对象集合了此次有变更的Pod, webhook request, k8s client以及注解、swagent等对象
// NewInjectProcess create a new InjectProcess
func NewInjectProcess(ctx context.Context, injectFileds *SidecarInjectField, annotation *Annotations,
annotationOverlay *AnnotationOverlay, swAgentL *v1alpha1.SwAgentList, pod *corev1.Pod, req admission.Request, log logr.Logger,
kubeclient client.Client) *InjectProcessData {
return &InjectProcessData{
ctx: ctx,
injectFileds: injectFileds,
annotation: annotation,
annotationOverlay: annotationOverlay,
swAgentL: swAgentL,
pod: pod,
req: req,
log: log,
kubeclient: kubeclient,
}
}
- 看下ip.Run()方法
经过了前面铺垫,终于到了主题了,run方法首先按照执行顺序倒序构造了一个执行链,然后执行。
// Run will connect the above six steps into a chain and start to execute the first step
func (ipd *InjectProcessData) Run() admission.Response {
// set final step
podInject := &PodInject{}
// set next step is PodInject
getConfigmap := &GetConfigmap{}
getConfigmap.setNext(podInject)
// set next step is GetConfigmap
overlayPlugins := &OverlayPlugins{}
overlayPlugins.setNext(getConfigmap)
// set next step is OverlayPlugins
overlayAgent := &OverlayAgent{}
overlayAgent.setNext(overlayPlugins)
// set next step is OverlayAgent
overlaysidecar := &OverlaySidecar{}
overlaysidecar.setNext(overlayAgent)
// set next step is OverlaySwAgentCR
overlaySwAgentCR := &OverlaySwAgentCR{}
overlaySwAgentCR.setNext(overlaysidecar)
// set next step is OverlaySidecar
getStrategy := &GetStrategy{}
getStrategy.setNext(overlaySwAgentCR)
// this is first step and do real injection
return getStrategy.execute(ipd)
}
- 首先执行的是GetStrategy
func (gs *GetStrategy) execute(ipd *InjectProcessData) admission.Response {
log.Info("=============== GetStrategy ================")
ipd.injectFileds.GetInjectStrategy(*ipd.annotation, &ipd.pod.ObjectMeta.Labels, &ipd.pod.ObjectMeta.Annotations)
if !ipd.injectFileds.NeedInject {
log.Info("don"t inject agent")
return admission.Allowed("ok")
}
return gs.next.execute(ipd)
}
// GetInjectStrategy gets user"s injection strategy
func (s *SidecarInjectField) GetInjectStrategy(a Annotations, labels,
annotation *map[string]string) {
// set default value
s.NeedInject = false
// set NeedInject"s value , if the pod has the label "swck-java-agent-injected=true", means need inject
if *labels == nil {
return
}
if strings.EqualFold((*labels)[ActiveInjectorLabel], "true") {
s.NeedInject = true
}
if *annotation == nil {
return
}
// set injectContainer"s value
if v, ok := (*annotation)[sidecarInjectContainerAnno]; ok {
s.InjectContainer = v
}
}
逻辑比较简单,判断当前pod需不需要inject,以及获取inject的container名字的正则表达式。
8. 第二个执行的是OverlaySwAgentCR
// get configs from SwAgent CR
func (gs *OverlaySwAgentCR) execute(ipd *InjectProcessData) admission.Response {
log.Info(fmt.Sprintf("=============== OverlaySwAgentCR(%d) ================ ", len(ipd.swAgentL.Items)))
if !ipd.injectFileds.OverlaySwAgentCR(ipd.swAgentL, ipd.pod) {
log.Info("overlay SwAgent cr config error.")
return PatchReq(ipd.pod, ipd.req)
}
return gs.next.execute(ipd)
}
func (s *SidecarInjectField) OverlaySwAgentCR(swAgentL *v1alpha1.SwAgentList, pod *corev1.Pod) bool {
s.ConfigmapVolume.ConfigMap = new(corev1.ConfigMapVolumeSource)
// 如果找到多个匹配的SwAgent,则应用最后一个,其它几个忽略
if len(swAgentL.Items) > 0 {
swAgent := swAgentL.Items[len(swAgentL.Items)-1]
log.Info(fmt.Sprintf("agent %s loaded.", swAgent.Name))
// 首先配置了shared volume, mount path。默认sharedVolumeName是sky-agent,mount path是/sky/agent
// volume name可以更改, mountPath无法更改, mount path是业务容器上的path
s.SidecarVolume.Name = swAgent.Spec.SharedVolumeName
s.SidecarVolume.VolumeSource.EmptyDir = &corev1.EmptyDirVolumeSource{}
s.SidecarVolumeMount.Name = swAgent.Spec.SharedVolumeName
s.SidecarVolumeMount.MountPath = mountPath
// 如果swagent配置了configmap,则设置业务容器mount path,实际是由configmap中的agent配置
// 覆盖agent镜像中的配置
if swAgent.Spec.SwConfigMapVolume != nil {
if len(swAgent.Spec.SwConfigMapVolume.Name) > 0 &&
len(swAgent.Spec.SwConfigMapVolume.ConfigMapName) > 0 &&
len(swAgent.Spec.SwConfigMapVolume.ConfigMapMountFile) > 0 {
//s.ConfigmapVolume = corev1.Volume{}
s.ConfigmapVolume.Name = swAgent.Spec.SwConfigMapVolume.Name
s.ConfigmapVolume.ConfigMap = new(corev1.ConfigMapVolumeSource)
s.ConfigmapVolume.ConfigMap.Name = swAgent.Spec.SwConfigMapVolume.ConfigMapName
//s.ConfigmapVolumeMount = corev1.VolumeMount{}
s.ConfigmapVolumeMount.Name = swAgent.Spec.SwConfigMapVolume.Name
s.ConfigmapVolumeMount.MountPath = "/sky/agent/config/" + swAgent.Spec.SwConfigMapVolume.ConfigMapMountFile
s.ConfigmapVolumeMount.SubPath = swAgent.Spec.SwConfigMapVolume.ConfigMapMountFile
}
}
// init container
s.Initcontainer.Name = swAgent.Spec.JavaSidecar.Name
s.Initcontainer.Image = swAgent.Spec.JavaSidecar.Image
s.Initcontainer.Args = swAgent.Spec.JavaSidecar.Args
s.Initcontainer.Command = swAgent.Spec.JavaSidecar.Command
s.Initcontainer.VolumeMounts = append(s.Initcontainer.VolumeMounts, corev1.VolumeMount{
Name: swAgent.Spec.SharedVolumeName,
MountPath: mountPath,
})
// 将swagent配置的环境变量设置为业务容器的环境变量
s.Envs = swAgent.Spec.JavaSidecar.Env
s.InjectContainer = swAgent.Spec.ContainerMatcher
}
return true
}
- 第三个执行的是OverlaySidecar
func (os *OverlaySidecar) execute(ipd *InjectProcessData) admission.Response {
log.Info("=============== OverlaySidecar ================")
if !ipd.injectFileds.OverlaySidecar(*ipd.annotation, ipd.annotationOverlay, &ipd.pod.ObjectMeta.Annotations) {
return PatchReq(ipd.pod, ipd.req)
}
return os.next.execute(ipd)
}
// OverlaySidecar overlays default config
func (s *SidecarInjectField) OverlaySidecar(a Annotations, ao *AnnotationOverlay, annotation *map[string]string) bool {
s.Initcontainer.Command = make([]string, 1)
s.Initcontainer.Args = make([]string, 2)
if nil == s.ConfigmapVolume.ConfigMap {
s.ConfigmapVolume.ConfigMap = new(corev1.ConfigMapVolumeSource)
}
limitsStr := ""
requestStr := ""
// 创建sidercar注解map对象,其初始值从上一步执行结果中获取.map中的key为sidecar注解去掉前缀后的名称
annoField := map[string]*string{
"initcontainer.Name": &s.Initcontainer.Name,
"initcontainer.Image": &s.Initcontainer.Image,
"initcontainer.Command": &s.Initcontainer.Command[0],
"initcontainer.args.Option": &s.Initcontainer.Args[0],
"initcontainer.args.Command": &s.Initcontainer.Args[1],
"initcontainer.resources.limits": &limitsStr,
"initcontainer.resources.requests": &requestStr,
"sidecarVolume.Name": &s.SidecarVolume.Name,
"sidecarVolumeMount.MountPath": &s.SidecarVolumeMount.MountPath,
"configmapVolume.ConfigMap.Name": &s.ConfigmapVolume.ConfigMap.Name,
"configmapVolume.Name": &s.ConfigmapVolume.Name,
"configmapVolumeMount.MountPath": &s.ConfigmapVolumeMount.MountPath,
"env.Name": &s.Env.Name,
"env.Value": &s.Env.Value,
}
// 从全量注解中获取sidercar前缀的注解,遍历,检查Pod有没有设置相应sidercar注解,如果设置了,则覆盖map中对应key原来的值
anno := GetAnnotationsByPrefix(a, sidecarAnnotationPrefix)
for _, v := range anno.Annotations {
fieldName := strings.TrimPrefix(v.Name, sidecarAnnotationPrefix)
if pointer, ok := annoField[fieldName]; ok {
if !s.setValue(pointer, ao, annotation, v) {
return false
}
}
}
s.SidecarVolumeMount.Name = s.SidecarVolume.Name
s.ConfigmapVolumeMount.Name = s.ConfigmapVolume.Name
s.Initcontainer.VolumeMounts = []corev1.VolumeMount{s.SidecarVolumeMount}
// 设置init container的资源限制
if limitsStr != "nil" {
limits := make(corev1.ResourceList)
err := json.Unmarshal([]byte(limitsStr), &limits)
if err != nil {
log.Error(err, "unmarshal limitsStr error")
return false
}
s.Initcontainer.Resources.Limits = limits
}
// 设置init container需要申请的资源
if requestStr != "nil" {
requests := make(corev1.ResourceList)
err := json.Unmarshal([]byte(requestStr), &requests)
if err != nil {
log.Error(err, "unmarshal requestStr error")
return false
}
s.Initcontainer.Resources.Requests = requests
}
// the sidecar volume"s type is determined
s.SidecarVolume.VolumeSource.EmptyDir = nil
return true
}
- 第四个执行的是OverlayAgent
// OverlayAgent overlays the agent by getting the pod"s annotations
// If the agent overlay option is not set, go directly to the next step
// If set the wrong value in the annotation , inject the error info and return
func (oa *OverlayAgent) execute(ipd *InjectProcessData) admission.Response {
log.Info("=============== OverlayAgent ================")
if !ipd.injectFileds.OverlayAgent(*ipd.annotation, ipd.annotationOverlay, &ipd.pod.ObjectMeta.Annotations) {
ipd.log.Info("overlay agent config error!please look the error annotation!")
return PatchReq(ipd.pod, ipd.req)
}
return oa.next.execute(ipd)
}
// OverlayAgent overlays agent
func (s *SidecarInjectField) OverlayAgent(a Annotations, ao *AnnotationOverlay, annotation *map[string]string) bool {
// jvmAgentConfigStr init
s.JvmAgentConfigStr = ""
//遍历pod的注解,如果注解的名称存在于全量注解中,则将Pod注解及值保存到AnnotationOverlay map对象中
anno := GetAnnotationsByPrefix(a, agentAnnotationPrefix)
for k, v := range *annotation {
if strings.HasPrefix(k, agentAnnotationPrefix) {
for _, an := range anno.Annotations {
if strings.EqualFold(k, an.Name) {
if !s.AgentOverlayandGetValue(ao, annotation, an) {
return false
}
}
}
// 将pod注解去掉agent前缀,追加到JvmAgentConfigStr字段中
configName := strings.TrimPrefix(k, agentAnnotationPrefix)
config := strings.Join([]string{configName, v}, "=")
// add to jvmAgentConfigStr
if s.JvmAgentConfigStr != "" {
s.JvmAgentConfigStr = strings.Join([]string{s.JvmAgentConfigStr, config}, ",")
} else {
s.JvmAgentConfigStr = config
}
}
}
return true
}
- 第五个执行的是OverlayPlugins,与OverlayAgent逻辑类似。
- 第六个执行的是GetConfigmap,其作用是检查如果pod配置了agent configmap,则检查configmap配置的值是否正确.
func (s *SidecarInjectField) ValidateConfigmap(ctx context.Context, kubeclient client.Client, namespace string,
annotation *map[string]string) bool {
if len(s.ConfigmapVolume.Name) == 0 || len(s.ConfigmapVolume.ConfigMap.Name) == 0 {
return true
}
configmap := &corev1.ConfigMap{}
configmapName := s.ConfigmapVolume.VolumeSource.ConfigMap.LocalObjectReference.Name
// check whether the configmap is existed
err := kubeclient.Get(ctx, client.ObjectKey{Namespace: namespace, Name: configmapName}, configmap)
if err != nil && !errors.IsNotFound(err) {
log.Error(err, "Get Configmap failed", "configmapName", configmapName, "namespace", namespace)
return false
}
// if configmap exist , validate it
if !errors.IsNotFound(err) {
ok, errinfo := ValidateConfigmap(configmap)
if ok {
log.Info("the configmap validate true", "configmapName", configmapName)
return true
}
log.Error(errinfo, "the configmap validate false", "configmapName", configmapName)
}
return true
}
- 最后一步是PodInject,顾名思义,其作用是进行Pod注入
// PodInject will inject all fields to the pod
func (pi *PodInject) execute(ipd *InjectProcessData) admission.Response {
log.Info("=============== PodInject ================")
ipd.injectFileds.Inject(ipd.pod)
// Pod注入完成后,添加sidecar.skywalking.apache.org/succeed=true注解
ipd.injectFileds.injectSucceedAnnotation(&ipd.pod.Annotations)
log.Info("inject successfully!")
// 序列化Pod,返回给k8s
return PatchReq(ipd.pod, ipd.req)
}
// Inject will do real injection
func (s *SidecarInjectField) Inject(pod *corev1.Pod) {
log.Info(fmt.Sprintf("inject pod : %s", pod.GenerateName))
// 将之前执行得到的InitContainer与pod配置的InitContainer合并在一起,也就是说pod initcontainer可以有多个
if pod.Spec.InitContainers != nil {
pod.Spec.InitContainers = append(pod.Spec.InitContainers, s.Initcontainer)
} else {
pod.Spec.InitContainers = []corev1.Container{s.Initcontainer}
}
// add volume to spec
if pod.Spec.Volumes == nil {
pod.Spec.Volumes = []corev1.Volume{}
}
pod.Spec.Volumes = append(pod.Spec.Volumes, s.SidecarVolume)
if len(s.ConfigmapVolume.Name) > 0 && len(s.ConfigmapVolume.ConfigMap.Name) > 0 {
pod.Spec.Volumes = append(pod.Spec.Volumes, s.ConfigmapVolume)
}
//选择要注入的目标容器
targetContainers := s.findInjectContainer(pod.Spec.Containers)
//循环目标容器进行注入
for i := range targetContainers {
log.Info(fmt.Sprintf("inject container : %s", targetContainers[i].Name))
if (*targetContainers[i]).VolumeMounts == nil {
(*targetContainers[i]).VolumeMounts = []corev1.VolumeMount{}
}
// 注入voLume与configmap
(*targetContainers[i]).VolumeMounts = append((*targetContainers[i]).VolumeMounts, s.SidecarVolumeMount)
if len(s.ConfigmapVolumeMount.Name) > 0 && len(s.ConfigmapVolumeMount.MountPath) > 0 {
(*targetContainers[i]).VolumeMounts = append((*targetContainers[i]).VolumeMounts, s.ConfigmapVolumeMount)
}
//java agent参数,其值为上面的JvmAgentConfigStr
if (*targetContainers[i]).Env != nil {
(*targetContainers[i]).Env = append((*targetContainers[i]).Env, s.Env)
} else {
(*targetContainers[i]).Env = []corev1.EnvVar{s.Env}
}
//注入环境变量,如果container本身存在,则忽略
var envsTBA []corev1.EnvVar
for j, envInject := range s.Envs {
isExists := false
for _, envExists := range targetContainers[i].Env {
if strings.EqualFold(envExists.Name, envInject.Name) {
isExists = true
break
}
}
if !isExists {
envsTBA = append(envsTBA, s.Envs[j])
}
}
if len(s.Envs) > 0 {
(*targetContainers[i]).Env = append((*targetContainers[i]).Env, envsTBA...)
}
}
}
除了Pod注入,SWCK项目还有其它Operator, 包括Storage,OAP,UI,Adapter等,有兴趣的话可自行探索。总体来说swck利用k8s的自定义资源以及自定义控制器,为skywalking部署到kubernetes提供了适配,使skywalking能够快速部署到kubernetes这个基座上。
注意事项
- SwAgent只能在业务空间起作用,不能在skywalking-swck-system生效
因为webhook触发调用handler后,在查找SwAgent时,只会查找与Pod在一个命名空间中的Swagent. 如果想将SwAgent放到skywalking-swck-system命令空间,需要修改operator
- 删除资源时JavaAgent状态中统计的注入的容器数量不变化
因为MutatingWebhookConfiguration只监听了Pod的Create与Update事件。
- 调试需要问题
- 本地启用webhook前提下无法启动operator
因为启动webhook时,需要在本地启动webhook server,与k8s集群通过https通信, 本地需要添加tls.crt以及tls.key文件。而这两个文件从k8s获取。具体方法是查看skywalking-swck-controller-manager使用到的secret
kubectl get secret skywalking-swck-controller-manager-cert -n skywalking-swck-system -o jsonpath="{.data.tls.crt}"| base64 --decode > tls.crt
kubectl get secret skywalking-swck-controller-manager-cert -n skywalking-swck-system -o jsonpath="{.data.tls.key}"| base64 --decode > tls.key