透传协议的(DirectDeviceMessage)消息 处理流程

以MQTT客户端为例 官方后面出的文档

protected Disposable doSubscribe(String topic, int qos) {
        return mqttClient
            .subscribe(Collections.singletonList(topic), qos)
            .filter(msg -> isStarted())
            .flatMap(mqttMessage -> codecMono
                .flatMapMany(codec -> codec//这里的编码器不是协议 ProtocolSupportProvider 提供的, 而是 `org.jetlinks.core.message.codec.TraceDeviceMessageCodec` 内部又包了一层/ 这里不是重点
                    .decode(FromDeviceMessageContext.of(
                        new UnknownDeviceMqttClientSession(getId(), mqttClient, monitor),
                        mqttMessage,
                        registry)))
                .flatMap(message -> {//先调用 ProtocolSupportProvider 提供的解码器(解码出来要设置设备ID啊, 不然不知道调用哪个脚本); 出来的消息实例是 DirectDeviceMessage 按正常消息处理
                    monitor.receivedMessage();
                    return helper
                        .handleDeviceMessage((DeviceMessage) message,
                                             device -> createDeviceSession(device, mqttClient),
                                             ignore -> {
                                             },
                                             () -> log.warn("can not get device info from message:{},{}", mqttMessage.print(), message)
                        );
                })
                .subscribeOn(Schedulers.parallel())
                .onErrorResume((err) -> {
                    log.error("handle mqtt client message error:{}", mqttMessage, err);
                    return Mono.empty();
                }), Integer.MAX_VALUE)
            .contextWrite(ReactiveLogger.start("gatewayId", getId()))
            .subscribe();
    }

解编码器

	@Override
	public Publisher<Message> decode(MessageDecodeContext context, EncodedMessage encodedMessage) {
		MqttMessage message = ((MqttMessage) context.getMessage());
		//  if (context.getDevice() == null) {
			// 如果拿不到设备op 即是第一次连接消息
		//  }
		final String topic = message.getTopic();
		byte[] payload = message.payloadAsBytes();
		if (payload.length > 2) {
			if (payload[0] == '0' && payload[1] == 'x') {
				try {
					payload = Hex.decodeHex(new String(payload, 2, payload.length-2));
				} catch (DecoderException e) {
					payload = message.payloadAsBytes();
				}
			}
		}
		DirectDeviceMessage msg = new DirectDeviceMessage();
		final String deviceId = getDeviceIdByTopic(topic);
		msg.setDeviceId(deviceId);
		msg.setPayload(payload);
		msg.addHeader("hex", Hex.encodeHexString(payload));
		msg.addHeader("topic", message.getTopic());
		return Mono.just(msg);
	}

DirectDeviceMessage 的解析转换

1. 透传类型的消息, 上行是按照正常消息逻辑处理, 发布 DirectDeviceMessage 到事件总线; 2. TransparentDeviceMessageConnector 从事件总线订阅了 DirectDeviceMessage 类型消息(所以要提供设备ID), 它再进行解码转换, 再发布到事件总线<!> org.jetlinks.pro.device.message.transparent.TransparentDeviceMessageConnector#handleMessage

@Subscribe("/device/*/*/message/direct")//只订阅 DirectDeviceMessage
public Mono<Void> handleMessage(DirectDeviceMessage message) {
    String productId = message.getHeaderOrDefault(Headers.productId);//产品ID, 其他地方根据设备ID补充了?
    String deviceId = message.getDeviceId();//设备ID
    //见下: codec 实例是 org.jetlinks.pro.device.message.transparent.SimpleTransparentMessageCodec
    TransparentMessageCodec codec = getCodecOrNull(productId, deviceId);//根据产品Id找 
    if (null == codec) {//没有设备ID情况, 就无了 ..
        return Mono.empty();
    }
    return codec
        .decode(message)//解析
        .flatMap(msg -> messageHandler.handleMessage(null, msg))//消息处理, 见下: 发布 DeviceMessage 到事件总线
        .then();
}

org.jetlinks.pro.device.message.transparent.SimpleTransparentMessageCodec

 @Override
public Flux<DeviceMessage> decode(DirectDeviceMessage message) {
 
    return decodeCircuitBreaker
        // 这里就是 org.jetlinks.pro.device.message.transparent.script.Jsr223TransparentMessageCodecProvider 相关的了
        // 提供给脚本上下文的是 org.jetlinks.pro.device.message.transparent.SimpleTransparentMessageCodec.DecodeContext
        .execute(() -> codec.decode(new DecodeContext(message)))
        .flatMapMany(this::convert)//这结果是脚本解析出来的结果, 见下: 转换脚本结果
        .doOnNext(msg -> {
            String from = message.getMessageId();
            if (from == null) {
                from = message.getHeader(PropertyConstants.uid).orElse(null);
            }
            if (from != null) {
                msg.addHeader("decodeFrom", from);
            }
            // 这根据 DirectDeviceMessage 设置回设备ID, 所有支持透传的编解码协议, 要设置设备ID
            msg.thingId(message.getThingType(), message.getThingId());
        });
}

脚本解码 解析出来的结果

org.jetlinks.pro.device.message.transparent.SimpleTransparentMessageCodec

protected Flux<DeviceMessage> convert(Object msg) {
    if (msg == null) {
        return Flux.empty();
    }
    if (msg instanceof DeviceMessage) {
        return Flux.just(((DeviceMessage) msg));
    }
    if (msg instanceof Map) {
        if (MapUtils.isEmpty(((Map) msg))) {
            return Flux.empty();
        }
        MessageType type = MessageType.of(((Map<String, Object>) msg)).orElse(MessageType.UNKNOWN);
        if (type == MessageType.UNKNOWN) {
            //返回map , 无类型, 则转为属性上报
            return Flux.just(new ReportPropertyMessage().properties(((Map) msg)));
        }
        return Mono
            .justOrEmpty(type.convert(((Map) msg)))
            .flux()
            .cast(DeviceMessage.class);
    }
    if (msg instanceof Collection) {
        return Flux
            .fromIterable(((Collection<?>) msg))
            .flatMap(this::convert);
    }
    if (msg instanceof Publisher) {
        return Flux
            .from(((Publisher<?>) msg))
            .flatMap(this::convert);
    }
    return Flux.error(new UnsupportedOperationException("unsupported data:" + msg));
}

再发布到事件总线

这里是正常的处理消息流程了, 略… [参加见上: Pro 版 <消息上行> 处理流程/子设备等消息处理] org.jetlinks.pro.device.message.DeviceMessageConnector

关于脚本编解码器 Jsr223TransparentMessageCodecProvider

顾名思义脚本解析编解码器的提供者; 注意它预先绑定编译的脚本; 所以调试创建出来的 SimpleTransparentMessageCodec (意义不大, 里面只是集群代理处理); 重点在创建: org.jetlinks.pro.device.message.transparent.script.Jsr223TransparentMessageCodecProvider#createCodec

@Override
public Mono<TransparentMessageCodec> createCodec(Map<String, Object> configuration) {
    String lang = (String) configuration.getOrDefault("lang", "js");
    String script = (String) configuration.get("script");
    Assert.hasText(lang, "lang can not be null");
    Assert.hasText(script, "script can not be null");
 
    CodecContext context = new CodecContext();
    // <!> 这里脚本 绑定到了 org.jetlinks.pro.device.message.transparent.SimpleTransparentMessageCodec.Codec 接口
    SimpleTransparentMessageCodec.Codec codec = Scripts
        .getFactory(lang)
        .bind(Script.of("jsr223-transparent", script),
            SimpleTransparentMessageCodec.Codec.class,
            ExecutionContext.create(Collections.singletonMap("codec", context)));
 
    if (context.encoder == null && codec != null) {
        context.onDownstream(codec::encode);
    }
    if (context.decoder == null && codec != null) {
        context.onUpstream(codec::decode);
    }
 
    if (codec == null && context.encoder == null && context.decoder == null) {
        return Mono.error(new ValidationException("script", "error.codec_message_undefined"));
    }
    return Mono
        .deferContextual(ctx -> Mono
            .just(
                new SimpleTransparentMessageCodec(OperationSource.fromContext(ctx).orElse(null), context)
            ));
}

脚本可访问的内容

引擎添加的属性

初始化脚本引擎的工厂类 org.jetlinks.pro.script.jsr223.Jsr223ScriptFactory 添加了一些 java 对象 org.jetlinks.pro.script.jsr223.Jsr223ScriptFactory#compile(org.jetlinks.pro.script.Script, boolean)

private CompiledScript compile(Script script, boolean convert) {
    ......
    ctx.setAttribute("_$console", logger, ScriptContext.ENGINE_SCOPE);// 是 Slf4jScriptLogger
    ctx.setAttribute("_$utils", getUtils(), ScriptContext.ENGINE_SCOPE);// 一些utils函数() org.jetlinks.pro.script.AbstractScriptFactory.Utils
    ctx.setAttribute("engine", null, ScriptContext.ENGINE_SCOPE);//清空 engine 属性
 
    javax.script.CompiledScript compiledScript = compile0(script);
 
    return (context) -> Jsr223ScriptFactory.this
        .eval(compiledScript,
                script,
                ExecutionContext.compose(ctx, context),
                convert);
}

解码脚本

调用函数给了一个 DecodeContext org.jetlinks.pro.device.message.transparent.SimpleTransparentMessageCodec.DecodeContext

// 是 Netty的 io.netty.buffer.ByteBuf; 可参考 DirectDeviceMessage
var buffer = context.payload();
 
// 转为json
// var json = context.json();
 
//mqtt 时通过此方法获取topic
// var topic = context.topic();
 
//原始 DirectDeviceMessage 
// context.message();
 
// 提取设备Id
// var deviceId = context.pathVars("/{deviceId}/**",topic)
 
//温度属性 (二进制)
// var temperature = buffer.getShort(3) * 10;
//湿度属性 (二进制)
// var humidity = buffer.getShort(6) * 10;

指定消息类型

//解码函数
function decode(context) {
    /**
     * 设备上报消息
     */
    var reportMessage =  {
        "messageType": "REPORT_PROPERTY",
        "deviceId": "设备ID",
        "properties": {
            "属性ID": "属性值"
        }
    }
    /**
     * 读取属性回复消息
     */
    var readMessageReply =  {
        "messageType": "READ_PROPERTY_REPLY",
        "deviceId": "设备ID",
        "properties": {
            "属性ID": "属性值"
        }
    }
    /**
     * 修改属性回复消息
     */
    var writeMessageReply =  {
        "messageType": "WRITE_PROPERTY_REPLY",
        "deviceId": "设备ID",
        "properties": {
            "属性ID": "属性值"
        }
    }
    /**
     * 功能调用回复消息
     */
    var invokeMessageReply =  {
        "messageType": "INVOKE_FUNCTION_REPLY",
        "deviceId": "设备ID",
        "functionId": "功能Id",
        "output": "功能返回结果对象"
    }
    /**
     * 事件上报
     */
    var invokeMessage =  {
        "messageType": "EVENT",
        "event": "事件Id",
        "data": "事件对象"
    }
    //return xxxMesage;
}

编码

调用函数给了一个 EncodeContext org.jetlinks.pro.device.message.transparent.SimpleTransparentMessageCodec.EncodeContext

function encode(context) {\
    //平台下发的指令
    var message = context . message();
 
    //mqtt topic
    context. topic("/dtu/down");
 
    //支持一些行为
 
    //读取任意属性都下发此指令
    context.whenReadProperty("*", () => "0x500300340009C983");
 
    context.whenFunction("alarmopen", () => "0x7EFF0603000001FEF7EF");
}