Tencent Mini Project - [Metrics Monitoring Service Refactoring] - 2023-08-26


    Done Today

    Non-intrusive Trace Integration for Venus

    Defining the handler function type

    1. Carries the main processing logic of a fiber.Handler
    2. Returns any error that occurred during processing
    3. Returns a function that writes the JSON response
    // handler carries the core processing logic of a fiber.Handler
    // @Description: returns the processing error (if any) and a deferred
    // function that writes the JSON response
    // @Author xzx 2023-08-26 18:00:03
    // @Param c
    // @Return error
    // @Return func() error : function for response json
    type handler func(c *fiber.Ctx) (error, func() error)
    

    Defining the TraceWrapper function

    1. Embed the otel-trace logic into the handler

    2. Start a span

    3. Execute the handler and record the span's attributes

    4. Branch on the returned err and jsonRespFunc

    5. Conditional handling logic:
      err != nil && jsonRespFunc == nil: an error occurred; record it on the span, end the span, call c.Next()
      err != nil && jsonRespFunc != nil: an error occurred; record it on the span, end the span, respond with JSON
      err == nil && jsonRespFunc == nil: end the span, call c.Next()
      err == nil && jsonRespFunc != nil: end the span, respond with JSON
    6. Code implementation

    // TraceWrapper returns a fiber.Handler that integrates otel-trace reporting
    // @Description embeds the otel-trace logic around a handler
    // @Author xzx 2023-08-26 18:00:03
    // @Param f
    // @Param opts
    // @Return fiber.Handler
    func TraceWrapper(f handler, opts ...trace.SpanStartOption) fiber.Handler {
    	return func(c *fiber.Ctx) error {
    		ctx, span := otelclient.Tracer.Start(c.UserContext(), runtime.GetFunctionName(f), opts...)
    		// propagate the span context so downstream handlers
    		// (e.g. trace.SpanFromContext in WriteKafka) can see this span
    		c.SetUserContext(ctx)
    		// execute handler logic
    		err, jsonRespFunc := f(c)
    		// todo: setSpanAttributes
    		if err != nil {
    			// record span error
    			span.RecordError(err)
    			span.SetStatus(codes.Error, err.Error())
    			span.End()
    			if jsonRespFunc != nil {
    				// response error result
    				return jsonRespFunc()
    			}
    			// ignore error, continue handlers
    			return c.Next()
    		}
    		span.End()
    		if jsonRespFunc != nil {
    			// response success result
    			return jsonRespFunc()
    		}
    		// err == nil, jsonRespFunc == nil
    		return c.Next()
    	}
    }
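
    The span name comes from runtime.GetFunctionName, a project helper. A plausible sketch of such a helper (an assumption about the project's own runtime package, not its confirmed source):

    package runtime

    import (
    	"reflect"
    	goruntime "runtime"
    )

    // GetFunctionName resolves the fully qualified name of a function value,
    // e.g. "handler.SplitAndValidate", for use as the span name.
    func GetFunctionName(fn interface{}) string {
    	return goruntime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
    }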
    

    Concrete handler logic

    // SplitAndValidate splits a multi-event upload and validates the uploaded data
    // @Description splits the upload into individual events, validating the meta and each data item
    // @Author xzx 2023-08-26 17:54:21
    // @Param c
    // @Return Err
    // @Return f
    func SplitAndValidate(c *fiber.Ctx) (Err error, f func() error) {
    	log.Logger().Debug("split and validate", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
    
    	otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(
    		attribute.String("type", "all_upload"),
    	)))
    
    	RequestIdBaggage, err := baggage.NewMember("request_id", c.GetRespHeader(fiber.HeaderXRequestID))
    	if err != nil {
    		log.Logger().Error("Create Baggage Member Failed", zap.Error(err))
    	}
    	Baggage, err := baggage.New(RequestIdBaggage)
    	if err != nil {
    		log.Logger().Error("Create Baggage Failed", zap.Error(err))
    	}
    	c.SetUserContext(baggage.ContextWithBaggage(c.UserContext(), Baggage))
    
    	c.Accepts(fiber.MIMEMultipartForm)
    	uploadedData, err := parseJSON(c)
    	if err != nil {
    		log.Logger().Error("failed to parse json", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
    		c.Status(fiber.StatusBadRequest)
    		return err, func() error {
    			return c.JSON(protocol.Response{
    				Code: protocol.AllFail,
    				Data: err.Error(),
    			})
    		}
    	}
    	if err = uploadedData.Meta.Validate(); err != nil {
    		log.Logger().Error("failed to validate meta", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
    		otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(
    			attribute.String("type", "wrong_meta"),
    		)))
    		c.Status(fiber.StatusBadRequest)
    		return err, func() error {
    			return c.JSON(protocol.Response{
    				Code: protocol.AllFail,
    				Data: err.Error(),
    			})
    		}
    	}
    
    	// must store pointers in fiber's Locals if the values will be modified later
    	events := make([]*schema.Event, 0, len(uploadedData.Data))
    	parts := make([]*protocol.Part, 0, len(uploadedData.Data))
    
    	for idx, data := range uploadedData.Data {
    		log.Logger().Debug("split event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Int("idx", idx))
    		event := &schema.Event{}
    		part := &protocol.Part{}
    		if err = data.Validate(); err != nil {
    			Err = err
    			otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(
    				attribute.String("type", "wrong_data"),
    			)))
    			part.Code = protocol.ValidationError
    			part.Data = err.Error()
    		}
    		part.ID = data.ID
    
    		event.Meta = uploadedData.Meta
    		event.Data = data
    
    		events = append(events, event)
    		parts = append(parts, part)
    	}
    
    	c.Locals("meta", uploadedData.Meta)
    	c.Locals("events", events)
    	c.Locals("parts", parts)
    
    	return Err, nil
    }
    
    // HandleEvent handles the split events
    // @Description assigns each event a backend id and upload time and enriches it with geo info parsed from its ip
    // @Author xzx 2023-08-26 17:54:23
    // @Param c
    // @Return Err
    // @Return f
    func HandleEvent(c *fiber.Ctx) (Err error, f func() error) {
    	log.Logger().Debug("handle event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
    
    	events := c.Locals("events").([]*schema.Event)
    	parts := c.Locals("parts").([]*protocol.Part)
    
    	for idx, event := range events {
    		log.Logger().Debug("handle event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Int("idx", idx))
    		event.BackendID = uuid.NewString()
    		event.UploadTime = time.Now().UnixMilli()
    
    		part := parts[idx]
    		part.BackendID = event.BackendID
    		country, region, city, err := ip.ParseIP(event.IP)
    		if err != nil {
    			Err = err
    			log.Logger().Warn("failed to parse ip", zap.Error(err), zap.String("client", c.IP()), zap.String("ip", event.IP), zap.String("agent", string(c.Context().UserAgent())))
    			part.Code = protocol.IPParsingError
    			part.Data = err.Error()
    		}
    		log.Logger().Debug("parsed ip", zap.String("client", c.IP()), zap.String("ip", event.IP), zap.String("agent", string(c.Context().UserAgent())), zap.String("country", country), zap.String("region", region), zap.String("city", city))
    		event.Country = country
    		event.Region = region
    		event.City = city
    	}
    
    	return Err, nil
    }
    
    // WriteKafka writes the handled events to kafka
    // @Description serializes the successfully handled events and produces them to the category topic
    // @Author xzx 2023-08-26 17:54:26
    // @Param c
    // @Return Err
    // @Return f
    func WriteKafka(c *fiber.Ctx) (Err error, f func() error) {
    	log.Logger().Debug("write kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
    
    	meta := c.Locals("meta").(schema.Meta)
    	events := c.Locals("events").([]*schema.Event)
    	parts := c.Locals("parts").([]*protocol.Part)
    
    	// tracks whether every part succeeded; this decides the overall response code
    	isAllSuccess := true
    	// topic is like to_analyzer__0.PERF_CRASH
    	topic := fmt.Sprintf("to_analyzer__0.%s", meta.Category)
    
    	// a W3C traceparent is 55 chars: "00-" + 32-hex trace-id + "-" +
    	// 16-hex span-id + "-" + 2-hex flags; characters [36:52) are the
    	// span-id, so this splice swaps in the current span's id
    	traceparent := c.Get(traceparentHeaderKey)
    	if len(traceparent) == 55 {
    		spanId := trace.SpanFromContext(c.UserContext()).SpanContext().SpanID().String()
    		traceparent = traceparent[:36] + spanId + traceparent[52:]
    	}
    
    	messages := make([]kafka.Message, 0, len(events))
    	for idx, event := range events {
    		// skip events that already failed validation or ip parsing
    		if parts[idx].Code != 0 {
    			isAllSuccess = false
    			continue
    		}
    		bytes, err := sonic.Marshal(event)
    		if err != nil {
    			Err = err
    			log.Logger().Error("failed to marshal event", zap.Error(err), zap.Any("event", event), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
    			parts[idx].Code = protocol.SerializationError
    			parts[idx].Data = err.Error()
    			// do not enqueue a message whose payload failed to serialize
    			isAllSuccess = false
    			continue
    		}
    		messages = append(messages, kafka.Message{
    			Topic: topic,
    			Value: bytes,
    			Headers: []kafka.Header{
    				{Key: traceparentHeaderKey, Value: []byte(traceparent)},
    			},
    		})
    	}
    
    	if len(messages) == 0 { // nothing to write: every part failed for some reason
    		log.Logger().Warn("all data failed to be handled, skipping kafka write", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
    		c.Status(fiber.StatusBadRequest)
    		return errors.New("all data failed to be handled, check their code and data"), func() error {
    			return c.JSON(protocol.Response{
    				Code:  protocol.AllFail,
    				Data:  "all data failed to be handled, check their code and data",
    				Parts: parts,
    			})
    		}
    	}
    
    	log.Logger().Info("would write to kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
    	kafkaProducer := connector.GetEventKafka()
    	if err := kafkaProducer.WriteMessages(context.Background(), messages...); err != nil {
    		log.Logger().Error("failed to write to kafka", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Any("messages", messages))
    		c.Status(fiber.StatusInternalServerError)
    		return err, func() error {
    			return c.JSON(protocol.Response{
    				Code:  protocol.AllFail,
    				Data:  fmt.Sprintf("failed to write to kafka: %s", err.Error()),
    				Parts: parts,
    			})
    		}
    	}
    	if isAllSuccess {
    		c.Status(fiber.StatusOK)
    		otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(
    			attribute.String("type", "success_upload"))))
    		return nil, func() error {
    			return c.JSON(protocol.Response{
    				Code:  protocol.AllSuccess,
    				Parts: parts,
    			})
    		}
    	} else {
    		c.Status(fiber.StatusPartialContent)
    		return Err, func() error {
    			return c.JSON(protocol.Response{
    				Code:  protocol.PartialSuccess,
    				Data:  "some data failed to be handled, check their code and data",
    				Parts: parts,
    			})
    		}
    	}
    }
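
    Putting the pieces together, a minimal wiring sketch (the route path, port, and same-package placement are assumptions for illustration):

    package main

    import "github.com/gofiber/fiber/v2"

    func main() {
    	app := fiber.New()
    	// each pipeline stage gets its own span via TraceWrapper; a stage that
    	// returns a jsonRespFunc ends the request, otherwise c.Next() continues
    	app.Post("/report",
    		TraceWrapper(SplitAndValidate),
    		TraceWrapper(HandleEvent),
    		TraceWrapper(WriteKafka),
    	)
    	_ = app.Listen(":8080")
    }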
    

    Sync Meeting

    Progress

    1. Completed non-intrusive otel-trace integration into the handlers for the venus and profile logic, improving code extensibility
    2. The switch for otel reporting has been reviewed and merged
    3. Deployed the full jaeger stack
    4. Benchmarks comparing watermill and baserunner

    Review and Test Plan

    1. Code review
      1. Non-intrusive otel integration for fiber.Handler; code extensibility
      2. Fixed issues flagged in naming conventions, comment conventions, and code-quality checks
      3. Removed the grafana container from profile-compose
      4. Log initialization
      5. Removed unused configuration from venus-compose
    2. Test plan
      1. Connect to the ck cluster, benchmark otel-sdk reporting, and load-test the otel-collector and the ck cluster
        1. [Blocker] Load-testing the collector has so far not put much pressure on ck
      2. Deployed the full jaeger stack, using es to store traces and Prometheus to store metrics
        1. [Difference] In jaeger's trace visualization, spans linked via span_references - FollowsFrom are displayed as parent-child spans
        2. The es cluster has been purchased; awaiting integration
        3. [Plan] Load-test jaeger to find the qps at which it starts to fail, then use that qps to test the plans of groups A and B
        4. [Method] During load-testing, use pprof to capture the cpu and memory usage of venus and profile (see the sketch after this list)
      3. Metrics for the ck cluster
        1. [Inserts] ck write latency; write success rate over 1- and 5-minute windows
        2. [Queries] ck query latency; query success rate
        3. [Resource utilization] ck cpu and memory utilization
      4. [Open question] How to substantiate our claims about the differences and relative merits of the monitoring services: the concrete test procedure, test subjects, and metric comparisons
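
    A minimal sketch of exposing pprof for the capture step above (the side port 6060 and the wiring into venus/profile are assumptions):

    package main

    import (
    	"log"
    	"net/http"
    	_ "net/http/pprof" // registers the /debug/pprof handlers on the default mux
    )

    func main() {
    	// during the load test, capture profiles with e.g.:
    	//   go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30
    	//   go tool pprof http://localhost:6060/debug/pprof/heap
    	log.Println(http.ListenAndServe("localhost:6060", nil))
    }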

    Summary

    1. Report identical metrics and compare the collectors (jaeger vs. otel, running as containers) to show otel's advantages; monitor the collectors' cpu and memory differences with cadvisor
    2. For identical reporting (e.g., traces), compare the storage footprint of es vs. ck, inspected via kibana
    3. Brief extension: otel-log, pinpointing where a trace failed and the root cause of the specific problem
    4. Query performance: response-latency testing by load-testing the web URLs of signoz and jaeger (mainly through the Web UI), with controlled variables (same trace and span, caches cleared, primarily tcp load-testing)
    5. Extension: watermill vs. baserunner comparison; which performs better under high-concurrency scenarios
    6. Extension: benchmark hyperscan against the standard regexp engine (see the benchmark sketch after this list)
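
    For the last item, a minimal stdlib-side benchmark sketch (pattern and input are illustrative; the hyperscan side would go through a Go binding and is not shown):

    package regexbench

    import (
    	"regexp"
    	"testing"
    )

    var (
    	pattern = regexp.MustCompile(`(?i)error|panic|fatal`) // illustrative pattern
    	input   = []byte(`level=ERROR msg="write kafka failed"`)
    )

    // BenchmarkStdRegexp measures Go's standard regexp engine; run with:
    //   go test -bench=. -benchmem
    func BenchmarkStdRegexp(b *testing.B) {
    	for i := 0; i < b.N; i++ {
    		_ = pattern.Match(input)
    	}
    }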

    Data Recording and Controlled Variables

    cadvisor


    services:
      cadvisor:
        image: gcr.io/cadvisor/cadvisor:latest
        container_name: cadvisor
        networks:
          - backend
        command: --url_base_prefix=/cadvisor
        volumes:
          - /:/rootfs:ro
          - /var/run:/var/run:rw
          - /sys:/sys:ro
          - /var/lib/docker/:/var/lib/docker:ro
          - /dev/disk/:/dev/disk:ro
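
    cadvisor serves Prometheus-format metrics over HTTP, so a scrape job along these lines could collect the container stats (job name and interval are illustrative; assuming --url_base_prefix also prefixes the metrics path):

    scrape_configs:
      - job_name: cadvisor               # illustrative job name
        scrape_interval: 15s
        metrics_path: /cadvisor/metrics  # matches --url_base_prefix=/cadvisor
        static_configs:
          - targets: ["cadvisor:8080"]   # reachable on the shared backend network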
    

    Deploying jaeger-all-in-one as separate components

    To collect the jaeger-collector's metrics, jaeger-all-in-one is split into separately deployed components (the elasticsearch and prometheus services referenced below are presumably defined elsewhere in the same compose file)

    services:
      jaeger-collector:
        image: jaegertracing/jaeger-collector
        container_name: jaeger-collector
        networks:
          - backend
        command: [
          "--es.server-urls=http://elasticsearch:9200",
          "--es.num-shards=1",
          "--es.num-replicas=0",
          "--log-level=info"
        ]
        environment:
          - SPAN_STORAGE_TYPE=elasticsearch
          - METRICS_STORAGE_TYPE=prometheus
          - PROMETHEUS_SERVER_URL=http://prometheus:9090
        depends_on:
          - elasticsearch
      jaeger-query:
        image: jaegertracing/jaeger-query
        container_name: jaeger-query
        networks:
          - backend
        command: [
          "--es.server-urls=http://elasticsearch:9200",
          "--span-storage.type=elasticsearch",
          "--log-level=info"
        ]
        environment:
          - SPAN_STORAGE_TYPE=elasticsearch
          - METRICS_STORAGE_TYPE=prometheus
          - PROMETHEUS_SERVER_URL=http://prometheus:9090
          - no_proxy=localhost
          - QUERY_BASE_PATH=/jaeger
        depends_on:
          - jaeger-agent
      jaeger-agent:
        image: jaegertracing/jaeger-agent
        container_name: jaeger-agent
        networks:
          - backend
        command: [ "--reporter.grpc.host-port=jaeger-collector:14250" ]
        environment:
          - SPAN_STORAGE_TYPE=elasticsearch
          - METRICS_STORAGE_TYPE=prometheus
          - PROMETHEUS_SERVER_URL=http://prometheus:9090
        depends_on:
          - jaeger-collector
    

    TODO Tomorrow

    1. Load testing