分布式缓存
# 简述
在一个高性能的系统中,缓存是重中之重,只有减少数据库的访问,才能承接更大的流量,也才能避免数据库被打崩,而在分布式或集群的系统中,则只能使用分布式缓存,联犀对此进行了深度封装,便于各业务的使用.
# 原理
# 缓存获取
分布式缓存分为两个部分,一个是服务内的缓存,一个是给其他服务使用的缓存.
服务内的缓存由于数据是自己掌控,所以获取数据缓存的顺序为 内存->redis->数据库
给其他服务使用的缓存则为 内存->redis-> rpc调用数据所有服务来获取数据并更新到内存和redis中
# 服务内
# 服务外
在内存和redis中都有一个过期时间,redis上的过期时间会比内存稍长,避免同时缓存同时到期都去请求redis.
同时,借助gozero的 syncx.SingleFlight
同一个服务下同时获取相同数据的时候会进行加锁操作,获取完成后会返回给每一个业务.
代码如下:
func (c *Cache[dataT, keyType]) GetData(ctx context.Context, key keyType) (*dataT, error) {
ctx = ctxs.WithRoot(ctx)
cacheKey := c.genCacheKey(key)
temp, ok := c.cache.Get(cacheKey)
if ok {
if temp == nil {
return nil, errors.NotFind
}
return temp, nil
}
//并发获取的情况下避免击穿
ret, err := c.sf.Do(cacheKey, func() (any, error) {
{ //内存中没有就从redis上获取
val, err := store.GetCtx(ctx, cacheKey)
if err != nil {
return nil, err
}
if len(val) > 0 {
var ret dataT
err = json.Unmarshal([]byte(val), &ret)
if err != nil {
return nil, err
}
if c.fmt != nil {
c.fmt(ctx, key, &ret)
}
c.cache.Set(cacheKey, &ret)
return &ret, nil
}
}
if c.getData == nil { //如果没有设置第三级缓存则直接设置该参数为空并返回
c.cache.Set(cacheKey, nil)
return nil, nil
}
//redis上没有就读数据库
data, err := c.getData(ctxs.WithRoot(ctx), key)
if err != nil && !errors.Cmp(err, errors.NotFind) { //如果是其他错误直接返回
return nil, err
}
//读到了之后设置缓存
c.cache.Set(cacheKey, data)
if data == nil {
return data, err
}
ctxs.GoNewCtx(ctx, func(ctx context.Context) { //异步设置缓存
str, err := json.Marshal(data)
if err != nil {
logx.WithContext(ctx).Error(err)
return
}
_, err = store.SetnxExCtx(ctx, cacheKey, string(str), int(c.expireTime/time.Second))
if err != nil {
logx.WithContext(ctx).Error(err)
return
}
})
return data, nil
})
if err != nil {
return nil, err
}
return ret.(*dataT), nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# 缓存更新
- 缓存加载方式: 在联犀中,缓存默认使用懒加载的方式,也就是说如果缓存更新了只会进行删除操作,而不会去更新缓存.这样的优点是资源消耗少,但是如果突然出现大并发的流量,这种方式就有可能把数据库打爆.但是恰好物联网行业的流量变化是相对稳定的,不会出现这种突发流量变化.
- 数据不一致: 由于缓存使用了内存缓存,正常情况下来说更新缓存会导致其他服务的内存和实际数据出现数据不一致的情况,在联犀中为了解决这个问题使用消息队列通知的方式来处理, 其他服务收到数据更新的通知后会将内存中的缓存清除.
- 异常情况: 如果出现网络问题导致一段时间的不可用,恢复了之后通知的消息未收到会导致内存中的缓存和实际的数据不一致,这时候就需要等待内存中的数据过期,再去拿新的数据,保证最终一致性.
设置代码示例如下:
// 删除数据的时候设置为空即可
func (c *Cache[dataT, keyType]) SetData(ctx context.Context, key keyType, data *dataT) error {
cacheKey := c.genCacheKey(key)
if data != nil { //如果是
dataStr, err := json.Marshal(data)
if err != nil {
logx.WithContext(ctx).Error(err)
return err
}
err = store.SetexCtx(ctx, cacheKey, string(dataStr), int(c.expireTime/time.Second))
if err != nil {
logx.WithContext(ctx).Error(err)
}
} else {
_, err := store.Del(cacheKey)
if err != nil {
logx.WithContext(ctx).Error(err)
}
}
c.cache.Delete(cacheKey)
if c.fastEvent != nil {//发布缓存更新消息
err := c.fastEvent.Publish(ctx, c.genTopic(), cacheKey)
if err != nil {
logx.WithContext(ctx).Error(err)
}
}
return nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
在初始化缓存的时候,缓存SDK同时也会监听对应的缓存更新topic,这样对于业务来说初始化了之后就和使用本地缓存是一样的了.
定义和初始化代码:
type CacheSyncStu struct {
KeyType string `json:"keyType"`
}
type Cache[dataT any, keyType comparable] struct {
keyType string
cache otter.Cache[string, *dataT]
fastEvent *eventBus.FastEvent
getData func(ctx context.Context, key keyType) (*dataT, error)
fmt func(ctx context.Context, key keyType, data *dataT)
expireTime time.Duration
sf syncx.SingleFlight
}
type CacheConfig[dataT any, keyType comparable] struct {
KeyType string
FastEvent *eventBus.FastEvent
Fmt func(ctx context.Context, key keyType, data *dataT)
GetData func(ctx context.Context, key keyType) (*dataT, error)
ExpireTime time.Duration
}
var (
cacheMap = map[string]any{}
cacheMutex sync.Mutex
)
func NewCache[dataT any, keyType comparable](cfg CacheConfig[dataT, keyType]) (*Cache[dataT, keyType], error) {
cacheMutex.Lock() //单例模式
defer cacheMutex.Unlock()
if v, ok := cacheMap[cfg.KeyType]; ok {
return v.(*Cache[dataT, keyType]), nil
}
cache, err := otter.MustBuilder[string, *dataT](10_000).
CollectStats().
Cost(func(key string, value *dataT) uint32 {
return 1
}).
WithTTL(cfg.ExpireTime/3 + 1).
Build()
if err != nil {
return nil, err
}
ret := Cache[dataT, keyType]{
sf: syncx.NewSingleFlight(),
keyType: cfg.KeyType,
cache: cache,
fastEvent: cfg.FastEvent,
getData: cfg.GetData,
expireTime: cfg.ExpireTime,
fmt: cfg.Fmt,
}
if ret.expireTime == 0 {
ret.expireTime = time.Minute*10 + time.Second*time.Duration(rand.Int63n(60))
}
if ret.fastEvent != nil {
err = ret.fastEvent.Subscribe(ret.genTopic(), func(ctx context.Context, t time.Time, body []byte) error {
cacheKey := string(body)
ret.cache.Delete(cacheKey)
return nil
})
if err != nil {
return nil, err
}
}
cacheMap[cfg.KeyType] = &ret
return &ret, nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# 定义方法
分为服务内缓存和给服务外调用的缓存
# 服务内
- 定义
首先在 core/service/syssvr/internal/svc/serviceContext.go 的 ServiceContext
结构体中定义好我们的缓存:
UserCache *caches.Cache[sys.UserInfo, int64]
第一个参数是缓存的值,第二个参数是缓存的key,可以也可以是结构体
- 初始化:
userCache, err := caches.NewCache(caches.CacheConfig[sys.UserInfo, int64]{
KeyType: eventBus.ServerCacheKeySysUserInfo,
FastEvent: svcCtx.FastEvent,
GetData: func(ctx context.Context, key int64) (*sys.UserInfo, error) {
db := relationDB.NewUserInfoRepo(ctx)
if key == 0 {
key = ctxs.GetUserCtxNoNil(ctx).UserID
}
pi, err := db.FindOne(ctx, key)
pb := usermanagelogic.UserInfoToPb(ctx, pi, svcCtx)
return pb, err
},
ExpireTime: 20 * time.Minute,
})
logx.Must(err)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
keyType:是缓存的key的类型,同一个缓存需要一致,拿来生成缓存的key
FastEvent:是联犀内置的消息队列,负责数据修改通知
GetData: 当redis也没有的时候,定义这个函数,缓存就知道去哪里拿了,rpc或数据库
# 服务外
type TenantCacheT = *caches.Cache[tenant.Info, string]
func NewTenantInfoCache(pm tenantmanage.TenantManage, fastEvent *eventBus.FastEvent) (TenantCacheT, error) {
return caches.NewCache(caches.CacheConfig[tenant.Info, string]{
KeyType: eventBus.ServerCacheKeySysTenantInfo,
FastEvent: fastEvent,
GetData: func(ctx context.Context, key string) (*tenant.Info, error) {
ret, err := pm.TenantInfoRead(ctx, &sys.WithIDCode{Code: key})
return logic.RpcToTenantInfoCache(ret), err
},
})
}
2
3
4
5
6
7
8
9
10
11
12
keyType要和服务内定义的相同,GetData为调用该服务的rpc
定义位置参考:
# 使用缓存
注意: 在使用前,需要先调用缓存初始化才能正常运行 share/caches/common.go:14
缓存暴露在外的只有两个方法:
- SetData: 设置缓存,如果data为空则为清空缓存
- GetData: 获取缓存
func (c *Cache[dataT, keyType]) SetData(ctx context.Context, key keyType, data *dataT) error
func (c *Cache[dataT, keyType]) GetData(ctx context.Context, key keyType) (*dataT, error)
2
使用示例:
ui, err := l.svcCtx.UserC.GetData(l.ctx, v.UserID)
if err == nil {
user = utils.Copy[types.UserCore](ui)
}
2
3
4
更新缓存示例:
err = l.svcCtx.UserCache.SetData(l.ctx, ui.UserID, nil)
if err != nil {
l.Error(err)
}
2
3
4
# 总结
可以看到使用联犀的分布式缓存和使用本地的缓存几乎没有任何区别,同时因为一级缓存在内存中,所以使用者可以放心大胆的高频使用,而无需担心频繁调用导致系统崩溃的问题.