联犀 联犀
首页
使用指南
开发指南
技术分享
  • 中台接口 (opens new window)
  • 物联网接口 (opens new window)
在线体验 (opens new window)
  • gitee (opens new window)
  • github (opens new window)
  • 边缘网关-RHILEX (opens new window)
  • 边缘网关-opengw (opens new window)
首页
使用指南
开发指南
技术分享
  • 中台接口 (opens new window)
  • 物联网接口 (opens new window)
在线体验 (opens new window)
  • gitee (opens new window)
  • github (opens new window)
  • 边缘网关-RHILEX (opens new window)
  • 边缘网关-opengw (opens new window)
  • 后端

    • 联犀是如何架构好一个既要又要的系统
    • 插槽设计及使用
    • 如何实现微服务通用websocket
    • 多租户实现
    • tdengine结合物联网平台落地
    • 全链路追踪
      • gorm链路追踪
      • tdengine链路追踪
      • 消息队列链路追踪
      • 设备消息链路追踪
      • 附录
    • 缩小可执行文件的大小
    • 生成功能授权
  • 技术分享
  • 后端
godLei6
2024-11-04
目录

全链路追踪

在一个完善的系统中,链路追踪是必不可少的,go-zero 当然早已经为我们考虑好了,只需要在配置中添加配置即可使用。gozero默认会在 api 的中间件与 rpc 的 interceptor 添加追踪,但是在消息队列,数据库日志打印中仍需要我们自己来集成,本文就讲解联犀在全链路追踪中做的工作.

# gorm链路追踪

在gorm中,支持日志自定义,只需要实现gorm定义的接口,创建的时候配置进去,gorm即会调用.

接口定义:

type Interface interface {
    LogMode(LogLevel) Interface
    Info(context.Context, string, ...interface{})
    Warn(context.Context, string, ...interface{})
    Error(context.Context, string, ...interface{})
    Trace(ctx context.Context, begin time.Time, fc func() (sql string, rowsAffected int64), err error)
}
1
2
3
4
5
6
7

核心代码实现如下:


func (l *Log) Trace(ctx context.Context, begin time.Time, fc func() (sql string, rowsAffected int64), err error) {
	if l.LogLevel <= logger.Silent {
		return
	}
	elapsed := time.Since(begin)
	useTime := fmt.Sprintf("%v ms", float64(elapsed.Nanoseconds())/1e6)
	switch {
	case err != nil && l.LogLevel >= logger.Error && (!errors.Is(err, logger.ErrRecordNotFound) || !IgnoreRecordNotFoundError):
		sql, rows := fc()
		logx.WithContext(ctx).Errorw("errorSql", logx.Field("call", utils.FileWithLineNum()), logx.Field("sql", sql),
			logx.Field("useTime", useTime), logx.Field("rows", rows), logx.Field("err", err))
	case elapsed > SlowThreshold && SlowThreshold != 0 && l.LogLevel >= logger.Warn:
		sql, rows := fc()
		slowLog := fmt.Sprintf("slowSql >= %v", SlowThreshold)
		logx.WithContext(ctx).Sloww(slowLog, logx.Field("call", utils.FileWithLineNum()), logx.Field("sql", sql),
			logx.Field("useTime", useTime), logx.Field("rows", rows))
	case l.LogLevel == logger.Info:
		sql, rows := fc()
		logx.WithContext(ctx).Infow("traceSql", logx.Field("call", utils.FileWithLineNum()), logx.Field("sql", sql),
			logx.Field("useTime", useTime), logx.Field("rows", rows))
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

需要关注的是Trace函数,sql打印都是这个函数去做的,也方便我们进行调试

gozero集成的日志打印支持自定义字段(logx.LogField),把关注的核心点,如整个执行时间,影响的行数及sql进行打印

同时我们其实更关注的一点是这个sql是在业务的哪里进行调用的,所以我们根据调用堆栈,获取gorm外第一个调用的地方,然后打印出该行

函数实现:

// FileWithLineNum return the file name and line number of the current file
func FileWithLineNum() string {
	pcs := [13]uintptr{}
	// the third caller usually from gorm internal
	len := runtime.Callers(3, pcs[:])
	frames := runtime.CallersFrames(pcs[:len])
	for i := 0; i < len; i++ {
		// second return value is "more", not "ok"
		frame, _ := frames.Next()
		if (!strings.HasPrefix(frame.File, sDIr) ||
			strings.HasSuffix(frame.File, "_test.go")) && !strings.HasPrefix(frame.Function, "gorm.io") && !strings.HasSuffix(frame.File, ".gen.go") {
			return prettyCaller(frame.File, frame.Line)
		}
	}

	return ""
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

最后效果展示:

2024-11-04T11:40:28.082+08:00	 info 	traceSql	rows=3	caller=stores/logger.go:84	call=logic/data/staticstics/info/readLogic.go:260  sql=SELECT `id`,`device_count` FROM `dm_product_category` WHERE deleted_time= 0 AND `id` in ('29','26','27') ORDER BY created_time desc	useTime=1.146806 ms	trace=495c1c9baea1e5ceda757119a568975d	span=d1d9c3798b192744

1
2

# tdengine链路追踪

参考: tdengine结合物联网平台落地

# 消息队列链路追踪

在联犀中,所有消息队列的消息都需要经过统一的封装才能发送,封装的消息结构体如下

type (
	// MsgHead 消息队列的头
	MsgHead struct {
		Trace     string        `json:"trace"`          //追踪tid
		Timestamp int64         `json:"timestamp"`      //发送时毫秒级时间戳
		Data      string        `json:"data,omitempty"` //传送的内容
		UserCtx   *ctxs.UserCtx `json:"userCtx,omitempty"`////context中携带的上下文,如用户信息,租户信息等
	}

	EventHandle interface {
		GetCtx() context.Context
		GetTs() time.Time
		GetData() []byte
	}
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

在发送消息的时候,由于gozero链路追踪集成的是otel,所以在创建消息的时候我们通过otel的接口来将ctx中的链路信息获取出来,放到我们消息队列的消息体中,代码实现如下:

func NewEventMsg(ctx context.Context, data []byte) []byte {
	//生成新的消息时,使用go-zero的链路追踪接口,从ctx中提取span信息,并放入MsgHead中的Trace字段
	span := trace.SpanFromContext(ctx)
	traceinfo, _ := span.SpanContext().MarshalJSON()

	msg := MsgHead{
		Trace:     string(traceinfo),
		Timestamp: time.Now().UnixMilli(),
		Data:      string(data),
		UserCtx:   ctxs.GetUserCtx(ctx).ClearInner(),
	}
	msgBytes, err := json.Marshal(msg)
	if err != nil {
		return nil
	}
	return msgBytes
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

在接收消息的时候则需要将上下文进行回复,链路追踪及用户信息等

func (m *MsgHead) GetCtx() context.Context {
	var msg MySpanContextConfig
	err := json.Unmarshal([]byte(m.Trace), &msg)
	if err != nil {
		logx.Errorf("[GetCtx]|json Unmarshal trace.SpanContextConfig err:%v", err)
		return nil
	}
	//将MsgHead 中的msg链路信息 重新注入ctx中并返回
	t, err := trace.TraceIDFromHex(msg.TraceID)
	if err != nil {
		logx.Errorf("[GetCtx]|TraceIDFromHex err:%v", err)
		return nil
	}
	s, err := trace.SpanIDFromHex(msg.SpanID)
	if err != nil {
		logx.Errorf("[GetCtx]|SpanIDFromHex err:%v", err)
		return nil
	}
	parent := trace.NewSpanContext(trace.SpanContextConfig{
		TraceID:    t,
		SpanID:     s,
		TraceFlags: 0x1,
	})
	ctx := trace.ContextWithRemoteSpanContext(context.Background(), parent)
	return ctxs.SetUserCtx(ctx, m.UserCtx)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

当然,在实际使用的过程中只需要知道这个流程即可,联犀已经将消息队列进行封装,直接使用即可,代码位置为: share/eventBus/fastEvent.go

使用示例为:

err = svcCtx.FastEvent.QueueSubscribe(eventBus.DmDeviceOnlineStatusChange, func(ctx context.Context, t time.Time, body []byte) error {
    if t.Before(time.Now().Add(-time.Second * 2)) { //2秒之前的跳过
        return nil
    }
    return serverEvent.NewServerHandle(ctxs.WithRoot(ctx), svcCtx).OnlineStatusHandle()
})
logx.Must(err)
1
2
3
4
5
6
7

# 设备消息链路追踪

在联犀中,设备上报及下发都有一个msgToken的字段,云端下发会带这个字段,同时也建议设备端所有请求都带上,可以是个随机字符串,也可以是自增的数字,保证和设备交互的整个流程中保持唯一即可.

同时在mqtt接收到设备消息的时候会第一时间把链路ID生成并注入到ctx中,保证通过查询msgToken能获取全链路ID,同时,在云端操作日志中也可以看到该链路ID,在联犀的日志上下文中会打印,并在日志中记录链路ID

func (d *MqttClient) subscribeWithFunc(cli mqtt.Client, topic string, handle func(ctx context.Context, topic string, payload []byte) error) error {
	return d.client.Subscribe(cli, topic,
		1, func(client mqtt.Client, message mqtt.Message) {
			go func() {
				ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second)
				defer cancel()
				utils.Recover(ctx)
				//dgsvr 订阅到了设备端数据,此时调用StartSpan方法,将订阅到的主题推送给jaeger
				//此时的ctx已经包含当前节点的span信息,会随着 handle(ctx).Publish 传递到下个节点
				ctx, span := ctxs.StartSpan(ctx, message.Topic(), "")
				defer span.End()
				startTime := timex.Now()
				duration := timex.Since(startTime)
				ctx = ctxs.WithRoot(ctx)
				err := handle(ctx, message.Topic(), message.Payload())
				if err != nil {
					logx.WithContext(ctx).Errorf("%s.handle failure err:%v topic:%v", utils.FuncName(), err, topic)
				}
				logx.WithContext(ctx).WithDuration(duration).Infof(
					"subscribeWithFuncIThingsDevicePublish topic:%v message:%v err:%v",
					message.Topic(), string(message.Payload()), err)
			}()
		})
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# 附录

  • 数据库初始化: share/stores/conn.go
  • gorm链路日志打印实现: share/stores/logger.go
  • gozero链路追踪: https://mp.weixin.qq.com/s/xmSar4aG_HVuPvAi6auQOQ (opens new window)
上次更新: 2024/11/04, 16:05:56
tdengine结合物联网平台落地
缩小可执行文件的大小

← tdengine结合物联网平台落地 缩小可执行文件的大小→

Theme by Vdoing | Copyright © 2022-2025 昆明云物通科技有限公司|GNU | 滇ICP备2024043132号-1 |
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式