透传协议的(DirectDeviceMessage)消息 处理流程
以MQTT客户端为例 官方后面出的文档
protected Disposable doSubscribe(String topic, int qos) {
return mqttClient
.subscribe(Collections.singletonList(topic), qos)
.filter(msg -> isStarted())
.flatMap(mqttMessage -> codecMono
.flatMapMany(codec -> codec//这里的编码器不是协议 ProtocolSupportProvider 提供的, 而是 `org.jetlinks.core.message.codec.TraceDeviceMessageCodec` 内部又包了一层/ 这里不是重点
.decode(FromDeviceMessageContext.of(
new UnknownDeviceMqttClientSession(getId(), mqttClient, monitor),
mqttMessage,
registry)))
.flatMap(message -> {//先调用 ProtocolSupportProvider 提供的解码器(解码出来要设置设备ID啊, 不然不知道调用哪个脚本); 出来的消息实例是 DirectDeviceMessage 按正常消息处理
monitor.receivedMessage();
return helper
.handleDeviceMessage((DeviceMessage) message,
device -> createDeviceSession(device, mqttClient),
ignore -> {
},
() -> log.warn("can not get device info from message:{},{}", mqttMessage.print(), message)
);
})
.subscribeOn(Schedulers.parallel())
.onErrorResume((err) -> {
log.error("handle mqtt client message error:{}", mqttMessage, err);
return Mono.empty();
}), Integer.MAX_VALUE)
.contextWrite(ReactiveLogger.start("gatewayId", getId()))
.subscribe();
}解编码器
@Override
public Publisher<Message> decode(MessageDecodeContext context, EncodedMessage encodedMessage) {
MqttMessage message = ((MqttMessage) context.getMessage());
// if (context.getDevice() == null) {
// 如果拿不到设备op 即是第一次连接消息
// }
final String topic = message.getTopic();
byte[] payload = message.payloadAsBytes();
if (payload.length > 2) {
if (payload[0] == '0' && payload[1] == 'x') {
try {
payload = Hex.decodeHex(new String(payload, 2, payload.length-2));
} catch (DecoderException e) {
payload = message.payloadAsBytes();
}
}
}
DirectDeviceMessage msg = new DirectDeviceMessage();
final String deviceId = getDeviceIdByTopic(topic);
msg.setDeviceId(deviceId);
msg.setPayload(payload);
msg.addHeader("hex", Hex.encodeHexString(payload));
msg.addHeader("topic", message.getTopic());
return Mono.just(msg);
}DirectDeviceMessage 的解析转换
1. 透传类型的消息, 上行是按照正常消息逻辑处理, 发布 DirectDeviceMessage 到事件总线;
2. TransparentDeviceMessageConnector 从事件总线订阅了 DirectDeviceMessage 类型消息(所以要提供设备ID), 它再进行解码转换, 再发布到事件总线<!>
org.jetlinks.pro.device.message.transparent.TransparentDeviceMessageConnector#handleMessage
@Subscribe("/device/*/*/message/direct")//只订阅 DirectDeviceMessage
public Mono<Void> handleMessage(DirectDeviceMessage message) {
String productId = message.getHeaderOrDefault(Headers.productId);//产品ID, 其他地方根据设备ID补充了?
String deviceId = message.getDeviceId();//设备ID
//见下: codec 实例是 org.jetlinks.pro.device.message.transparent.SimpleTransparentMessageCodec
TransparentMessageCodec codec = getCodecOrNull(productId, deviceId);//根据产品Id找
if (null == codec) {//没有设备ID情况, 就无了 ..
return Mono.empty();
}
return codec
.decode(message)//解析
.flatMap(msg -> messageHandler.handleMessage(null, msg))//消息处理, 见下: 发布 DeviceMessage 到事件总线
.then();
}org.jetlinks.pro.device.message.transparent.SimpleTransparentMessageCodec
@Override
public Flux<DeviceMessage> decode(DirectDeviceMessage message) {
return decodeCircuitBreaker
// 这里就是 org.jetlinks.pro.device.message.transparent.script.Jsr223TransparentMessageCodecProvider 相关的了
// 提供给脚本上下文的是 org.jetlinks.pro.device.message.transparent.SimpleTransparentMessageCodec.DecodeContext
.execute(() -> codec.decode(new DecodeContext(message)))
.flatMapMany(this::convert)//这结果是脚本解析出来的结果, 见下: 转换脚本结果
.doOnNext(msg -> {
String from = message.getMessageId();
if (from == null) {
from = message.getHeader(PropertyConstants.uid).orElse(null);
}
if (from != null) {
msg.addHeader("decodeFrom", from);
}
// 这根据 DirectDeviceMessage 设置回设备ID, 所有支持透传的编解码协议, 要设置设备ID
msg.thingId(message.getThingType(), message.getThingId());
});
}脚本解码 解析出来的结果
org.jetlinks.pro.device.message.transparent.SimpleTransparentMessageCodec
protected Flux<DeviceMessage> convert(Object msg) {
if (msg == null) {
return Flux.empty();
}
if (msg instanceof DeviceMessage) {
return Flux.just(((DeviceMessage) msg));
}
if (msg instanceof Map) {
if (MapUtils.isEmpty(((Map) msg))) {
return Flux.empty();
}
MessageType type = MessageType.of(((Map<String, Object>) msg)).orElse(MessageType.UNKNOWN);
if (type == MessageType.UNKNOWN) {
//返回map , 无类型, 则转为属性上报
return Flux.just(new ReportPropertyMessage().properties(((Map) msg)));
}
return Mono
.justOrEmpty(type.convert(((Map) msg)))
.flux()
.cast(DeviceMessage.class);
}
if (msg instanceof Collection) {
return Flux
.fromIterable(((Collection<?>) msg))
.flatMap(this::convert);
}
if (msg instanceof Publisher) {
return Flux
.from(((Publisher<?>) msg))
.flatMap(this::convert);
}
return Flux.error(new UnsupportedOperationException("unsupported data:" + msg));
}再发布到事件总线
这里是正常的处理消息流程了, 略… [参加见上: Pro 版 <消息上行> 处理流程/子设备等消息处理]
org.jetlinks.pro.device.message.DeviceMessageConnector
…
关于脚本编解码器 Jsr223TransparentMessageCodecProvider
顾名思义脚本解析编解码器的提供者;
注意它预先绑定编译的脚本; 所以调试创建出来的 SimpleTransparentMessageCodec (意义不大, 里面只是集群代理处理); 重点在创建:
org.jetlinks.pro.device.message.transparent.script.Jsr223TransparentMessageCodecProvider#createCodec 中
@Override
public Mono<TransparentMessageCodec> createCodec(Map<String, Object> configuration) {
String lang = (String) configuration.getOrDefault("lang", "js");
String script = (String) configuration.get("script");
Assert.hasText(lang, "lang can not be null");
Assert.hasText(script, "script can not be null");
CodecContext context = new CodecContext();
// <!> 这里脚本 绑定到了 org.jetlinks.pro.device.message.transparent.SimpleTransparentMessageCodec.Codec 接口
SimpleTransparentMessageCodec.Codec codec = Scripts
.getFactory(lang)
.bind(Script.of("jsr223-transparent", script),
SimpleTransparentMessageCodec.Codec.class,
ExecutionContext.create(Collections.singletonMap("codec", context)));
if (context.encoder == null && codec != null) {
context.onDownstream(codec::encode);
}
if (context.decoder == null && codec != null) {
context.onUpstream(codec::decode);
}
if (codec == null && context.encoder == null && context.decoder == null) {
return Mono.error(new ValidationException("script", "error.codec_message_undefined"));
}
return Mono
.deferContextual(ctx -> Mono
.just(
new SimpleTransparentMessageCodec(OperationSource.fromContext(ctx).orElse(null), context)
));
}脚本可访问的内容
引擎添加的属性
初始化脚本引擎的工厂类 org.jetlinks.pro.script.jsr223.Jsr223ScriptFactory 添加了一些 java 对象
org.jetlinks.pro.script.jsr223.Jsr223ScriptFactory#compile(org.jetlinks.pro.script.Script, boolean)
private CompiledScript compile(Script script, boolean convert) {
......
ctx.setAttribute("_$console", logger, ScriptContext.ENGINE_SCOPE);// 是 Slf4jScriptLogger
ctx.setAttribute("_$utils", getUtils(), ScriptContext.ENGINE_SCOPE);// 一些utils函数() org.jetlinks.pro.script.AbstractScriptFactory.Utils
ctx.setAttribute("engine", null, ScriptContext.ENGINE_SCOPE);//清空 engine 属性
javax.script.CompiledScript compiledScript = compile0(script);
return (context) -> Jsr223ScriptFactory.this
.eval(compiledScript,
script,
ExecutionContext.compose(ctx, context),
convert);
}解码脚本
调用函数给了一个 DecodeContext org.jetlinks.pro.device.message.transparent.SimpleTransparentMessageCodec.DecodeContext
// 是 Netty的 io.netty.buffer.ByteBuf; 可参考 DirectDeviceMessage
var buffer = context.payload();
// 转为json
// var json = context.json();
//mqtt 时通过此方法获取topic
// var topic = context.topic();
//原始 DirectDeviceMessage
// context.message();
// 提取设备Id
// var deviceId = context.pathVars("/{deviceId}/**",topic)
//温度属性 (二进制)
// var temperature = buffer.getShort(3) * 10;
//湿度属性 (二进制)
// var humidity = buffer.getShort(6) * 10;指定消息类型
//解码函数
function decode(context) {
/**
* 设备上报消息
*/
var reportMessage = {
"messageType": "REPORT_PROPERTY",
"deviceId": "设备ID",
"properties": {
"属性ID": "属性值"
}
}
/**
* 读取属性回复消息
*/
var readMessageReply = {
"messageType": "READ_PROPERTY_REPLY",
"deviceId": "设备ID",
"properties": {
"属性ID": "属性值"
}
}
/**
* 修改属性回复消息
*/
var writeMessageReply = {
"messageType": "WRITE_PROPERTY_REPLY",
"deviceId": "设备ID",
"properties": {
"属性ID": "属性值"
}
}
/**
* 功能调用回复消息
*/
var invokeMessageReply = {
"messageType": "INVOKE_FUNCTION_REPLY",
"deviceId": "设备ID",
"functionId": "功能Id",
"output": "功能返回结果对象"
}
/**
* 事件上报
*/
var invokeMessage = {
"messageType": "EVENT",
"event": "事件Id",
"data": "事件对象"
}
//return xxxMesage;
}编码
调用函数给了一个 EncodeContext
org.jetlinks.pro.device.message.transparent.SimpleTransparentMessageCodec.EncodeContext
function encode(context) {\
//平台下发的指令
var message = context . message();
//mqtt topic
context. topic("/dtu/down");
//支持一些行为
//读取任意属性都下发此指令
context.whenReadProperty("*", () => "0x500300340009C983");
context.whenFunction("alarmopen", () => "0x7EFF0603000001FEF7EF");
}