N_JetlinksPro2_0 源码分析

插件

插件开发文档

概述

协议包和插件的区别

相同点: 都可以为平台提供动态的外挂程序能力

不同点: 协议包专注于提供设备数据编解码,且只服务于设备接入网关功能。 插件可通过SDK、HTTP、WebSocket等方式采集第三方平台数据。提供给设备接入网关或其他需要数据源的功能。插件中编写了大量为采集数据而存在的接口,如适配集群的定时任务等。

它是同’网关组件’绑定的, 每创建一个 插件网关组件, 就启动一个实例; (使用上可能接近与’网络组件’)

关键对象

老几样 org.jetlinks.plugin.core.Plugin // 插件实例接口 (最底层的) org.jetlinks.pro.plugin.impl.jar.PluginDriverInstallerProvider //插件启动安装器提供商,用于支持不同的插件类型,如: jar等; org.jetlinks.pro.plugin.impl.PluginDriverManager //插件管理对象, 管理所有插件 //org.jetlinks.pro.plugin.impl.DefaultPluginDriverManager 负责管理所有插件的对象

自定义插件 org.jetlinks.plugin.internal.device.DeviceGatewayPlugin //自定义插件开发: 网关类型的插件封装, 继承自它来开发网关插件逻辑 org.jetlinks.plugin.internal.device.DeviceGatewayPluginDriver //自定义插件开发: 提供插件配置/描述, 和创建

数据存储

  1. 每一个插件组件(PluginDriver) 对应有一个组件提供器对应 (PluginDriverInstallerProvider)
  2. 最终统一由 PluginDriverManager 管理; 默认实现是org.jetlinks.pro.plugin.impl.DefaultPluginDriverManager
  3. 插件数据存储在 s_plugin_driver 表; 因为一个插件可以有多个实例, 是一对多的关系, 还有一张是ID映射表 s_plugin_id_mapping; 管理接口是: /plugin/driver

官方后面出的文档

加载流程

插件加载流程图 在平台插件管理新增插件时,通过jar包的方式加载驱动,使用 ServiceLoader 来加载插件驱动,通过配置 PluginDriver 指定插件驱动实现类,如果未配置,则将扫描整个jar包中的 PluginDriver 的实现类并初始化。

插件运行时同步

本地增删改的同步

org.jetlinks.pro.plugin.impl.DefaultPluginDriverManager: 继承的 org.jetlinks.pro.commons.ClusterEntityEventHandler有:

@EventListener
public final void handleEvent(EntityCreatedEvent<E> event) 
@EventListener
public final void handleEvent(EntitySavedEvent<E> event) 
@EventListener
public final void handleEvent(EntityDeletedEvent<E> event)
@EventListener
public final void handleEvent(EntityModifyEvent<E> event)

这些方法使用 Spring EventListener 体系, 监听本地的, Entity*Event 事件; 再最后调用到 org.jetlinks.pro.plugin.impl.DefaultPluginDriverManager::loadDriver流程.

集群的其他节点插件的同步

org.jetlinks.pro.plugin.impl.DefaultPluginDriverManager: 继承的 org.jetlinks.pro.commons.ClusterEntityEventHandler 订阅了集群 PluginDriverEntity 实体的增删改;

protected ClusterEntityEventHandler(EventBus eventBus, Class<E> type) {
        this.eventBus = eventBus;
        this.type = type;
 
        this.eventBus
            .subscribe(Subscription
                           .builder()
                           .subscriberId(StringUtils.hasText(this.getClass().getSimpleName()) ?
                                             this.getClass().getSimpleName() :
                                             this.getClass().getName())
                           //只订阅集群其他节点的数据,因为本地已经直接处理过了
                           .broker()
                           .topics(createTopic("*"))
                           .build(),
                       payload -> Mono
                           .defer(() -> {
                               String operation = payload.getTopicVars(createTopic("{operation}")).get("operation");
                               switch (operation) {
                                   case OPERATION_CREATED:
                                       return doHandleCreated(decode(payload), true);
                                   case OPERATION_SAVED:
                                       return doHandleSaved(decode(payload), true);
                                   case OPERATION_DELETED:
                                       return doHandleDeleted(decode(payload), true);
                                   case OPERATION_MODIFIED:
                                       ModifyEvent<E> event = decode(payload);
                                       return doHandleModified(event.before, event.after, true);
                               }
                               return Mono.empty();
                           })
                           .as(MonoTracer.create(payload.getTopic()))
            );
    }

然后同步, (注意: 其他节点的插件才会在这里同步 ().broker() //只订阅集群其他节点的数据) 最后调用到 org.jetlinks.pro.plugin.impl.DefaultPluginDriverManager::loadDriver

本地启动时初始化插件的同步

org.jetlinks.pro.plugin.impl.DefaultPluginDriverManager: 实现了 org.springframework.boot.CommandLineRunner, 在容器启动是 调用org.jetlinks.pro.plugin.PluginDriverInstaller::install初始化所有插件; 最后调用到 org.jetlinks.pro.plugin.impl.DefaultPluginDriverManager::loadDriver

DeviceGatewayPlugin 插件类 可以做什么?

主要逻辑在 org.jetlinks.plugin.internal.device.DeviceGatewayPlugin 和 org.jetlinks.plugin.core.AbstractPlugin

org.jetlinks.pro.plugin.device.PluginDeviceGateway 看源码, 差不多就是网关实现的翻版, 基本就是装饰了一下..

所有方法行为

可以参考 org.jetlinks.plugin.internal.device.DeviceGatewayPlugin 的注释说明

public abstract class DeviceGatewayPlugin extends AbstractPlugin {
    private final PluginDeviceGatewayService gatewayService;
    private final PluginDeviceManager deviceManager;
    protected final DeviceRegistry registry;
 
    public DeviceGatewayPlugin(String id, PluginContext context) {
        super(id, context);
        // 插件的一些平台对象   
        gatewayService = context
                .services()
                .getServiceNow(PluginDeviceGatewayService.class);
 
        registry = context
                .services()
                .getServiceNow(DeviceRegistry.class);
 
        deviceManager = context
                .services()
                .getServiceNow(PluginDeviceManager.class);
 
    }
    ////////////////////////////////////////////////提供给插件///////////////////////////////////////////////////////////
    /**
     * <!> **设备上报消息时调用, 将消息传递给平台**
     * 处理解析后的设备消息,用于将第三方系统或者sdk的消息转为平台统一消息格式后,将消息传递给平台.
     * 请使用平台内置的消息实现,请勿自己实现{@link DeviceMessage}接口. 
     */
    protected final Mono<Void> handleMessage(DeviceMessage message) {
        return gatewayService.handleMessage(this, message);
    }
    /**
     * 获取平台中的设备列表  **获取绑定该网关插件的所有设备**
     */
    protected final Flux<DeviceOperator> getPlatformDevices() {
        return deviceManager.getDevices(this);
    }
 
    /**
     * 获取平台中指定产品ID(插件中定义的产品ID)的设备列表 **根据产品获取所有设备列表**
     */
    protected final Flux<DeviceOperator> getPlatformDevices(String productId) {
        return deviceManager.getDevices(this, productId);
    }
 
    /////////////////////////////////////////////回调//////////////////////////////////////////////////////////////
    public Publisher<? extends DeviceMessage> execute(DeviceMessage message) {
        return Flux.empty();
    }
    /**
     * 执行物模型指令,并返回执行结果. **调用设备指令时调用**
     */
    public Publisher<? extends DeviceMessage> execute(String productId,
                                                      DeviceOperator device,
                                                      DeviceMessage message) {
        return execute(message);
    }
    
 
    /**
     * 获取产品配置的元数据信息,在产品详情界面会根据此信息来展示拓展信息输入框.  **产品的配置元数据**
     */
    public Mono<ConfigMetadata> getProductConfigMetadata(String productId) {
        return Mono.empty();
    }
 
    /**
     * 获取设备配置的元数据信息,在设备详情界面会根据此信息来展示拓展信息输入框. **设备的配置元数据**
     */
    public Mono<ConfigMetadata> getDeviceConfigMetadata(String productId) {
        return Mono.empty();
    }
 
 
 
    /**
     * 获取设备的状态,返回empty表示不支持获取. **获取设备 在线/离线状态**
     */
    public Mono<Byte> getDeviceState(DeviceOperator device) {
        return Mono.empty();
    }
 
    /**
     * 扫描某个产品下的设备 **获取某个产品下的所有设备**
     */
    @Deprecated
    public Flux<Device> scanDevices(String productId) {
        return Flux.empty();
    }
 
 
    /**
     * 当平台的设备注册(保存)时调用  **设备注册后调用**
     */
    public Mono<Void> doOnDeviceRegister(DeviceOperator device) {
        return Mono.empty();
    }
 
    /**
     * 当平台的产品注册(保存)时调用 **产品注册后调用**
     */
    public Mono<Void> doOnProductRegister(DeviceProductOperator product) {
        return Mono.empty();
    }
 
    //**插件类型, 目前只有一个**
    @Override
    public final PluginType getType() {
        return InternalPluginType.deviceGateway;
    }
}

实例 在网关启动时..

该加载行为适用于定时执行动作,如检测HTTP设备状态、定时获取设备数据等。

@Slf4j
class HttpApiDevicePlugin extends DeviceGatewayPlugin {
	@Override
    protected Mono<Void> doStart() {
 
        //启动时定时拉取状态
        this.context()
            .scheduler()
            .interval("pull_device_state",
                      Mono.defer(this::pollState),
                      Duration.ofSeconds(10));
 
        return super.doStart();
    }
 
    private Mono<Void> pollState() {
        return getPlatformDevices()
                .map(Thing::getId)
                .buffer(100)
                .flatMap(list -> this
                        .getDeviceState(list)
                        .flatMap(this::handleState)
                        .onErrorResume(err -> {
                            log.warn("check device state error", err);
                            return Mono.empty();
                        }))
                .then();
    }
}

实例 在调用设备功能 执行..

页面调用读取、写入、设备指令时。

@Slf4j
class HttpApiDevicePlugin extends DeviceGatewayPlugin {
	@Override
    public Publisher<? extends DeviceMessage> execute(DeviceMessage message) {
 
        // 读取属性
        if (message instanceof ReadPropertyMessage) {
            return registry
                    .getDevice(message.getDeviceId())
                    .flatMap(this::getDeviceInfo)
                    .map(DeviceInfo::getProperties)
                    .map(((ReadPropertyMessage) message).newReply()::success);
        }
        //修改属性
        else if (message instanceof WritePropertyMessage) {
            Map<String, Object> props = ((WritePropertyMessage) message).getProperties();
            Map<String, Object> props = ((WritePropertyMessage) message).getProperties();
            //WebClient进行POST请求
            return client
                    .post()
                    //URI路径
                    .uri("/device/{deviceId}/properties", message.getDeviceId())
                    //请求体
                    .bodyValue(props)
                    //发送请求并获取响应
                    .retrieve()
                    //将响应转换为Void类型
            		//实际应用场景中应将响应结果转换成一个具体的Object类型
                    //同时处理这个返回类型的对象并封装DeviceMessage
                    .bodyToMono(Void.class)
                    //返回一个成功的回复
                    .thenReturn(((WritePropertyMessage) message).newReply().success(props))
                    //出现错误,则返回带有错误信息的回复
                    .onErrorResume(err -> Mono.just(((WritePropertyMessage) message)
                                                            .newReply()
                                                            .error(err)));
            // 调用功能
        } else if (message instanceof FunctionInvokeMessage) {
            FunctionInvokeMessage functionInvokeMessage = ((FunctionInvokeMessage) message);
            return this
                    .invokeFunction(functionInvokeMessage)
                    .map(result -> functionInvokeMessage.newReply().success(result));
        }
 
        return super.execute(message);
    }
}

实例 插件上传并被加载后时..执行..

该加载行为适用于需要执行一次初始化数据的操作,如创建插件产品后自动获取第三方设备数据。

@Slf4j
class HttpApiDevicePlugin extends DeviceGatewayPlugin {
 
     public HttpApiDevicePlugin(String id, PluginContext context) {
         ...
         registerHandler(QueryDevicePageCommand.class,
                        CommandHandler
                                .of(QueryDevicePageCommand.metadata(
                                            SimplePropertyMetadata.of("id", "ID", StringType.GLOBAL)),
                                    (cmd, self) -> queryDevice(cmd),
                                    QueryDevicePageCommand::new)
         ...
        );
     }
}
 

自定义插件实现

org.jetlinks.plugin.internal.device.DeviceGatewayPluginDriver 用于注册,创建插件 (类似自定义协议的 ProtocolSupport) org.jetlinks.plugin.internal.device.DeviceGatewayPlugin 实现插件逻辑主接口 (网关类型)

依赖 pom.xml

<!-- jetlinks  parent (基本配置)-->
<parent>
	<groupId>org.jetlinks.plugin</groupId>
	<artifactId>jetlinks-plugin</artifactId>
	<version>1.0.0-SNAPSHOT</version>
</parent>
 
<!-- jetlinks plugin 相关api; 注意: cn.hutool 是通用依赖(或者说jetlinks-standalone要引入)-->
<dependencies>
	<dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-http</artifactId> <version>5.8.4</version> </dependency>
	<dependency>
		<groupId>org.jetlinks.plugin</groupId>
		<artifactId>plugin-internal</artifactId>
		<version>1.0.0-SNAPSHOT</version>
	</dependency>
</dependencies>

实现插件逻辑 DeviceGatewayPlugin

public class HttpActiveDevicePlugin extends DeviceGatewayPlugin {
    private static final Logger log = LoggerFactory.getLogger(HttpActiveDevicePlugin.class);
 
    static final String KEY_BASE_API_URL = "base_api";
    static final String KEY_TEST = "test";
 
    public final String BASE_API;
    public HttpActiveDevicePlugin(String id, PluginContext context) {
        super(id, context);
        //插件的配置 ....
        BASE_API  = context
                .environment()
                .getProperty(KEY_BASE_API_URL)
                .orElseThrow(() -> new IllegalArgumentException("plugin env 'base_api' is required"));
 
        log.warn(" api 地址是 : {}", KEY_BASE_API_URL);
 
    }
 
    @Override
    protected Mono<Void> doStart() {
        //插件启动时,  定时拉取状态; //这里是开一个 scheduler 
        this.context()
                .scheduler()
                .interval("HttpActiveDevicePlugin_pollState",
                        Mono.defer(this::pollDeviceState),
                        Duration.ofSeconds(10));
 
        return super.doStart();
    }
    private Mono<Void> pollDeviceState() {
        return getPlatformDevices()//拿到插件绑定 的所有设备ID
                .map(DeviceOperator::getDeviceId)
                .buffer(100)
                .flatMap(list -> this
                        .getDeviceState(list)//去第三方获取数据
                        .flatMap(this::handleState)//处理数据
                        .onErrorResume(err -> {
                            log.warn("check device state error", err);
                            return Mono.empty();
                        }))
                .then();
    }
    private Flux<JsonNode> getDeviceState(List<String> device) {
        // 这里 HTTP 或 RPC, 总之 去第三方拿
        return Flux.empty();
    }
    private Mono<Void>  handleState(JsonNode message) {
        //伪代码, 根据拿到的数据 判断
        if (deviceInfo.online) {
            //属性上报 消息
            ReportPropertyMessage msg = new ReportPropertyMessage();
            msg.setDeviceId(deviceInfo.id);
            msg.setProperties(deviceInfo.getProperties());
 
            return handleMessage(message);//提交给平台处理
        } else {
            //离线消息
            DeviceOfflineMessage msg = new DeviceOfflineMessage();
            msg.setDeviceId(deviceInfo.id);
            return handleMessage(message);//提交给平台处理
        }
    }
 
}

给定描述和创建对象 DeviceGatewayPluginDriver

public class HttpActiveDevicePluginDriver implements DeviceGatewayPluginDriver {
    
    private static final Logger log = LoggerFactory.getLogger(HttpActiveDevicePluginDriver.class);
    static Version version_1_0 = new Version(1, 0, 0, true);
    public static final ConfigKey<Boolean> required = ConfigKey.of("required", "是否必填", Boolean.TYPE);
 
    @Nonnull
    @Override
    public Mono<? extends DeviceGatewayPlugin> createPlugin(@Nonnull String pluginId, @Nonnull PluginContext context) {
        //创建
        return Mono.just(new HttpActiveDevicePlugin(pluginId, context));
    }
 
    @Nonnull
    @Override
    public Description getDescription() {
        // 插件 的配置/描述
        return Description.of(
                "mock-http-api",
                "模拟HTTP-API接入",
                "",
                version_1_0,
                VersionRange.of(Version.platform_2_0, Version.platform_latest),
                //告诉平台,此插件需要的配置信息
                Collections.singletonMap(
                        PLUGIN_CONFIG_METADATA,
                        new DefaultConfigMetadata()
                            .add(HttpActiveDevicePlugin.KEY_BASE_API_URL, "API地址", new StringType().expand(required.value(true)))
                            .add(HttpActiveDevicePlugin.KEY_TEST, "测试测试", new StringType().expand(required.value(true)))
                )
        );
    }
}

设备ID映射

需要在插件中, 实现查询方法, 方可在UI中找到映射

public class HttpActiveDevicePlugin extends DeviceGatewayPlugin implements CommandSupport {  
    private static final Logger log = LoggerFactory.getLogger(HttpActiveDevicePlugin.class);
    @Override
    protected <R> R executeUndefinedCommand(@Nonnull Command<R> command) {
        if (command instanceof QueryDevicePageCommand){
            /**
             *  这里可以扩展 查外部数据, 比如数据库 等
             *  TODO 目前对接的不多 暂时这里写死
             *  id 格式是 {eui}_{type}
             */
            List<Device> devices = new ArrayList<>();
            Device device = new Device();
            device.setId("0078cc7371b2000007801415_gateway-soil");
            device.setProductId("");
            device.setName("土壤墒情01");
            devices.add(device);
 
            device = new Device();
            device.setId("00781d3b6c70000007c01413_weather");
            device.setProductId("");
            device.setName("晓万象Q1叶子气象站");
            devices.add(device);
 
            device = new Device();
            device.setId("n50902054_insecticidal-lamp");
            device.setProductId("");
            device.setName("杀虫灯01");
            devices.add(device);
 
            PagerResult<Device> result = PagerResult.of(2, devices) ;
            return (R) result;
        }
        throw new CommandException(this, command, "error.unsupported_execute_command", null, CommandUtils.getCommandIdByType(command.getClass()));
    }
    ...............
}

配置化协议解析

二进制

字段说明示例
name字段名temperature
type数据类型uint8/int16/float32/string/bit
offset字节偏移(从 0 开始)0
length字节长度1/2/4
endian字节序little(小端)/big(大端)
desc字段描述温度值
unit单位
scale缩放系数0.1(原值 ×0.1 = 真实值)

位域

子字段名说明
bits位域子项数组,定义每个 bit / 位段的解析规则
name子位段名称
pos起始 bit 位置(从 0 开始,最低位 = 0,最高位 = 7)
len占用 bit 长度(默认 = 1,可省略)
desc子位段描述

位域 = 把 1 个字节(8 bit)拆分成多个独立的 bit 位 / 连续位段进行解析

  • 最小解析单位:bit(比特)
  • 适用场景:设备状态、开关量、报警标志、模式、档位等用少量 bit 表示的数据
  • 固定占用:1 字节(8 bit),不支持跨字节解析(跨字节需拆成多个位域字段)

示例配置

{
  "protocol_name": "温湿度传感器协议",
  "total_length": 8,    // 整包长度
  "fields": [
    {
      "name": "header",
      "type": "uint8",
      "offset": 0,
      "length": 1,
      "endian": "big",
      "desc": "帧头",
      "fixed_value": 0xAA  // 固定值校验
    },
    {
      "name": "temperature",
      "type": "int16",
      "offset": 1,
      "length": 2,
      "endian": "big",
      "desc": "温度",
      "unit": "℃",
      "scale": 0.1
    },
    {
      "name": "humidity",
      "type": "uint16",
      "offset": 3,
      "length": 2,
      "endian": "big",
      "desc": "湿度",
      "unit": "%RH",
      "scale": 0.1
    },
    {
      "name": "status",
      "type": "bit",       // 位域解析
      "offset": 5,
      "length": 1,
      "bits": [
        {"name": "power_alarm", "pos": 0, "desc": "电源报警"},
        {"name": "sensor_error", "pos": 1, "desc": "传感器故障"},
        {"name": "signal_strength", "pos": 2, "len": 2, "desc": "信号强度(2位)"}
      ]
    },
    {
      "name": "checksum",
      "type": "uint8",
      "offset": 6,
      "length": 1,
      "desc": "校验和"
    },
    {
      "name": "tail",
      "type": "uint8",
      "offset": 7,
      "length": 1,
      "fixed_value": 0x55,
      "desc": "帧尾"
    }
  ]
}