如何开发我们的协议网关
约 1800 字大约 6 分钟
2025-03-31
前言
当我们需要使用协议网关来开发的时候,我们需要尽量做的通用,把定制化的部分尽量放到协议脚本中去实现.
联犀为协议网关封装了一套sdk,用户可以通过这套sdk快速的开发自己的协议网关.
sdk的位置: things/sdk/protocol
sdk下的文件内容:
.
└── protocol
├── cloudProtocol.go //基于core封装添加了配置管理,适用与云云对接
├── coreProtocol.go //协议网关核心封装,将物模型,产品,设备,及初始化都封装到这里,特殊协议直接基于该方法进行开发即可,如modbus,tcp,udp等
├── default.go // 配置管理示例配置结构体,用户根据这个示例来实现自己的配置管理
├── mqttProtocol.go //针对mqtt协议进行封装,添加了在线校准,上下线订阅
└── readme.md
开发
代码初始化
我们根据协议的不同创建不同的服务类型.
rpc服务(mqtt协议)
goctl rpc new rpcsvr --style=goZero -m
初始化后需要用下面的模版替换
syntax = "proto3";
package dg;//不要改
option go_package="pb/dg"; //这里修改成协议名称即可如pb/pwumei
message Request {
string ping = 1;
}
message Response {
string pong = 1;
}
service DeviceAuth{
//设备登录认证
rpc loginAuth(LoginAuthReq) returns (Response);
//设备操作认证
rpc accessAuth(AccessAuthReq) returns (Response);
//设备动态注册
rpc deviceRegister(DeviceRegisterReq) returns (DeviceRegisterResp);
}
message DeviceRegisterReq{
string productID = 1; //产品id
string deviceName = 2; //设备名称
int64 nonce = 3; //随机数
int64 timestamp = 4; //秒级时间戳
string signature = 5; //签名信息
}
message DeviceRegisterResp{
int64 len = 1; //payload加密前信息的长度
/*
加密过程将原始 JSON 格式的 payload 转为字符串后进行 AES 加密,再进行 base64 加密。AES 加密算法为 CBC 模式,密钥长度128,取 productSecret 前16位,偏移量为长度16的字符“0”。
原始 payload 内容说明:
key value 描述
encryptionType 1 加密类型,1表示证书认证,2表示签名认证。
psk 1239 设备密钥,当产品认证类型为签名认证时有此参数
clientCert - 设备证书文件字符串格式,当产品认证类型为证书认证时有此参数。
clientKey - 设备私钥文件字符串格式,当产品认证类型为证书认证时有此参数。
*/
string payload = 2;
}
message LoginAuthReq {
string protocolType =1; // 协议类型: iThings,iThings-thingsboard
string username = 2;//用户名
string password = 3;//密码
string clientID = 4;//clientID
string ip = 5;//访问的ip地址
bytes certificate = 6;//客户端证书
}
message AccessAuthReq {
string protocolType =1; // 协议类型: iThings,iThings-thingsboard
string username = 2; //用户名
string topic = 3;//主题
string clientID = 4;//clientID
string access = 5;//操作
string ip = 6; //访问的ip地址
}
api服务(对外需要提供http接口)
goctl api new apisvr --style=goZero
protocol初始化
在svc中导入protocol,下面是物美协议示例:
package svc
import (
"gitee.com/unitedrhino/protocol-wumei/service/pwumeisvr/internal/config"
"gitee.com/unitedrhino/share/caches"
"gitee.com/unitedrhino/share/def"
"gitee.com/unitedrhino/share/utils"
"gitee.com/unitedrhino/things/sdk/protocol"
"gitee.com/unitedrhino/things/service/dmsvr/pb/dm"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/kv"
"github.com/zeromicro/go-zero/zrpc"
)
type ServiceContext struct {
Config config.Config
Protocol *protocol.MqttProtocol //引入三种协议插件中的一种
Store kv.Store
NodeID int64
}
//产品级配置项
var ProductConfigFields = []*dm.ProtocolConfigField{
{
Id: 1,
Key: "userName",
Label: "mqtt账号",
IsRequired: true,
Sort: 1,
}, {
Id: 2,
Key: "password",
Label: "mqtt密码",
IsRequired: true,
Sort: 2,
},
}
func NewServiceContext(c config.Config) *ServiceContext {
caches.InitStore(c.CacheRedis)//初始化redis
nodeID := utils.GetNodeID(c.CacheRedis, c.Name)//初始化节点ID
//协议初始化
Protocol, err := protocol.NewMqttProtocol(c.Event, &dm.ProtocolInfo{
Name: "物美协议",
Code: def.ProtocolCodeWumei,
TransProtocol: def.ProtocolMqtt,
EtcdKey: c.Etcd.Key,
ProductFields: ProductConfigFields,
}, &protocol.CoreProtocolConf{
ServerName: c.Name,
NodeID: nodeID,
DmClient: zrpc.MustNewClient(c.DmRpc.Conf),
TimedM: zrpc.MustNewClient(c.TimedJobRpc.Conf),
}, c.DevLink)
logx.Must(err)
return &ServiceContext{
Config: c,
Store: kv.NewStore(c.CacheRedis),
Protocol: Protocol,
NodeID: nodeID,
}
}
我们启动协议网关之后,sdk会主动到联犀中注册,无需在页面手动维护
协议定义说明(按需填写即可):
type ProtocolInfo struct {
Id int64
Name string //协议名称
Code string //协议编码:唯一不重即可 iThings,iThings-thingsboard,wumei,aliyun,huaweiyun,tuya
Type string //协议类型: 普通设备(默认):normal 音视频:media
TransProtocol string // 传输协议: mqtt,tcp,udp,cloud
Desc string //协议描述
Endpoints []string
EtcdKey string
IsEnableSyncProduct int64 //是否支持主动同步产品信息,是为1 否为2,默认为2,如果开启了需要使用后面的函数来接受主动同步的消息处理 func (m *CoreProtocol) RegisterSync(c zrpc.RpcServerConf, handle protocolSync.ProtocolSyncServer)
IsEnableSyncDevice int64 //是否支持主动同步设备信息,是为1 否为2,默认为2,如果开启了需要使用后面的函数来接受主动同步的消息处理 func (m *CoreProtocol) RegisterSync(c zrpc.RpcServerConf, handle protocolSync.ProtocolSyncServer)
ConfigFields []*ProtocolConfigField //协议级配置字段列表,没有可以不传
ConfigInfos []*ProtocolConfigInfo //协议级配置信息列表
ProductFields []*ProtocolConfigField //产品需要的配置字段列表,在产品详情中的protocolConf中记录
DeviceFields []*ProtocolConfigField //设备需要的配置字段列表,在设备详情中的protocolConf中记录
}
如果我们开启了产品或设备主动同步的功能,在下面两个地方会有按钮提供调用协议组件来进行同步
- 产品同步
- 设备同步
如果我们填写了产品级配置,那么在产品详情-设备接入中会展示对应的配置
资源初始化
在internal/startup中进行一些资源的初始化.
package startup
import (
"context"
"fmt"
"gitee.com/unitedrhino/protocol-wumei/service/pwumeisvr/internal/domain"
"gitee.com/unitedrhino/protocol-wumei/service/pwumeisvr/internal/event"
"gitee.com/unitedrhino/protocol-wumei/service/pwumeisvr/internal/svc"
"gitee.com/unitedrhino/things/share/devices"
"github.com/zeromicro/go-zero/core/logx"
)
func Init(svcCtx *svc.ServiceContext) {
ProtocolInit(svcCtx)
}
func ProtocolInit(svcCtx *svc.ServiceContext) {
//注册下行消息订阅,联犀的dmsvr下发的报文都会通知到这里
err := svcCtx.Protocol.RegisterDeviceMsgDownHandler(func(ctx context.Context, info *devices.InnerPublish) error {
err := event.NewDeviceDown(ctx, svcCtx).Handle(ctx, info)
return err
})
logx.Must(err)
//订阅设备消息
err = svcCtx.Protocol.SubscribeDevMsg("+/+/+/post", func(ctx context.Context, topic string, payload []byte) error {
err := event.NewDeviceUp(ctx, svcCtx).Handle(ctx, topic, payload)
return err
})
logx.Must(err)
//订阅设备消息
err = svcCtx.Protocol.SubscribeDevMsg("/+/+/+/post", func(ctx context.Context, topic string, payload []byte) error {
err := event.NewDeviceUp(ctx, svcCtx).Handle(ctx, topic, payload)
return err
})
logx.Must(err)
//注册设备上下线信息转换,将第三方的信息转换成联犀风格的
err = svcCtx.Protocol.SubscribeDevConn(func(ctx context.Context, info devices.DevConn) (devices.DevConn, error) {
ci, err := domain.ParseClientID(info.ClientID)
if err != nil {
return info, err
}
info.DeviceName = ci.DeviceName
pi := ci.ID
if pi != "" {
info.ClientID = fmt.Sprintf("%s&%s", pi, ci.DeviceName)
info.ProductID = pi
}
return info, nil
})
logx.Must(err)
//err = svcCtx.Protocol.RegisterProductIDSync()
//logx.Must(err)
err = svcCtx.Protocol.Start()//启动协议
logx.Must(err)
svcCtx.Protocol.RunTimerHandles() //开启定时任务
}
权限校验(mqtt协议)
在生成的logic中添加操作和登录的权限校验即可
上下行消息处理
在 internal/event中创建好上行和下行的转换函数,下面几个是sdk中比较重要的函数:
//发送消息给设备
func (m *MqttProtocol) PublishToDev(ctx context.Context, topic string, payload []byte) error
//订阅对应的topic
func (m *MqttProtocol) SubscribeDevMsg(topic string, handle DevHandle) error
//上报设备的在线状态
func (p *CoreProtocol) ReportDevConn(ctx context.Context, conn devices.DevConn) (err error)
//设备发送消息给云端
func (p *CoreProtocol) DevPubMsg(ctx context.Context, publishMsg *devices.DevPublish) error
//注册云端下发设备的消息,拦截转换成对应的参数
func (p *CoreProtocol) RegisterDeviceMsgDownHandler(
handle func(ctx context.Context, info *devices.InnerPublish) error) error
结尾
上面代码都截取自物美协议,大家可以参照的进行开发,大家也可以购买企业版,企业版中所有协议都开放给企业用户