如何使用协议脚本(版本>=1.4.0)
约 3297 字大约 11 分钟
2025-03-31
前言
在我们接入各式各样的设备中会有以下几种需求很适合用我们的协议脚本来解决:
- 该产品的协议是通用的,如普通的mqtt协议,那么我们直接使用脚本来进行协议的解析即可
- 我们需要对某些数据进行计算,如上报电压电流,计算出功率,亦或是倍数的调整
- 某个设备需要进行特殊处理,那么也可以绑定某个脚本去处理
- 我们需要设备联动,也可以通过脚本执行函数去控制其他设备
联犀协议脚本的特点:
- 直接使用go来编写脚本: 基于yaegi
- 内嵌丰富的函数:把protobuf中的函数全部映射到脚本中,意味着你可以做任何事,获取设备物模型历史记录统计总和,通知第三方,控制其他设备,等等都可以做
- 高拓展能力: 支持上报预处理,下发预处理,上报后处理,下发后处理
使用流程
脚本创建
联犀支持在 上报预处理,下发预处理,上报后处理,下发后处理 这几个点进行脚本的执行,我们创建脚本的时候就需要确定好触发的位置脚本编辑与调试
脚本创建好了之后会生成一个默认的模版,可以在这里进行脚本的编辑,可以处理字符串数据,也可以处理二进制数据绑定产品或设备
脚本编辑好了之后我们可以绑定对应的产品或设备,同时设定对应的优先级,默认是优先执行产品的脚本,再执行设备的脚本
注: 脚本启用及绑定启动这个脚本才会真正的去执行
脚本开发
脚本示例:
该脚本是实现了两个物模型互换的功能
import "log"
import "context"
import "dm"
import "deviceMsg"
import "gjson"
/*
PublishMsg struct { //发布消息结构体定义
Handle string `json:"handle"` //对应 mqtt topic的第一个 thing ota config 等等
Type string `json:"type"` //操作类型 从topic中提取 物模型下就是 property属性 event事件 action行为
Payload []byte `json:"payload"` //对应mqtt的payload
Timestamp int64 `json:"timestamp"` //毫秒时间戳
ProductID string `json:"productID"` //发送设备的产品ID
DeviceName string `json:"deviceName"` //发送设备的设备ID
Explain string `json:"explain"` //内部使用的拓展字段
ProtocolCode string `json:"protocolCode"` //协议网关code
}
*/
func Handle(ctx context.Context,req *deviceMsg.PublishMsg) *deviceMsg.PublishMsg{
hc_on:=gjson.GetBytes(req.Payload,"params.hc_on")
hc_off:=gjson.GetBytes(req.Payload,"params.hc_off")
var err error
if hc_on.Type!=0&&hc_off.Type!=0 {//如果传了两个参数则互换
req.Payload,err= gjson.SetBytes(req.Payload,"params.hc_on",hc_off.Int())
if err!=nil{
log.Print(err)
}
req.Payload,err= gjson.SetBytes(req.Payload,"params.hc_off",hc_on.Int())
if err!=nil{
log.Print(err)
}
return req
}
if hc_on.Type!=0{
req.Payload,err= gjson.SetBytes(req.Payload,"params.hc_off",hc_on.Int())
if err!=nil{
log.Print(err)
}
req.Payload,err= gjson.DeleteBytes(req.Payload,"params.hc_on")
if err!=nil{
log.Print(err)
}
}
if hc_off.Type!=0{
req.Payload,err= gjson.SetBytes(req.Payload,"params.hc_on",hc_off.Int())
if err!=nil{
log.Print(err)
}
req.Payload,err= gjson.DeleteBytes(req.Payload,"params.hc_off")
if err!=nil{
log.Print(err)
}
}
return req
}
实际测试日志截图
内置函数
golang系统函数
直接import即可使用,如time,json,http等包
示例:
import "log"
import "context"
import "dm"
import "deviceMsg"
import "time"
/*
PublishMsg struct { //发布消息结构体定义
Handle string `json:"handle"` //对应 mqtt topic的第一个 thing ota config 等等
Type string `json:"type"` //操作类型 从topic中提取 物模型下就是 property属性 event事件 action行为
Payload []byte `json:"payload"` //对应mqtt的payload
Timestamp int64 `json:"timestamp"` //毫秒时间戳
ProductID string `json:"productID"` //发送设备的产品ID
DeviceName string `json:"deviceName"` //发送设备的设备ID
Explain string `json:"explain"` //内部使用的拓展字段
ProtocolCode string `json:"protocolCode"` //协议网关code
}
*/
func Handle(ctx context.Context,req *deviceMsg.PublishMsg) *deviceMsg.PublishMsg{
log.Printf("%#v",time.Now())
return req
}
日志打印结果:
time.Date(2025, time.April, 1, 11, 56, 12, 149569826, time.Local)
protobuf函数
可以参考联犀的protobuf定义,import "dm",通过dm.xxx来使用,注意函数,结构体和参数首字母需要大写
如控制设备(注,示例省略了默认的handle函数,实际使用中需要把逻辑包裹在handle中):
import "log"
import "context"
import "dm"
import "deviceMsg"
func Handle(ctx context.Context,req *deviceMsg.PublishMsg) *deviceMsg.PublishMsg{
di,err:=dm.DeviceGet(ctx,"120","862116066780827")
log.Printf("设备信息:%#v,%#v",di,err)
ret,err:=dm.DeviceInteract.PropertyControlSend(ctx,
&dm.PropertyControlSendReq{
ProductID:"120", DeviceName:"862116066780827", Data:`{"restart":0}`})
log.Printf("PropertyControlSend:%#v,%#v",ret,err)
return req
}
日志输出如下:
设备信息:&dm.DeviceInfo{state:impl.MessageState{NoUnkeyedLiterals:pragma.NoUnkeyedLiterals{}, DoNotCompare:pragma.DoNotCompare{}, DoNotCopy:pragma.DoNotCopy{}, atomicMessageInfo:(*impl.MessageInfo)(nil)}, sizeCache:0, unknownFields:[]uint8(nil), Id:63382, TenantCode:"default", ProductID:"120", ProjectID:0, AreaID:2, AreaIDPath:"2-", ProductName:"4G直连开关2", DeviceName:"862116066780827", CreatedTime:1743473415, Secret:"GPigjcsmRsr2Ut7d74uxCyTe5GM=", Cert:"", Imei:"", Mac:"", Version:(*wrapperspb.StringValue)(0xc006e1ea80), HardInfo:"", SoftInfo:"", Position:(*dm.Point)(0xc006e1eac0), Address:(*wrapperspb.StringValue)(0xc006e1eb00), Adcode:(*wrapperspb.StringValue)(nil), Tags:map[string]string(nil), IsOnline:1, FirstLogin:1743473415, FirstBind:0, LastBind:0, LastLogin:1743485845, LogLevel:1, DeviceAlias:(*wrapperspb.StringValue)(0xc006e1eb40), MobileOperator:1, Phone:(*wrapperspb.StringValue)(nil), Iccid:(*wrapperspb.StringValue)(0xc006e1eb80), SchemaAlias:map[string]string(nil), Rssi:(*wrapperspb.Int64Value)(0xc006e4b2c0), RatedPower:0, ProtocolConf:map[string]string(nil), SubProtocolConf:map[string]string(nil), Status:2, IsEnable:1, DeviceType:1, NetType:3, Distributor:(*dm.IDPathWithUpdate)(0xc0071efa90), ExpTime:(*wrapperspb.Int64Value)(nil), NeedConfirmJobID:0, NeedConfirmVersion:"", UserID:1, ProductImg:"/oss/ithings-public/dm.rpc/default/core/productManage/productImg/120/2.svg", CategoryID:2, LastIp:"223.104.55.130", Desc:(*wrapperspb.StringValue)(nil), Dept:(*dm.IDPathWithUpdate)(nil), Gateway:(*dm.DeviceInfo)(nil), Sort:100},<nil>,
PropertyControlSend:&dm.PropertyControlSendResp{state:impl.MessageState{NoUnkeyedLiterals:pragma.NoUnkeyedLiterals{}, DoNotCompare:pragma.DoNotCompare{}, DoNotCopy:pragma.DoNotCopy{}, atomicMessageInfo:(*impl.MessageInfo)(nil)}, sizeCache:0, unknownFields:[]uint8(nil), Code:200, Msg:"ok", MsgToken:"3fb"},<nil>
函数定义参考: https://gitee.com/unitedrhino/things/blob/master/service/dmsvr/proto/dm.proto
支持以下三个service:
- DeviceInteract : 设备控制
- DeviceManage : 设备管理
- ProductManage : 产品管理
- OtaManage : ota管理
部分示例如下:
//设备管理
service DeviceManage {
//鉴定是否是root账号(提供给mqtt broker)
rpc rootCheck(RootCheckReq) returns (Empty);
//新增设备
rpc deviceInfoCreate(DeviceInfo) returns (Empty);
//更新设备
rpc deviceInfoUpdate(DeviceInfo) returns (Empty);
rpc deviceOnlineMultiFix(deviceOnlineMultiFixReq)returns (Empty);
//删除设备
rpc deviceInfoDelete(DeviceInfoDeleteReq) returns (Empty);
//获取设备信息列表
rpc deviceInfoIndex(DeviceInfoIndexReq) returns (DeviceInfoIndexResp);
//批量更新设备状态
rpc DeviceInfoMultiUpdate(DeviceInfoMultiUpdateReq) returns (Empty);
//获取设备信息详情
rpc deviceInfoRead(DeviceInfoReadReq) returns (DeviceInfo);
rpc deviceInfoBind(DeviceInfoBindReq)returns(Empty);
rpc deviceBindTokenRead(DeviceBindTokenReadReq)returns(DeviceBindTokenInfo);
rpc deviceBindTokenCreate(Empty)returns(DeviceBindTokenInfo);
rpc deviceInfoMultiBind(DeviceInfoMultiBindReq)returns(DeviceInfoMultiBindResp);//批量绑定
rpc deviceInfoCanBind(DeviceInfoCanBindReq)returns(Empty);
rpc deviceInfoUnbind(DeviceInfoUnbindReq)returns(Empty);
rpc deviceTransfer(DeviceTransferReq)returns(Empty);
rpc deviceReset(DeviceResetReq)returns(Empty);//设备重置
rpc deviceMove(DeviceMoveReq)returns(Empty);//将设备信息迁移到新的设备中
rpc deviceModuleVersionRead(DeviceModuleVersionReadReq)returns (DeviceModuleVersion);
rpc deviceModuleVersionIndex(DeviceModuleVersionIndexReq)returns (DeviceModuleVersionIndexResp);
//绑定网关下子设备设备
rpc deviceGatewayMultiCreate(DeviceGatewayMultiCreateReq) returns (Empty);
//绑定网关下子设备设备
rpc deviceGatewayMultiUpdate(DeviceGatewayMultiSaveReq) returns (Empty);
//获取绑定信息的设备信息列表
rpc deviceGatewayIndex(DeviceGatewayIndexReq) returns (DeviceGatewayIndexResp);
//删除网关下子设备
rpc deviceGatewayMultiDelete(DeviceGatewayMultiSaveReq) returns (Empty);
//设备计数
rpc deviceInfoCount(DeviceInfoCountReq) returns (DeviceInfoCount);
//设备类型
rpc deviceTypeCount(DeviceTypeCountReq) returns (DeviceTypeCountResp);
rpc deviceCount(DeviceCountReq) returns (DeviceCountResp);
rpc deviceProfileRead(DeviceProfileReadReq)returns(DeviceProfile);
rpc deviceProfileDelete(DeviceProfileReadReq)returns(Empty);
rpc deviceProfileUpdate(DeviceProfile)returns(Empty);
rpc deviceProfileIndex(DeviceProfileIndexReq)returns(DeviceProfileIndexResp);
//更新设备物模型
rpc deviceSchemaUpdate(DeviceSchema)returns (Empty);
//新增设备
rpc deviceSchemaCreate(DeviceSchema) returns (Empty);
//批量新增物模型,只新增没有的,已有的不处理
rpc deviceSchemaMultiCreate(DeviceSchemaMultiCreateReq) returns (Empty);
//删除设备物模型
rpc deviceSchemaMultiDelete(DeviceSchemaMultiDeleteReq) returns (Empty);
//获取设备物模型列表
rpc deviceSchemaIndex(DeviceSchemaIndexReq) returns (DeviceSchemaIndexResp);
rpc deviceSchemaTslRead(DeviceSchemaTslReadReq) returns (DeviceSchemaTslReadResp);
//将设备加到多个分组中
rpc deviceGroupMultiCreate(DeviceGroupMultiSaveReq) returns (Empty);
//更新设备所在分组
rpc deviceGroupMultiUpdate(DeviceGroupMultiSaveReq) returns (Empty);
//删除设备所在分组
rpc deviceGroupMultiDelete(DeviceGroupMultiSaveReq) returns (Empty);
}
//产品管理
service ProductManage{
rpc productInit(ProductInitReq) returns (Empty);
/*产品管理*/
//新增产品
rpc productInfoCreate(ProductInfo) returns (Empty);
//更新产品
rpc productInfoUpdate(ProductInfo) returns (Empty);
//删除产品
rpc productInfoDelete(ProductInfoDeleteReq) returns (Empty);
//获取产品信息列表
rpc productInfoIndex(ProductInfoIndexReq) returns (ProductInfoIndexResp);
//获取产品信息详情
rpc productInfoRead(ProductInfoReadReq) returns (ProductInfo);
/*物模型管理*/
//更新产品物模型
rpc productSchemaUpdate(ProductSchemaUpdateReq)returns (Empty);
//新增产品
rpc productSchemaCreate(ProductSchemaCreateReq) returns (Empty);
//批量新增物模型,只新增没有的,已有的不处理
rpc productSchemaMultiCreate(ProductSchemaMultiCreateReq) returns (Empty);
//删除产品
rpc productSchemaDelete(ProductSchemaDeleteReq) returns (Empty);
//获取产品信息列表
rpc productSchemaIndex(ProductSchemaIndexReq) returns (ProductSchemaIndexResp);
//删除产品
rpc productSchemaTslImport(ProductSchemaTslImportReq) returns (Empty);
//获取产品信息列表
rpc productSchemaTslRead(ProductSchemaTslReadReq) returns (ProductSchemaTslReadResp);
/*脚本管理*/
rpc productCustomRead(ProductCustomReadReq) returns (ProductCustom);
rpc productCustomUpdate(ProductCustom) returns (Empty);
/*产品类型*/
//新增产品
rpc productCategoryCreate(ProductCategory) returns (WithID);
//更新产品
rpc productCategoryUpdate(ProductCategory) returns (Empty);
//删除产品
rpc productCategoryDelete(WithID) returns (Empty);
//获取产品信息列表
rpc productCategoryIndex(ProductCategoryIndexReq) returns (ProductCategoryIndexResp);
//获取产品信息详情
rpc productCategoryRead(WithIDChildren) returns (ProductCategory);
//获取产品品类下的物模型列表,绑定的物模型会自动添加到该产品品类及子分类的产品中,并不支持删除
rpc productCategorySchemaIndex(ProductCategorySchemaIndexReq) returns(ProductCategorySchemaIndexResp);
rpc productCategorySchemaMultiUpdate(ProductCategorySchemaMultiSaveReq) returns(Empty);
rpc productCategorySchemaMultiCreate(ProductCategorySchemaMultiSaveReq) returns(Empty);
rpc productCategorySchemaMultiDelete(ProductCategorySchemaMultiSaveReq) returns(Empty);
}
//设备消息
service DeviceMsg {
//获取设备sdk调试日志
rpc sdkLogIndex(SdkLogIndexReq) returns (SdkLogIndexResp);
//获取设备调试信息记录登入登出,操作
rpc hubLogIndex(HubLogIndexReq) returns (HubLogIndexResp);
rpc sendLogIndex(SendLogIndexReq) returns(SendLogIndexResp);
rpc statusLogIndex(StatusLogIndexReq) returns(StatusLogIndexResp);
rpc abnormalLogIndex(AbnormalLogIndexReq) returns(AbnormalLogIndexResp);
rpc abnormalLogCreate(AbnormalLogInfo) returns(Empty);
//获取设备数据信息
rpc propertyLogLatestIndex(PropertyLogLatestIndexReq) returns (PropertyLogIndexResp);
//获取设备数据信息
rpc propertyLogIndex(PropertyLogIndexReq) returns (PropertyLogIndexResp);
//获取设备数据信息
rpc eventLogIndex(EventLogIndexReq) returns (EventLogIndexResp);
//获取设备影子列表
rpc shadowIndex(PropertyLogLatestIndexReq) returns (ShadowIndexResp);
//获取网关可以绑定的子设备列表
rpc gatewayCanBindIndex(GatewayCanBindIndexReq)returns(GatewayCanBindIndexResp);
}
//设备交互
service DeviceInteract {
//调用设备行为
rpc actionSend(ActionSendReq) returns(ActionSendResp);
//获取异步调用设备行为的结果
rpc actionRead(RespReadReq) returns(ActionSendResp);
//回复调用设备行为
rpc actionResp(ActionRespReq) returns(Empty);
//请求设备获取设备最新属性
rpc propertyGetReportSend(PropertyGetReportSendReq) returns(PropertyGetReportSendResp);
//请求设备获取设备最新属性
rpc propertyGetReportMultiSend(PropertyGetReportMultiSendReq) returns(PropertyGetReportMultiSendResp);
//调用设备属性
rpc propertyControlSend(PropertyControlSendReq) returns(PropertyControlSendResp);
//批量调用设备属性
rpc propertyControlMultiSend(PropertyControlMultiSendReq) returns(PropertyControlMultiSendResp);
//获取异步调用设备属性的结果
rpc propertyControlRead(RespReadReq) returns(PropertyControlSendResp);
//发送消息给设备 -- 调试使用
rpc sendMsg(SendMsgReq) returns(SendMsgResp);
//获取网关拓扑关系
rpc gatewayGetFoundSend(GatewayGetFoundReq)returns(Empty);
//通知网关绑定子设备
rpc gatewayNotifyBindSend(GatewayNotifyBindSendReq)returns(Empty);
//提供给边缘端进行http访问
rpc edgeSend(EdgeSendReq)returns(EdgeSendResp);
}
//新的ota功能实现
service OtaManage{
//添加升级包
rpc otaFirmwareInfoCreate(OtaFirmwareInfoCreateReq) returns (WithID);
//修改升级包
rpc otaFirmwareInfoUpdate(OtaFirmwareInfoUpdateReq) returns (WithID);
//删除升级包
rpc otaFirmwareInfoDelete(WithID) returns(Empty);
//升级包列表
rpc otaFirmwareInfoIndex(OtaFirmwareInfoIndexReq) returns(OtaFirmwareInfoIndexResp);
//查询升级包
rpc otaFirmwareInfoRead(WithID) returns(OtaFirmwareInfo);
//创建静态升级批次
rpc otaFirmwareJobCreate(OtaFirmwareJobInfo) returns(WithID);
//获取升级包下的升级任务批次列表
rpc otaFirmwareJobIndex(OtaFirmwareJobIndexReq) returns(OtaFirmwareJobIndexResp);
//查询指定升级批次的详情
rpc otaFirmwareJobRead(WithID) returns(OtaFirmwareJobInfo);
//取消动态升级策略
rpc otaFirmwareJobUpdate(OtaFirmwareJobInfo) returns (Empty);
//查询指定升级批次下的设备升级作业列表
rpc otaFirmwareDeviceIndex(OtaFirmwareDeviceIndexReq) returns(OtaFirmwareDeviceIndexResp);
//取消指定批次下的设备升级作业
rpc otaFirmwareDeviceCancel(OtaFirmwareDeviceCancelReq) returns(Empty);
//重新升级指定批次下升级失败或升级取消的设备升级作业
rpc otaFirmwareDeviceRetry(OtaFirmwareDeviceRetryReq) returns(Empty);
//app确认设备升级
rpc otaFirmwareDeviceConfirm(OtaFirmwareDeviceConfirmReq) returns(Empty);
rpc otaModuleInfoCreate(OtaModuleInfo)returns(WithID);
rpc otaModuleInfoUpdate(OtaModuleInfo)returns(Empty);
rpc otaModuleInfoDelete(WithID)returns(Empty);
rpc otaModuleInfoIndex(OtaModuleInfoIndexReq)returns(OtaModuleInfoIndexResp);
rpc otaModuleInfoRead(WithIDCode)returns(OtaModuleInfo);
}
deviceMsg内置结构体
示例参考: things/service/dmsvr/internal/startup/script.go:189 物模型上报解析使用示例:
import "log"
import "context"
import "deviceMsg"
import "json"
/*
PublishMsg struct { //发布消息结构体定义
Handle string `json:"handle"` //对应 mqtt topic的第一个 thing ota config 等等
Type string `json:"type"` //操作类型 从topic中提取 物模型下就是 property属性 event事件 action行为
Payload []byte `json:"payload"` //对应mqtt的payload
Timestamp int64 `json:"timestamp"` //毫秒时间戳
ProductID string `json:"productID"` //发送设备的产品ID
DeviceName string `json:"deviceName"` //发送设备的设备ID
Explain string `json:"explain"` //内部使用的拓展字段
ProtocolCode string `json:"protocolCode"` //协议网关code
}
*/
func Handle(ctx context.Context,req *deviceMsg.PublishMsg) *deviceMsg.PublishMsg{
log.Print(req)
var tReq deviceMsg.ThingReq
err:=json.Unmarshal(req.Payload,&tReq)
log.Printf("req:%3v,err:%v",tReq,err)
return req
}
入参示例:
{
"method":"report",
"msgToken":"123",
"timestamp":1677762028638,
"params":{
"power_switch":1,
"color":1,
"brightness":32
}
}
其他内置函数
包名 | 函数定义 | 参考说明 |
---|---|---|
dm | func ProductGet(ctx context.Context, productID string) (*dm.ProductInfo, error) | 获取产品信息 |
dm | func DeviceGet(ctx context.Context, productID string, deviceName string) (*dm.DeviceInfo, error) | 获取设备信息 |
dm | func SchemaGet(ctx context.Context, productID string, deviceName string) (*schema.Model, error) | 获取设备物模型信息,参考: things/share/domain/schema/model.go |
log | func PrintLn(v ...any) | 页面调试的时候会把日志打印出来 |
log | func Print(v ...any) | 页面调试的时候会把日志打印出来 |
log | func Printffunc(format string, v ...any) | 页面调试的时候会把日志打印出来 |
gjson | func Get(json, path string) Result | https://github.com/tidwall/gjson |
gjson | func GetMany(json string, path ...string) []Result | https://github.com/tidwall/gjson |
gjson | func GetManyBytes(json []byte, path ...string) []Result | https://github.com/tidwall/gjson |
gjson | func GetBytes(json []byte, path string) Result | https://github.com/tidwall/gjson |
gjson | func Parse(json string) Result | https://github.com/tidwall/gjson |
gjson | ParseBytes(json []byte) Result | https://github.com/tidwall/gjson |
gjson | func Set(json, path string, value interface{}) (string, error) | https://github.com/tidwall/sjson |
gjson | func SetBytes(json []byte, path string, value interface{}) ([]byte, error) | https://github.com/tidwall/sjson |
gjson | func Delete(json, path string) (string, error) | https://github.com/tidwall/sjson |
gjson | func DeleteBytes(json []byte, path string) ([]byte, error) | https://github.com/tidwall/sjson |
gjson | func SetRaw(json, path, value string) (string, error) | https://github.com/tidwall/sjson |
gjson | func DeleteBytes(json []byte, path string) ([]byte, error) | https://github.com/tidwall/sjson |
gjson | func SetRawBytes(json []byte, path string, value []byte) ([]byte, error) | https://github.com/tidwall/sjson |
schema | ModelSimple | 物模型定义(简易版本,网关子设备物模型使用): things/share/domain/schema/modelSimple.go |
schema | Model | 完整物模型定义: things/share/domain/schema/model.go |
gjson Result定义
// Result represents a json value that is returned from Get().
type Result struct {
// Type is the json type
Type Type
// Raw is the raw json
Raw string
// Str is the json string
Str string
// Num is the json number
Num float64
// Index of raw value in original json, zero means index unknown
Index int
// Indexes of all the elements that match on a path containing the '#'
// query character.
Indexes []int
}