全链路追踪
在一个完善的系统中,链路追踪是必不可少的,go-zero 当然早已经为我们考虑好了,只需要在配置中添加配置即可使用。gozero默认会在 api 的中间件与 rpc 的 interceptor 添加追踪,但是在消息队列,数据库日志打印中仍需要我们自己来集成,本文就讲解联犀在全链路追踪中做的工作.
# gorm链路追踪
在gorm中,支持日志自定义,只需要实现gorm定义的接口,创建的时候配置进去,gorm即会调用.
接口定义:
type Interface interface {
LogMode(LogLevel) Interface
Info(context.Context, string, ...interface{})
Warn(context.Context, string, ...interface{})
Error(context.Context, string, ...interface{})
Trace(ctx context.Context, begin time.Time, fc func() (sql string, rowsAffected int64), err error)
}
2
3
4
5
6
7
核心代码实现如下:
func (l *Log) Trace(ctx context.Context, begin time.Time, fc func() (sql string, rowsAffected int64), err error) {
if l.LogLevel <= logger.Silent {
return
}
elapsed := time.Since(begin)
useTime := fmt.Sprintf("%v ms", float64(elapsed.Nanoseconds())/1e6)
switch {
case err != nil && l.LogLevel >= logger.Error && (!errors.Is(err, logger.ErrRecordNotFound) || !IgnoreRecordNotFoundError):
sql, rows := fc()
logx.WithContext(ctx).Errorw("errorSql", logx.Field("call", utils.FileWithLineNum()), logx.Field("sql", sql),
logx.Field("useTime", useTime), logx.Field("rows", rows), logx.Field("err", err))
case elapsed > SlowThreshold && SlowThreshold != 0 && l.LogLevel >= logger.Warn:
sql, rows := fc()
slowLog := fmt.Sprintf("slowSql >= %v", SlowThreshold)
logx.WithContext(ctx).Sloww(slowLog, logx.Field("call", utils.FileWithLineNum()), logx.Field("sql", sql),
logx.Field("useTime", useTime), logx.Field("rows", rows))
case l.LogLevel == logger.Info:
sql, rows := fc()
logx.WithContext(ctx).Infow("traceSql", logx.Field("call", utils.FileWithLineNum()), logx.Field("sql", sql),
logx.Field("useTime", useTime), logx.Field("rows", rows))
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
需要关注的是Trace函数,sql打印都是这个函数去做的,也方便我们进行调试
gozero集成的日志打印支持自定义字段(logx.LogField),把关注的核心点,如整个执行时间,影响的行数及sql进行打印
同时我们其实更关注的一点是这个sql是在业务的哪里进行调用的,所以我们根据调用堆栈,获取gorm外第一个调用的地方,然后打印出该行
函数实现:
// FileWithLineNum return the file name and line number of the current file
func FileWithLineNum() string {
pcs := [13]uintptr{}
// the third caller usually from gorm internal
len := runtime.Callers(3, pcs[:])
frames := runtime.CallersFrames(pcs[:len])
for i := 0; i < len; i++ {
// second return value is "more", not "ok"
frame, _ := frames.Next()
if (!strings.HasPrefix(frame.File, sDIr) ||
strings.HasSuffix(frame.File, "_test.go")) && !strings.HasPrefix(frame.Function, "gorm.io") && !strings.HasSuffix(frame.File, ".gen.go") {
return prettyCaller(frame.File, frame.Line)
}
}
return ""
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
最后效果展示:
2024-11-04T11:40:28.082+08:00 info traceSql rows=3 caller=stores/logger.go:84 call=logic/data/staticstics/info/readLogic.go:260 sql=SELECT `id`,`device_count` FROM `dm_product_category` WHERE deleted_time= 0 AND `id` in ('29','26','27') ORDER BY created_time desc useTime=1.146806 ms trace=495c1c9baea1e5ceda757119a568975d span=d1d9c3798b192744
2
# tdengine链路追踪
# 消息队列链路追踪
在联犀中,所有消息队列的消息都需要经过统一的封装才能发送,封装的消息结构体如下
type (
// MsgHead 消息队列的头
MsgHead struct {
Trace string `json:"trace"` //追踪tid
Timestamp int64 `json:"timestamp"` //发送时毫秒级时间戳
Data string `json:"data,omitempty"` //传送的内容
UserCtx *ctxs.UserCtx `json:"userCtx,omitempty"`////context中携带的上下文,如用户信息,租户信息等
}
EventHandle interface {
GetCtx() context.Context
GetTs() time.Time
GetData() []byte
}
)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
在发送消息的时候,由于gozero链路追踪集成的是otel,所以在创建消息的时候我们通过otel的接口来将ctx中的链路信息获取出来,放到我们消息队列的消息体中,代码实现如下:
func NewEventMsg(ctx context.Context, data []byte) []byte {
//生成新的消息时,使用go-zero的链路追踪接口,从ctx中提取span信息,并放入MsgHead中的Trace字段
span := trace.SpanFromContext(ctx)
traceinfo, _ := span.SpanContext().MarshalJSON()
msg := MsgHead{
Trace: string(traceinfo),
Timestamp: time.Now().UnixMilli(),
Data: string(data),
UserCtx: ctxs.GetUserCtx(ctx).ClearInner(),
}
msgBytes, err := json.Marshal(msg)
if err != nil {
return nil
}
return msgBytes
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
在接收消息的时候则需要将上下文进行回复,链路追踪及用户信息等
func (m *MsgHead) GetCtx() context.Context {
var msg MySpanContextConfig
err := json.Unmarshal([]byte(m.Trace), &msg)
if err != nil {
logx.Errorf("[GetCtx]|json Unmarshal trace.SpanContextConfig err:%v", err)
return nil
}
//将MsgHead 中的msg链路信息 重新注入ctx中并返回
t, err := trace.TraceIDFromHex(msg.TraceID)
if err != nil {
logx.Errorf("[GetCtx]|TraceIDFromHex err:%v", err)
return nil
}
s, err := trace.SpanIDFromHex(msg.SpanID)
if err != nil {
logx.Errorf("[GetCtx]|SpanIDFromHex err:%v", err)
return nil
}
parent := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: t,
SpanID: s,
TraceFlags: 0x1,
})
ctx := trace.ContextWithRemoteSpanContext(context.Background(), parent)
return ctxs.SetUserCtx(ctx, m.UserCtx)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
当然,在实际使用的过程中只需要知道这个流程即可,联犀已经将消息队列进行封装,直接使用即可,代码位置为: share/eventBus/fastEvent.go
使用示例为:
err = svcCtx.FastEvent.QueueSubscribe(eventBus.DmDeviceOnlineStatusChange, func(ctx context.Context, t time.Time, body []byte) error {
if t.Before(time.Now().Add(-time.Second * 2)) { //2秒之前的跳过
return nil
}
return serverEvent.NewServerHandle(ctxs.WithRoot(ctx), svcCtx).OnlineStatusHandle()
})
logx.Must(err)
2
3
4
5
6
7
# 设备消息链路追踪
在联犀中,设备上报及下发都有一个msgToken的字段,云端下发会带这个字段,同时也建议设备端所有请求都带上,可以是个随机字符串,也可以是自增的数字,保证和设备交互的整个流程中保持唯一即可.
同时在mqtt接收到设备消息的时候会第一时间把链路ID生成并注入到ctx中,保证通过查询msgToken能获取全链路ID,同时,在云端操作日志中也可以看到该链路ID,在联犀的日志上下文中会打印,并在日志中记录链路ID

func (d *MqttClient) subscribeWithFunc(cli mqtt.Client, topic string, handle func(ctx context.Context, topic string, payload []byte) error) error {
return d.client.Subscribe(cli, topic,
1, func(client mqtt.Client, message mqtt.Message) {
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second)
defer cancel()
utils.Recover(ctx)
//dgsvr 订阅到了设备端数据,此时调用StartSpan方法,将订阅到的主题推送给jaeger
//此时的ctx已经包含当前节点的span信息,会随着 handle(ctx).Publish 传递到下个节点
ctx, span := ctxs.StartSpan(ctx, message.Topic(), "")
defer span.End()
startTime := timex.Now()
duration := timex.Since(startTime)
ctx = ctxs.WithRoot(ctx)
err := handle(ctx, message.Topic(), message.Payload())
if err != nil {
logx.WithContext(ctx).Errorf("%s.handle failure err:%v topic:%v", utils.FuncName(), err, topic)
}
logx.WithContext(ctx).WithDuration(duration).Infof(
"subscribeWithFuncIThingsDevicePublish topic:%v message:%v err:%v",
message.Topic(), string(message.Payload()), err)
}()
})
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 附录
- 数据库初始化: share/stores/conn.go
- gorm链路日志打印实现: share/stores/logger.go
- gozero链路追踪: https://mp.weixin.qq.com/s/xmSar4aG_HVuPvAi6auQOQ (opens new window)