如何实现微服务通用websocket
约 925 字大约 3 分钟
2025-03-03
简述
在互联网中,web和后端交互的方式只有http和websocket,但是有些问题http处理起来不是特别好.
使用websocket市面上主要有两个原因:
- 在弱网环境中ws的响应速度和稳定性要优于http,于是部分公司完全放弃http,转而用ws来进行数据交互
- 需要进行实时推送(如设备状态变化),如果使用http轮询开支会比较大
websocket很灵活,但是带来的开发成本也比较高,特别由于ws是长连接,那么在微服务场景下会带来以下几个问题:
- 在微服务领域中,负载均衡是一个非常优秀的特性,但是由于websocket的长连接,负载均衡的问题就不是很好处理
- 随着业务的增长,每个业务都会产生websocket的需求,如果每个业务都需要单独处理websocket,一个是会带来业务的复杂性,一个是连接的复用没法实现
联犀在实际生产应用中,为了优雅而高效的解决以上两个问题,使用拓展插槽和消息队列的方式完美的解决了以上两个问题,并且新业务开发的难度也直线降低,下面就讲述一下实现方式和原理
架构

订阅流程


同时当用户订阅一个服务的消息的时候我们需要进行鉴权, 这时候core 是不知道他能不能订阅的,如设备的状态推送,这种消息只有项目内及设备的分享者才能有权限进行订阅.
用户订阅的固定参数中有一个code参数,我们可以借助插槽机制来调用code记录的对应插槽接口进行鉴权,如果业务认为该用户有权限,core服务就会将订阅的消息放到订阅列表中,后续推送消息如果匹配就会推送给该用户.
插槽定义

订阅鉴权代码示例
示例代码位置: things/service/apisvr/internal/logic/things/slot/user/subscribeLogic.go
func (l *SubscribeLogic) Subscribe(req *types.SlotUserSubscribeReq) error {
l.Infof("userSubscribeSlot:%v", utils.Fmt(req))
switch req.Code {
case def.UserSubscribeDeviceConn:
case def.UserSubscribeDevicePropertyReport:
case def.UserSubscribeDeviceActionReport:
case def.UserSubscribeDeviceOtaReport:
default:
return errors.NotRealize
}
var productID = cast.ToString(req.Params["productID"])
var deviceName = cast.ToString(req.Params["deviceName"])
var projectID = cast.ToInt64(req.Params["projectID"])
var areaID = cast.ToInt64(req.Params["areaID"])
if productID != "" && deviceName != "" { //设备有权限查就能订阅
_, err := l.svcCtx.DeviceM.DeviceInfoRead(l.ctx, &dm.DeviceInfoReadReq{
ProductID: productID,
DeviceName: deviceName,
})
return err
}
var uc = ctxs.GetUserCtx(l.ctx)
if uc == nil {
return errors.Permissions.AddMsg("只有用户才能订阅")
}
if projectID == 0 {
return errors.Permissions
}
if uc.IsAdmin {
return nil
}
pa := uc.ProjectAuth[projectID]
if pa == nil {
return errors.Permissions
}
if pa.AuthType == def.AuthAdmin || pa.AuthType == def.AuthReadWrite {
return nil
}
if areaID == 0 {
return errors.Permissions
}
aa := pa.Area[areaID]
if aa == 0 {
return errors.Permissions
}
return nil
}
推送流程
推送流程就比较简单,业务调用联犀的ws发布函数直接发布消息即可,如果参数对应上的就会发送给业务
示例位置: things/service/dmsvr/internal/event/deviceMsgEvent/thing.go:216
发布函数位置: share/websocket/userSubscribe.go

总结
至此我们就可以通过简单的几步就能够快速的将业务应用接入到实时推送的能力中,欢迎大家进行使用及测试.
附录
websocket管理的包: share/websocket 插槽使用说明