N_JetlinksPro.md

JetlinksPro

N_Jetlinks1_0 源码分析 N_Reactor 响应式框架编程 JetlinksPro 插件开发 JetlinksPro 透传源码分析 官方协议包Git

网络组件

网络组件的重载 流程

org.jetlinks.pro.network.manager.web.NetworkConfigController#start org.jetlinks.pro.network.manager.service.NetworkConfigService#start

ClusterNetworkManager

org.jetlinks.pro.network.ClusterNetworkManager#reload org.jetlinks.pro.network.ClusterNetworkManager#doReload

private Mono<Void> doReload(String type, String id) {
    return handleConfig(NetworkType.of(type), id, this::createOrUpdate)// 注意 这个`this::createOrUpdate` Lambda , 下面回调的
        .then();

org.jetlinks.pro.network.ClusterNetworkManager#handleConfig

private Mono<Network> handleConfig(NetworkType type,
                                       String id,
                                       Function3<NetworkProvider<Object>, String, Object, Mono<Network>> handler) {
        @SuppressWarnings("all")
        NetworkProvider<Object> networkProvider = (NetworkProvider) this
            .getProvider(type.getId())
            .orElseThrow(() -> new UnsupportedOperationException("不支持的类型:" + type.getName()));
 
        return configManager
            .getConfig(type, id)
            .filter(NetworkProperties::isEnabled)
            .flatMap(networkProvider::createConfig)
            .flatMap(config -> handler.apply(networkProvider, id, config));//这里 `handler.apply` 回调的是 `this::createOrUpdate`
    }

org.jetlinks.pro.network.ClusterNetworkManager#createOrUpdate

public Mono<Network> createOrUpdate(NetworkProvider<Object> provider, String id, Object properties) {
        ReactiveCacheContainer<String, Network> networkStore = getNetworkStore(provider.getType());
        //这个 provider 视为网络的组件的类型 是 `org.jetlinks.pro.network.NetworkProvider` 的派生类
        return networkStore.compute(id, (key, network) -> {
            if (network == null) {
                //不存在创建这个 网络组件
                return provider.createNetwork(properties);
            }
            //存在重载这个 网络组件
            return provider.reload(network, properties);
        });
    }

MqttClientProvider#reload

org.jetlinks.pro.network.mqtt.client.MqttClientProvider#reload org.jetlinks.pro.network.mqtt.client.MqttClientProvider#initMqttClient

 public Mono<Network> initMqttClient(VertxMqttClient mqttClient, MqttClientProperties properties) {//properties MQTT网络组件的配置
        return convert(properties)
            .map(options -> {
                mqttClient.setTopicPrefix(properties.getTopicPrefix());//订阅主题前缀
                mqttClient.setLoading(true);
                MqttClient client = MqttClient.create(vertx, options);
                mqttClient.setClient(client);
                client.connect(properties.getRemotePort(), properties.getRemoteHost(), result -> {
                    mqttClient.setLoading(false);
                    if (!result.succeeded()) {
                        log.warn("connect mqtt [{}@{}:{}] error",
                                 properties.getClientId(),
                                 properties.getRemoteHost(),
                                 properties.getRemotePort(),
                                 result.cause());
                        mqttClient.getNetMonitor().error(result.cause());
                    } else {
                        log.debug("connect mqtt [{}] success", properties.getId());
                    }
                });
                return mqttClient;
            });
    }

网络组件的事件

org.jetlinks.pro.network.manager.service.NetworkConfigService#start

org.jetlinks.pro.network.ClusterNetworkManager#reload

 
  @Override
    public Mono<Void> reload(NetworkType type, String id) {
        return doReload(type.getId(), id)
            .then(
                //异步处理, 发布事件; 谁监听了, 在哪监听了? 见下
                eventBus.publish("/_sys/network/" + type.getId() + "/reload", id)
            ).then();
 
    }
 

org.jetlinks.pro.network.ClusterNetworkManager#run

    @Override
    public void run(String... args) {
        ...
        eventBus
            .subscribe(Subscription
                           .builder()
                           .subscriberId("network-config-manager")
                           .topics("/_sys/network/*/*")//这里监听 /_sys/network/ 事件
                           .justBroker()
                           .build())
            .flatMap(payload -> {
                Map<String, String> vars = payload.getTopicVars("/_sys/network/{type}/{action}");
                String id = payload.bodyToString(true);
                log.debug("{} network [{}-{}]", vars.get("action"), vars.get("type"), id);
                if ("reload".equals(vars.get("action"))) {//重载
                    return this
                        .doReload(vars.get("type"), id)//next
                        .onErrorResume(err -> {
                            log.error("reload network error", err);
                            return Mono.empty();
                        });
                }
                if ("shutdown".equals(vars.get("action"))) {//关闭
                    return doShutdown(vars.get("type"), id);
                }
                if ("destroy".equals(vars.get("action"))) {//销毁
                    return doDestroy(vars.get("type"), id);
                }
                return Mono.empty();
            })
 

org.jetlinks.pro.network.ClusterNetworkManager#doReload org.jetlinks.pro.network.ClusterNetworkManager#createOrUpdate

public Mono<Network> createOrUpdate(NetworkProvider<Object> provider, String id, Object properties) {
    ReactiveCacheContainer<String, Network> networkStore = getNetworkStore(provider.getType());
    return networkStore.compute(id, (key, network) -> {
        if (network == null) {
            return provider.createNetwork(properties);
        }
        return provider.reload(network, properties);
    });
}

示网络组件类型 org.jetlinks.pro.network.mqtt.client.MqttClientProvider#reload

规则引擎

关键对象

RuleNode(规则节点): 规则节点描述具体执行的逻辑 RuleLink(规则连线): 用于将多个节点连接起来,将上一个节点的输出结果作为下一个节点的输入结果. org.jetlinks.rule.engine.api.model.RuleModel(规则模型):由多个RuleNode(规则节点),RuleLink(规则连线)组成 理解为规则的整个模型描述

Input(输入): 规则节点的数据输入 Output(输出): 规则节点的数据输出

Scheduler(调度器): 负责将模型转为任务(Job),并进行任务调度到 Worker Worker(工作器): 负责执行,维护任务.

org.jetlinks.rule.engine.api.task.ExecutionContext(执行上下文): 启动任务时的上下文,通过上下文获取输入输出配置信息等进行任务处理.

org.jetlinks.rule.engine.api.task.TaskExecutor(任务执行器): 具体执行任务逻辑的实现 org.jetlinks.rule.engine.api.task.TaskExecutorProvider(任务执行器提供器): 用于根据模型配置以及上下文创建任务执行器. TaskExecutor 分为两类

  1. FunctionTaskExecutor 功能性任务, 无状态? 例如: 消息通知,
  2. ExecutableTaskExecutor 有状态任务, 例如: 定时任务, 延时任务, 场景联动(scene)

RuleData(规则数据): 任务执行过程中的数据实例

规则编排

实例组件数据是存在数据库的 rule_instance 表 前端界面主要基于 Node-RED 改造的;

源码流程

规则引擎启用接口: /rule-engine/instance/1610114117243383808/_start

org.jetlinks.pro.rule.engine.web.RuleInstanceController#start

public Mono<Boolean> start(@PathVariable String id) {
    return Authentication
        .currentReactive()
        .flatMap(auth -> instanceService
            .findById(id)
            .doOnNext(instance -> {
                //主要是校验一下配置是否正确
                for (RuleNodeModel node : instance.toRule(modelParser).getNodes()) {
                    if (!properties.getExecutorFilter().test(auth, node.getExecutor())) {
                        throw new BusinessException("error.rule_nodes_cannot_be_used" , node.getExecutor() , 500 , node.getExecutor());
                    }
                }
            }))
        .then(instanceService.start(id))//见下
        .thenReturn(true)
        ;
}

构造 RuleModel

org.jetlinks.pro.rule.engine.service.RuleInstanceService#start org.jetlinks.pro.rule.engine.service.RuleInstanceService#doStart

  private Mono<Void> doStart(RuleInstanceEntity entity) {
        return Mono
            .defer(() -> {
                RuleModel model = entity.toRule(modelParser);//解析为 RuleModel 但不包括输入/输出连接线, 其会在内部以属性表示
                return ruleEngine
                    .startRule(entity.getId(), model)//开始运行
                    .then(createUpdate()//更新数据库 状态
                              .set(RuleInstanceEntity::getState, RuleInstanceState.started)
                              .where(entity::getId)
                              .execute()).then();
            });
    }

主要逻辑

org.jetlinks.rule.engine.cluster.ClusterRuleEngine#startRule

public Flux<Task> startRule(String instanceId, RuleModel model) {
        log.debug("starting rule {}\n{}", instanceId, model.toString());
        //编译 
        //<!> 调度任务编译器, 将规则模型的所有节点, 编译成调度任务
        //<!> 最终是 一个节点一个 ScheduleJob; 
        Map</*nodeId*/String, /*Job*/ScheduleJob> jobs = new ScheduleJobCompiler(instanceId, model)
                .compile()
                .stream()
                .collect(Collectors.toMap(ScheduleJob::getNodeId, Function.identity()));
        List<Task> startedTask = new ArrayList<>(jobs.size());
        //新增调度的任务
        Map</*nodeId*/String, /*Job*/ScheduleJob> readyToStart = new ConcurrentHashMap<>(jobs);
 
        //获取调度记录
        return repository
                .findByInstanceId(instanceId)
                .doOnNext(snapshot -> readyToStart.remove(snapshot.getJob().getNodeId()))
                .flatMap(snapshot -> {
                    ScheduleJob job = jobs.get(snapshot.getJob().getNodeId());
                    ScheduleJob old = snapshot.getJob();
                    //新的规则减少了任务,则尝试移除旧的任务
                    if (job == null || !Objects.equals(job.getExecutor(), old.getExecutor())) {
                        if (job != null && !Objects.equals(job.getExecutor(), old.getExecutor())) {
                            //移除了旧的,需要重新调度新的
                            readyToStart.put(job.getNodeId(), job);
                            log.debug("change job [{}] executor:{} -> {}", snapshot.getJob().getNodeId(),
                                      snapshot.getJob().getExecutor(), job.getExecutor());
                        } else {
                            log.debug("shutdown removed job:{}", snapshot.getJob().getNodeId());
                        }
                        return this
                                .shutdown(snapshot)
                                .then(Mono.empty());
                    }
                    return this
                            .getTaskBySnapshot(snapshot)
                            .flatMap(task -> {
                                startedTask.add(task);
                                //重新加载任务
                                return task
                                        .setJob(job)
                                        .then(task.reload())
                                        .thenReturn(task);
                            })
                            //没有worker调度此任务? 重新调度
                            .switchIfEmpty(Mono.fromRunnable(() -> readyToStart.put(job.getNodeId(), job)));
 
                })
                //<!>doStart 这里这里 见下: 
                .concatWith(Flux.defer(() -> doStart(readyToStart.values())).doOnNext(startedTask::add))
                .collectList()
                .map(Flux::fromIterable)
                //保存快照信息
                .flatMapMany(tasks -> repository
                        .saveTaskSnapshots(tasks.flatMap(Task::dump))
                        .thenMany(tasks))
                .onErrorResume(err -> {
                    //失败时停止全部任务
                    return Flux
                            .fromIterable(startedTask)
                            .flatMap(Task::shutdown)
                            .then(Mono.error(err));
                });
    }

统一启动 Task

org.jetlinks.rule.engine.cluster.ClusterRuleEngine#doStart

protected Flux<Task> doStart(Collection<ScheduleJob> jobs) {
    return Flux
            .defer(() -> Flux
                    .fromIterable(jobs)
                    //<!> this::scheduleTask;  1. 通过 ScheduleJob 找到 Scheduler(调度器), 2. 通过 Scheduler 将 ScheduleJob 转为 Task
                    .flatMap(this::scheduleTask)
                    //将所有Task创建好之后再统一启动
                    .collectList()
                    .flatMapIterable(Function.identity())
                    //<!> 最终统一启动 org.jetlinks.rule.engine.defaults.DefaultTask 
                    .flatMap(task -> task.start().thenReturn(task)));
}
private Flux<Task> scheduleTask(ScheduleJob job) {
        //通过 ScheduleJob 找到 Scheduler(调度器)
    return selectScheduler(job)
            .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("no scheduler for " + job.getExecutor())))
            // 见下, 其有两个实现 org.jetlinks.rule.engine.defaults.LocalScheduler 和 ClusterRemoteScheduler
            .flatMap(scheduler -> scheduler.schedule(job));
}
private Flux<Scheduler> selectScheduler(ScheduleJob job) {
    return schedulerRegistry
            .getSchedulers()
            .filterWhen(scheduler -> scheduler.canSchedule(job))
            .as(supports -> schedulerSelector.select(supports, job));
}
LocalScheduler 调度器

<!> 停止旧的 Task org.jetlinks.rule.engine.defaults.LocalScheduler#schedule

public Flux<Task> schedule(ScheduleJob job) {
 
return Flux
        //这里逻辑是 如果有旧的任务 拿到并停止;
        .fromIterable(getExecutor(job.getInstanceId(), job.getNodeId()))
        .flatMap(task -> {
            //停止旧任务
            removeTask(task);
            return task.shutdown();
        })
        // <!> 见下
        .thenMany(createExecutor(job));
}
//停止旧任务 逻辑
private void removeTask(Task task) {
    tasks.remove(task.getId());
    //从 executors Map中移出这个任务
    getExecutor(task.getJob().getInstanceId(), task.getJob().getNodeId()).remove(task);
}

这里是调度器(Scheduler)有个 private final Map<String/*规则实例ID*/, Map<String/*nodeId*/, List<Task>>> executors = new ConcurrentHashMap<>(); Map 存着所有运行着的任务(Task)

Worker 创建 Task

<!> 根据 ScheduleJob 找到 Worker 再用它创建 Task org.jetlinks.rule.engine.defaults.LocalScheduler#createExecutor

private Flux<Task> createExecutor(ScheduleJob job) {
    return findWorker(job.getExecutor(), job)
            .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("unsupported executor:" + job.getExecutor())))
            // <!> 见下
            .flatMap(worker -> worker.createTask(id, job))
            .doOnNext(this::addTask);
}

创建上下文(ExecutionContext) 创建任务 (TaskExecutor)

org.jetlinks.rule.engine.defaults.LocalWorker#createTask org.jetlinks.rule.engine.cluster.worker.ClusterWorker#createTask

@Override
public Mono<Task> createTask(String schedulerId, ScheduleJob job) {
    return Mono.justOrEmpty(executors.get(job.getExecutor()))//找到对应的 TaskExecutorProvider
            .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("unsupported executor:" + job.getExecutor())))
            .flatMap(provider -> {
                //<!> 创建上下文(ExecutionContext)
                DefaultExecutionContext context = createContext(job);
                //<!> 创建任务 (TaskExecutor)
                return provider.createTask(context) 
                        .map(executor -> new DefaultTask(schedulerId, this.getId(), context, executor));
            });
}
ExecutionContext 有哪些东西?

org.jetlinks.rule.engine.cluster.worker.AbstractExecutionContext

通过 ExecutionContext 可以拿到 ScheduleJob(任务信息), 向下游节点输出数据, 告知执行异常 事件等

在哪绑定线程?

绑定线程其实是在 Task 中, 使用 Reactor subscribeOn()给定了调度器 org.jetlinks.rule.engine.defaults.DefaultTask#start

public class DefaultTask implements Task {
     @Override
    public Mono<Void> start() {
        log.debug("start task[{}]:[{}]", getId(), getJob());
        return Mono.<Void>fromRunnable(executor::start)//调用 TaskExecutor (执行器)
                   .doOnSuccess((v) -> startTime = System.currentTimeMillis())
                   .subscribeOn(Schedulers.boundedElastic());//<!>注意 这里绑了个调度器
    }
    .....
}

如何保证顺序?

关键是 ExecutionContext 的 Output 见: org.jetlinks.rule.engine.defaults.AbstractExecutionContext 见: org.jetlinks.rule.engine.api.task.Output

Output 实例是: org.jetlinks.rule.engine.cluster.worker.QueueOutput Input 实例是: org.jetlinks.rule.engine.cluster.worker.QueueInput

其实Reactor 天生就适合这种数据流的订阅的业务, 只要构造流时按顺序绑定订阅执行就行; 即 org.jetlinks.rule.engine.cluster.ClusterRuleEngine#doStart

节点的数据流

向下游输入

Map<String, Object> data = new HashMap<>();
long currentTime = System.currentTimeMillis();
data.put("timestamp", currentTime);
data.put("_now", currentTime);
context
    .getOutput()
    .write(context.newRuleData(data))

获取上游输出

context
    .getInput()
    .accept()
    .filter((data) -> state == Task.State.running)
    .flatMap(data -> doSomething...)
 

简略总结

  1. RuleInstanceEntity 将转换 RuleModel org.jetlinks.pro.rule.engine.service.RuleInstanceService#doStart

  2. 用 ScheduleJobCompiler 编译 ScheduleJob 描述对象 org.jetlinks.rule.engine.cluster.ClusterRuleEngine#startRule

  3. 将 ScheduleJob 转换为 Task org.jetlinks.rule.engine.cluster.ClusterRuleEngine#doStart

  4. 统一执行 org.jetlinks.rule.engine.cluster.ClusterRuleEngine#doStart

其他关键的对象

org.jetlinks.rule.engine.api.RuleEngine 是规则统一执行/管理的入口, 它接受RuleModel启动规则

org.jetlinks.rule.engine.api.model.RuleModel 是由多个RuleNode(规则节点),RuleLink(规则连线)组成 理解为整个规则模型描述, 通过解析器(RuleEngineModelParser)解析数据库实体数据.

org.jetlinks.rule.engine.api.model.RuleEngineModelParser 是规则引擎模型解析器,支持将不同的模型格式转换为规则模型(不同类型? 因为前端基于node-red, 所以需要解析为node-red的node结构)

org.jetlinks.rule.engine.api.scheduler.ScheduleJob 一个节点会转为一个ScheduleJob 它作为整个节点任务信息的包装对象, 它持有执行器名称 ,执行器配置, 规则配置, 输入, 输出等信息; Scheduler(调度器) 可使用它找 TaskExecutorProvider 和创建 Task

org.jetlinks.rule.engine.api.task.Task Task也是个包装对象, 主要包装 TaskExecutor(执行器) , 实际上它更接近官文档 Workr 的描述?: 负责执行,维护任务

了解下几类内置节点(任务)

org.jetlinks.rule.engine.api.task.TaskExecutorProvider

参考文档

周期性执行的节点 TimerTaskExecutor

定时器这种周期性的, 是如何执行的呢?

定时器其实现是: org.jetlinks.pro.rule.engine.executor.TimerTaskExecutorProvider.TimerTaskExecutor

private Disposable execute() {
    Duration nextTime = nextDelay.get();
    if (this.disposable != null) {
        this.disposable.dispose();
    }
    return this.disposable =
        Mono.delay(nextTime, scheduler)//延时执行
            .flatMap(t -> {
                if (!testSchedule()) {
                    return Mono.empty();
                }
                context.getLogger().debug("trigger timed task on {}", metadataManager.currentServerId());
                Map<String, Object> data = new HashMap<>();
                long currentTime = System.currentTimeMillis();
                data.put("timestamp", currentTime);
                data.put("_now", currentTime);
                return context
                    .getOutput()
                    .write(context.newRuleData(data))
                    .then(context
                        .fireEvent(RuleConstants.Event.complete, context.newRuleData(System.currentTimeMillis()))
                        .thenReturn(1));
 
            })
            .onErrorResume(err -> context.onError(err, null).then(Mono.empty()))
            .doFinally(s -> {
                if (getState() == Task.State.running && s != SignalType.CANCEL) {
                    execute();//完成后, 在继续.. 周期性
                }
            })
            .subscribe();
}

ReactorQL 节点 ReactorQLTaskExecutor

见:org.jetlinks.pro.rule.engine.executor.ReactorQLTaskExecutorProvider.ReactorQLTaskExecutor

通过 ExecutionContext 可以拿到 ScheduleJob(任务信息), 向下游节点输出数据, 告知执行异常 事件等

...
@Override
protected Disposable doStart() {
    Flux<Object> dataStream;
    //有上游节点
    if (!CollectionUtils.isEmpty(context.getJob().getInputs())) {
 
        dataStream = context
            .getInput()
            .accept()
            .flatMap(ruleData -> reactorQL
                .start(Flux.just(RuleDataHelper.toContextMap(ruleData)))
                .map(ruleData::newData)
                .onErrorResume(err -> {
                    context.getLogger().error(err.getMessage(), err);
                    return context.onError(err, null).then(Mono.empty());
                }));
    } else {
        dataStream = reactorQL
            .start(table -> {
                // 虚拟源表 dual 
                if (table == null || table.equalsIgnoreCase("dual")) {
                    return Flux.just(1);
                }
                if (table.startsWith("/**")) {
                    throw new UnsupportedOperationException("unsupported topic:" + table);
                }
                if (table.startsWith("/")) {
                    boolean clusterSubscriber =
                        context.getJob().getSchedulingRule() == null
                            || SchedulerSelectorStrategy.isAll(context.getJob().getSchedulingRule());
                    Codec<JSONObject> codec = Codecs.lookup(JSONObject.class);
                    return this
                        .refactorTable(table)
                        .flatMapMany(topics -> {
                            //从事件总线里订阅
                            return eventBus
                                .subscribe(Subscription.of(
                                                "rule-engine:"
                                                    .concat(context.getInstanceId())
                                                    .concat(":")
                                                    .concat(context.getJob().getNodeId()),
                                                topics.toArray(new String[0]),
                                                clusterSubscriber
                                                    ? new Subscription.Feature[]{Subscription.Feature.local}
                                                    : new Subscription.Feature[]{Subscription.Feature.local, Subscription.Feature.broker}
                                            ),
                                            codec);
                        });
                }
                return Flux.just(1);
            })
            .cast(Object.class);
    }
 
    return dataStream
        .flatMap(result -> {
            RuleData data = context.newRuleData(result);
            //输出到下一节点
            return context
                .getOutput()
                .write(Mono.just(data))
                .then(context.fireEvent(RuleConstants.Event.result, data));
        })
        .onErrorResume(err -> context.onError(err, null))
        .subscribe();
}
...

告警节点 org.jetlinks.pro.rule.engine.alarm.AlarmTaskExecutorProvider

org.jetlinks.pro.rule.engine.alarm.AlarmTaskExecutorProvider.AlarmTaskExecutor

public AlarmTaskExecutor(ExecutionContext context, AlarmRuleHandler handler) {
    super("告警", context);
    this.handler = handler;
    reload();
}
 
@Override
public String getName() {
    return config.getMode() == AlarmMode.relieve
        ? "解除告警" : "触发告警";
}
@Override
protected Publisher<RuleData> apply(RuleData input) {
    return executor
        .apply(input)// 调用表达式 lambda; 见 org.jetlinks.pro.rule.engine.alarm.DefaultAlarmRuleHandler
        .doOnError(err -> log.warn("{} alarm error,rule:{}", config.mode, context.getInstanceId(), err))
        .map(result -> context.newRuleData(input.newData(result.toMap())));
}
 
@Override
public void reload() {
    config = FastBeanCopier.copy(context.getJob().getConfiguration(), new Config());
    ValidatorUtils.tryValidate(config);
    // <!> 注意这里, 根据配置来初始化  "解除告警"(triggered) 还是"触发告警"(relieved) lambda
    if (config.mode == AlarmMode.relieve) {
        executor = input -> handler.relieved(context, input);//<!> 见下,  注意这里时表达式, 即触发时会执行的逻辑
    } else {
        executor = input -> handler.triggered(context, input);
    }
}
触发告警 “数据入库”

org.jetlinks.pro.rule.engine.alarm.DefaultAlarmRuleHandler#triggerAlarm

private Mono<AlarmInfo> triggerAlarm(AlarmInfo result) {
        AlarmRecordEntity record = ofRecord(result);
 
        //更新告警状态.
        return alarmRecordService // 状态更新
            .createUpdate()
            .set(record)
            .where(AlarmRecordEntity::getId, record.getId())
            .and(AlarmRecordEntity::getState, AlarmRecordState.warning)
            .execute()
            //更新数据库报错,依然尝试触发告警!
            .onErrorResume(err -> {
                log.error("trigger alarm error", err);
                return Reactors.ALWAYS_ZERO;
            })
            .flatMap(total -> {
                AlarmHistoryInfo historyInfo = createHistory(record, result);
                result.setAlarmTime(record.getAlarmTime());
 
                //更新结果返回0 说明是新产生的告警数据 
                if (total == 0) {
                    result.setFirstAlarm(true);
                    result.setAlarming(false);
 
                    return alarmRecordService
                        .save(record)
                        .then(historyService.save(historyInfo))
                        .then(publishAlarmRecord(historyInfo, result))//<!> 注意, 第一次才会发布到事件总线; 见下:  触发告警 "发布事件总线"
                        .then(publishEvent(historyInfo))//Spring 体系的事件
                        .then(saveAlarmCache(result, record));
                }
                result.setFirstAlarm(false);
                result.setAlarming(true);
 
                return historyService
                    .save(historyInfo)
                    .then(publishEvent(historyInfo))//Spring 体系的事件
                    .then(saveAlarmCache(result, record));
            });
    }

org.jetlinks.pro.rule.engine.alarm.DefaultAlarmRuleHandler#publishEvent org.springframework.context.support.AbstractApplicationContext#publishEvent(org.springframework.context.ApplicationEvent) 这个分支到 Spring 体系的事件了, 没啥重点, 盲猜数据入ES库

触发告警 “发布事件总线”

org.jetlinks.pro.rule.engine.alarm.DefaultAlarmRuleHandler#publishAlarmRecord

public Mono<Void> publishAlarmRecord(AlarmHistoryInfo historyInfo, AlarmInfo alarmInfo) {
    String topic = Topics.alarm(historyInfo.getTargetType(), historyInfo.getTargetId(), historyInfo.getAlarmConfigId());
    // topic = /alarm/device/TCTTST-tct001/1620309816492253184/record
    return Flux
        .concat(
            Mono.just(topic),
            ReactiveAuthenticationHolder
                .get(alarmInfo.getOwnerId())
                .flatMap(AssetsHolder::fromAuth)
                .flatMapMany(holder -> holder.refactorTopic(topic))
        )
        .distinct()
        .flatMap(assetTopic -> eventBus.publish(assetTopic, historyInfo))
        .then();
}
解除告警

org.jetlinks.pro.rule.engine.alarm.DefaultAlarmRuleHandler#relieved 略..

场景联动的节点

设备触发: org.jetlinks.pro.rule.engine.scene.SceneTaskExecutorProvider 定时触发: org.jetlinks.pro.rule.engine.executor.TimerTaskExecutorProvider

见下: [场景联动]

脚本函数节点

脚本节点会转成 LambdaTask 表达式 见: org.jetlinks.pro.rule.engine.executor.ScriptTaskExecutorProvider 见: org.jetlinks.rule.engine.defaults.LambdaTaskExecutor

场景联动

场景联动是规则引擎中,一种业务逻辑的可视化编程方式,您可以通过可视化的方式定义设备之间联动规则。

in short 其实就是规则引擎, 在规则引擎中它是一个 TaskExecutorProvider, 目前有三类触发方式 设备触发: org.jetlinks.pro.rule.engine.scene.SceneTaskExecutorProvider 手动触发,也是: org.jetlinks.pro.rule.engine.scene.SceneTaskExecutorProvider 定时触发: org.jetlinks.pro.rule.engine.executor.TimerTaskExecutorProvider

场景联动数据存储在 rule_scene

启用接口

接口: /scene/1616415881994473472/_enable

实例组件数据是存在数据库的 rule_scene

org.jetlinks.pro.rule.engine.web.SceneController#enabledScene org.jetlinks.pro.rule.engine.service.SceneService#enable

只调用 Service 层 更新了数据库;

@Transactional(rollbackFor = Throwable.class)
public Mono<Void> enable(String id) {
    Assert.hasText(id, "id can not be empty");
    long now = System.currentTimeMillis();
    return this
        .createUpdate()
        .set(SceneEntity::getState, RuleInstanceState.started)
        .set(SceneEntity::getModifyTime, now)
        .set(SceneEntity::getStartTime, now)
        .where(SceneEntity::getId, id)
        .execute()
        .then();
}

这里是它项目框架(hsweb-framework) Service CURD 会触发事件; 最终在事件 hook方法中 流程处理到规则引擎 org.jetlinks.pro.rule.engine.service.SceneService#handleEvent

private Mono<Void> handleEvent(Collection<SceneEntity> entities) {
    return Flux
        .fromIterable(entities)
        .flatMap(scene -> {
            //禁用时,停止规则
            if (scene.getState() == RuleInstanceState.disable) {
                return ruleEngine.shutdown(scene.getId());
            }else if (scene.getState() == RuleInstanceState.started){
                scene.validate();
                //<!> 转换为 RuleModel 调用规则引擎 运行
                return ruleEngine.startRule(scene.getId(), scene.toRule().getModel());
            }
            return Mono.empty();
        })
        .then();
}

场景任务 规则节点

两个对象 org.jetlinks.pro.rule.engine.scene.SceneTaskExecutorProvider org.jetlinks.pro.rule.engine.scene.SceneTaskExecutorProvider.SceneTaskExecutor

规则入口 org.jetlinks.pro.rule.engine.scene.SceneTaskExecutorProvider.SceneTaskExecutor#init

private Disposable init() {
            if (disposable != null) {
                disposable.dispose();
            }
            boolean useBranch = CollectionUtils.isNotEmpty(rule.getBranches());
 
            SqlRequest request = rule.createSql(!useBranch);
            Flux<Map<String, Object>> source;
 
            //不是通过SQL来处理数据
            if (request.isEmpty()) {
                source = context
                    .getInput()
                    .accept()
                    .flatMap(RuleData::dataToMap);
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("init scene [{}:{}], sql:{}", rule.getId(), rule.getName(), request.toNativeSql());
                }
                //<!> 见下: 数据来源 (触发来源)
                ReactorQLContext qlContext = createReactorQLContext();
 
                //sql参数
                for (Object parameter : request.getParameters()) {
                    qlContext.bind(parameter);
                }
                source = ReactorQL
                    .builder()
                    .sql(request.getSql())
                    .build()
                    .start(qlContext)
                    .map(ReactorQLRecord::asMap);
            }
 
            // 分支条件 // <!> 所有场景联动, 会执行到这个分支?
            if (useBranch) {
                return rule
                    .createBranchHandler(
                        source,
                        (idx, nodeId, data) -> {
                            if (log.isDebugEnabled()) {
                                log.debug("scene [{}] branch [{}] execute", rule.getId(), nodeId);
                            }
                            RuleData ruleData = context.newRuleData(data);
                            return  eventBus
                                    .publish("/scene/rule/" +  rule.getId(), buildSceneData(data))//事件总线: 发布场景事件
                                    .then(context
                                        .getOutput()//<!> 见: 告警节点 org.jetlinks.pro.rule.engine.alarm.AlarmTaskExecutorProvider; 初始化时就定义好了;
                                        .write(nodeId, ruleData)
                                        .onErrorResume(err -> context.onError(err, ruleData))
                                        .as(tracer()));
 
                        });
            }
 
            //防抖
            Trigger.GroupShakeLimit shakeLimit = rule.getTrigger().getShakeLimit();
            if (shakeLimit != null && shakeLimit.isEnabled()) {
 
                ShakeLimitGrouping<Map<String, Object>> grouping = shakeLimit.createGrouping();
 
                source = shakeLimit.transfer(
                    source,
                    (time, flux) -> grouping
                        .group(flux)
                        .flatMap(group -> group.window(time), Integer.MAX_VALUE),
                    (map, total) -> map.put("_total", total));
            }
            return source
                .flatMap(this::handleOutput)
                .subscribe();
        }
        

数据来源 (触发来源)

org.jetlinks.pro.rule.engine.scene.SceneTaskExecutorProvider.SceneTaskExecutor#createReactorQLContext

private ReactorQLContext createReactorQLContext() {
    return ReactorQLContext
        .ofDatasource(table -> {
            if (table.startsWith("/")) {// <!> 事件总线, 设备消息...
                //来自事件总线
                return this
                    .subscribe(table)
                    //有效期去重,同一个设备在多个部门的场景下,可能收到2条相同的数据问题
                    .as(FluxUtils.distinct(this::getDataId, Duration.ofSeconds(1)));
            } else {
                //来自上游(定时等) <!> 上游 手动触发, 定时器...
                return context
                    .getInput()
                    .accept()
                    .flatMap(RuleData::dataToMap);
            }
        });
}

规则引擎 在事件总线中的消息

事件总线中, 具体事件主题可参考: org.jetlinks.rule.engine.api.RuleConstants.Topics

文档也有说明

唉, 文档不全只能自己手动调试.

/rule-engine/1610114117243383808/*/logger/*
/rule-engine/debug
/rule-engine/notification/#
/rule-engine/status/#
/rule-engine/notification/node/#
/rule-engine/event-log/#
 
# 分支执行事件
/rule-engine/1610114117243383808/*/event/result
/rule-engine/1620993288546447360/branch_1_group_1_action_1/event/result
/rule-engine/1620993288546447360/branch_1_group_1_action_1/event/complete
 
# 分支错误事件
/rule-engine/1610114117243383808/*/event/error

WebSocket订阅 处理流程

在1.1版本后提供websocket方式订阅平台消息的功能。 可以通过websocket来订阅设备、规则引擎、设备告警等相关消息

TODO 实调试 配置入口 org.jetlinks.pro.gateway.external.socket.WebSocketMessagingHandlerConfiguration

注册Spring HandlerMapping

大概是 WebSocketMessagingHandler 会根据 websocekt 请求, 向 MessagingManager 订阅 topic消息; 返回的 HandlerMapping 是Spring体系的; 处理 /messaging/**的url请求

@Bean
public HandlerMapping webSocketMessagingHandlerMapping(MessagingManager messagingManager,
                                                        ObjectProvider<WebSocketAuthenticationHandler> handlers) {
    CompositeWebSocketAuthenticationHandler authenticationHandler = new CompositeWebSocketAuthenticationHandler();
    handlers.forEach(authenticationHandler::addHandler);
    WebSocketMessagingHandler messagingHandler = new WebSocketMessagingHandler(//见下
        messagingManager,
        authenticationHandler
    );
    final Map<String, WebSocketHandler> map = new HashMap<>(1);
    map.put("/messaging/**", messagingHandler);
 
    final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
    mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
    mapping.setUrlMap(map);
    
    return mapping;
}

解析请求&处理 MessagingRequest

解析请求, 根据不同的类型 分支处理 org.jetlinks.pro.gateway.external.socket.WebSocketMessagingHandler org.jetlinks.pro.gateway.external.socket.WebSocketMessagingHandler#handle

@Override
@Nonnull
@SuppressWarnings("all")
public Mono<Void> handle(@Nonnull WebSocketSession session) {
 
    //订阅信息
    Map<String, Disposable> subs = new ConcurrentHashMap<>();
 
    return authenticationHandler
        //从session中获取权限信息
        .handle(session)
        //没有权限则返回认证错误并关闭连接
        .switchIfEmpty(session
                            .send(Mono.just(session.textMessage(JSON.toJSONString(Message.authError()))))
                            .then(session.close(CloseStatus.BAD_DATA))
                            .then(Mono.empty()))
        .flatMap(auth -> session
            .receive()
            .doOnNext(message -> {
                try {
                    ......
 
                    //解析request请求
                    MessagingRequest request = JSON.parseObject(message.getPayloadAsText(), MessagingRequest.class);
                    if (request == null) {
                        return;
                    }
                    ......
                    
                    if (request.getType() == MessagingRequest.Type.sub) {//订阅
                        //重复订阅,忽略
                        Disposable old = subs.get(request.getId());
                        if (old != null && !old.isDisposed()) {
                            return;
                        }
                        Map<String, String> context = new HashMap<>();
                        context.put("userId", auth.getUser().getId());
                        context.put("userName", auth.getUser().getName());
                        Disposable sub = messagingManager
                            .subscribe(SubscribeRequest.of(request, auth))//<!>见下, 向 MessagingManager 订阅(附带授权信息)
                            .doOnEach(ReactiveLogger.onError(err -> log.error("{}", err.getMessage(), err)))
                        ......
                    } else if (request.getType() == MessagingRequest.Type.unsub) {//取消订阅
                        //取消订阅则释放订阅
                        Optional.ofNullable(subs.remove(request.getId()))
                                .ifPresent(Disposable::dispose);
                    } else {
                        sendToSession(session, Message.error(request.getId(), request.getTopic(), "不支持的类型:" + request.getType()));
                    }
                    } catch (Exception e) {
                        log.warn(e.getMessage(), e);
                        sendToSession(session, Message.error("illegal_argument", null, "消息格式错误"));
                    }
                })
                .then())
            .doFinally(r -> {
                //连接断开时,释放全部订阅
                subs.values().forEach(Disposable::dispose);
                subs.clear();
            });
 
    }

in short 就是附带用户信息, 从 MessagingManager 订阅

关于消息管理器 MessagingManager

基于SubscriptionProvider. 根据请求中的 topicSubscribeRequest.getTopic() 使用AntPathMatcher进行匹配SubscriptionProvider.getTopicPattern().

将调用满足条件的 SubscriptionProvider.subscribe(SubscribeRequest) 来获取数据.

in short 中介者模式; 1. 调用其 MessagingManager#subscribe 可以订阅消息, 其消息来源由 SubscriptionProvider 提供; 2. 调用其MessagingManager#register 可以注册 Provider;

org.jetlinks.pro.gateway.external.DefaultMessagingManager

 @Override
public Flux<Message> subscribe(SubscribeRequest request) {
 
    return Flux.defer(() -> {
        for (SubscriptionProvider provider : subProvider) {
            for (String pattern : provider.getTopicPattern()) {
                if (matcher.match(pattern, request.getTopic())) {
                    return provider
                        .subscribe(request)
                        .map(v -> {
                            //兼容之前的API(直接返回Message)
                            if (v instanceof Message) {
                                return ((Message) v);
                            }
                            return Message.success(request.getId(), request.getTopic(), v);
                        });
                }
            }
        }
        return Flux.error(new UnsupportedOperationException("不支持的topic"));
    });
}
 
 
public void register(SubscriptionProvider provider) {
    subProvider.add(provider);
    //重新排序
    subProvider.sort(Comparator.comparingInt(SubscriptionProvider::getOrder));
}

消息来源

规则引擎 Provider

org.jetlinks.pro.rule.engine.messaging.RuleEngineSubscriptionProvider

public String[] getTopicPattern() {
    return new String[]{"/rule-engine/**"};
}
 

告警消息 Provider

org.jetlinks.pro.device.message.DeviceAlarmSubscriptionProvider

public String[] getTopicPattern() {
    return new String[]{"/rule-engine/device/alarm/*/*/*"};
}

Pro 版 <消息上行> 处理流程

前面就略了 参考笔记: 消息上报 (MQTT Broker)

org.jetlinks.pro.gateway.DeviceGatewayHelper#handleDeviceMessage(org.jetlinks.core.message.DeviceMessage, java.util.function.Function<org.jetlinks.core.device.DeviceOperator,org.jetlinks.core.server.session.DeviceSession>, java.util.function.Consumer<org.jetlinks.core.server.session.DeviceSession>, java.lang.Runnable)

org.jetlinks.pro.gateway.DeviceGatewayHelper#handleDeviceMessage(org.jetlinks.core.message.DeviceMessage, java.util.function.Function<org.jetlinks.core.device.DeviceOperator,org.jetlinks.core.server.session.DeviceSession>, java.util.function.Consumer<org.jetlinks.core.server.session.DeviceSession>, java.util.function.Supplier<reactor.core.publisher.Mono<org.jetlinks.core.device.DeviceOperator>>)

org.jetlinks.pro.gateway.DeviceGatewayHelper#handleDeviceMessage(org.jetlinks.core.message.DeviceMessage, java.util.function.Function<org.jetlinks.core.device.DeviceOperator,reactor.core.publisher.Mono<org.jetlinks.core.server.session.DeviceSession>>, java.util.function.Function<org.jetlinks.core.server.session.DeviceSession,reactor.core.publisher.Mono<java.lang.Void>>, java.util.function.Supplier<reactor.core.publisher.Mono<org.jetlinks.core.device.DeviceOperator>>)

public Mono<DeviceOperator> handleDeviceMessage(DeviceMessage message,
                Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder,
                Function<DeviceSession, Mono<Void>> sessionConsumer,
                Supplier<Mono<DeviceOperator>> deviceNotFoundCallback) {
     ......
        if (doHandle) {
            then = handleMessage(null, message) //见下
                .then(then);
        }
 
        return this
            .createOrUpdateSession(deviceId, message, sessionBuilder, deviceNotFoundCallback)
            .flatMap(sessionConsumer)
            .then(then)
            .contextWrite(Context.of(DeviceMessage.class, message));
 
    }

org.jetlinks.pro.gateway.DeviceGatewayHelper#handleMessage

  private Mono<Void> handleMessage(DeviceOperator device, Message message) {
        return messageHandler////见下 , 实例是 org.jetlinks.pro.device.message.DeviceMessageConnector
            .handleMessage(device, message)
            //转换为empty,减少触发discard
            .flatMap(ignore -> Mono.empty());
    }

子设备等消息处理

org.jetlinks.pro.device.message.DeviceMessageConnector#handleMessage

 public Mono<Boolean> handleMessage(DeviceOperator device, @Nonnull Message message) {
    Mono<Boolean> then;
    //子设备的消息回复
    if (message instanceof ChildDeviceMessageReply) {
        then = this
            //回复给网关设备
            .doReply(((ChildDeviceMessageReply) message))
            .then(
                handleChildrenDeviceMessage(((ChildDeviceMessageReply) message).getChildDeviceMessage())
            );
    }
    //子设备消息
    else if (message instanceof ChildDeviceMessage) {
        then = handleChildrenDeviceMessage(((ChildDeviceMessage) message).getChildDeviceMessage());
    }
    //设备回复消息
    else if (message instanceof DeviceMessageReply) {
        then = doReply(((DeviceMessageReply) message));
    }
    //其他消息
    else {
        //do nothing
        then = Reactors.ALWAYS_TRUE;
    }
    return TraceHolder
        .writeContextTo(message, Message::addHeader)
        .flatMap(this::onMessage)// <!> 见下
        .then(then)
        .defaultIfEmpty(false);
 
}
 

推送到事件

org.jetlinks.pro.device.message.DeviceMessageConnector#onMessage

public Mono<Void> onMessage(Message message) {
        //标记来忽略处理的消息则忽略处理
        if (null == message || message.getHeader(Headers.ignore).orElse(false)) {
            return Mono.empty();
        }
        //添加消息ID,下游可以使用此ID进行去重
        //场景: 一个消息,可能会被推送到多个topic,比如: 有租户,机构等绑定信息时.
        //     如果用户同时在多个租户(A,B)里,设备也同时在这些租户里,那用户可能会收到多次推送(租户A一次,租户B一次)
        //     这是可以利用uid进行去重,参照: ReactorUtils.distinct
        message.addHeader(PropertyConstants.uid, createUid());
        return this
            //转化为设备消息 <!>  注意 注意 注意这里, 处理 DeviceMessage 类型消息, 包括了 DirectDeviceMessage
            .convertToDeviceMessage(message)
            .flatMap(msg -> this
                .getTopic(msg)
                //推送到事件总线中
                .flatMap(topic -> eventBus.publish(topic, msg))
                .then())
            //忽略错误
            .onErrorResume(doOnError);
    }

处理 DeviceMessage 类型消息

org.jetlinks.pro.device.message.DeviceMessageConnector#convertToDeviceMessage

private Mono<DeviceMessage> convertToDeviceMessage(Message message) {
        //只处理设备消息,其他忽略
        if (message instanceof DeviceMessage) {
            DeviceMessage msg = ((DeviceMessage) message);
            return registry
                .getDevice(msg.getDeviceId())
                //重构消息的header,向header中添加一些常用信息,下游可直接从header中获取使用
                //  而不用再从注册中心里获取
                .flatMap(device -> refactorHeader(device, msg)
                    //执行拦截器 <!> 见下 实例是 org.jetlinks.pro.device.message.CompositeDeviceMessagePublishInterceptor
                    .flatMap(newMsg -> interceptor.beforePublish(device, (DeviceMessage) newMsg)))
                .defaultIfEmpty(msg);
        }
        return Mono.empty();
    }

消息拦截器, 计算属性…

org.jetlinks.pro.device.message.CompositeDeviceMessagePublishInterceptor 目前它有三个处理监听器 org.jetlinks.pro.device.service.MetadataMappingDeviceMessagePublishInterceptor//物模型映射拦截器,在将设备消息推送到事件总线前 org.jetlinks.pro.device.message.streaming.DeviceVirtualPropertyStreaming //设备虚拟属性实时计算处理类 org.jetlinks.pro.device.message.MergeLatestPropertyInterceptor//合并最新的数据到消息中

踩坑指南

前端菜单不显示?

2.0 前端菜单渲染有问题, 貌似是自定义渲染的问题 src\app.tsx 注掉 menuItemRendermenuRender属性

 
menuDataRender: () => {
    return getMenus(extraRoutes);
},
 
// subMenuItemRender: MenuItemIcon,
// menuItemRender: (menuItemProps, defaultDom) => {
//   return (menuItemProps, defaultDom, {
//     onClick: () => {
//       history.push(menuItemProps.path!);
//     },
//   });
// },
// menuRender:(props,defaultDom)=>{
//   console.log(defaultDom,'11111111111111')
//   return true
// },
// headerRender:false,
 

MQTT Broker 不订阅主题?

版本2.0 改为网络组件中定义配置订阅主题; 且增加了’路由’概念

但MQTT还是在网关组件中调用 网络组件 的 subscribe 定义主题, 逻辑还是网关组件中.. org.jetlinks.pro.network.mqtt.client.VertxMqttClient#subscribe

重载网关入口

略过前面..

org.jetlinks.pro.network.mqtt.gateway.device.MqttClientDeviceGateway#reload

protected Mono<Void> reload() {
 
    return this
        .getProtocol()
        .flatMap(support -> support
            .getRoutes(DefaultTransport.MQTT)
            .filter(MqttRoute.class::isInstance)
            .cast(MqttRoute.class)
            .collectList()
            .doOnEach(ReactiveLogger
                            .onNext(routes -> {
                                //协议包里没有配置Mqtt Topic信息
                                if (CollectionUtils.isEmpty(routes)) {
                                    log.warn("The protocol [{}] is not configured with topics information", support.getId());
                                }
                            }))
            .doOnNext(this::doReloadRoute))//处理路由
        .then();
}

路由有区分上行 还是下行

 protected void doReloadRoute(List<MqttRoute> routes) {
        Map<RouteKey, Tuple2<Integer, Disposable>> readyToRemove = new HashMap<>(this.routes);
 
        for (MqttRoute route : routes) {
            //不是上行topic,不订阅 [路由还分上下行!]
            if (!route.isUpstream()) {
                continue;
            }
            String topic = convertToMqttTopic(route.getTopic());
            RouteKey key = RouteKey.of(topic, route.getQos());
            readyToRemove.remove(key);
            //尝试更新订阅
            this.routes.compute(key, (_key, old) -> {
                if (old != null) {
                    //QoS没变,不用重新订阅
                    if (old.getT1().equals(_key.qos)) {
                        return old;
                    } else {
                        old.getT2().dispose();
 
                    }
                }
                return Tuples.of(_key.qos, doSubscribe(_key.topic, _key.qos));
            });
        }
        //取消订阅协议包里没有的topic信息
        for (Map.Entry<RouteKey, Tuple2<Integer, Disposable>> value : readyToRemove.entrySet()) {
            this.routes.remove(value.getKey());
            value.getValue().getT2().dispose();
        }
    }

原因: 是在协议里面配置路由 有个参数是否上行!!!

    support.addAuthenticator(DefaultTransport.MQTT, authenticator);
        DefaultConfigMetadata mqttMetaConfig = getMqttDefaultMetaConfig();
        support.addConfigMetadata(DefaultTransport.MQTT, mqttMetaConfig);
        support.addRoutes(DefaultTransport.MQTT, Arrays.asList(
                        Route.mqtt("/SNY/DEVICE/#")
                                .upstream(true)//上行!!!!
                                .description("all")
                                .build()
                )
        );
        

HTTP Server 接受不到推送?

org.jetlinks.pro.network.http.device.HttpServerDeviceGateway#doReloadRoute

private void doReloadRoute(List<HttpRoute> routes) {
 
        Map<RouteKey, Disposable> readyToRemove = new HashMap<>(handlers);
        for (HttpRoute route : routes) {
            for (HttpMethod httpMethod : route.getMethod()) {
                String addr = TopicUtils
                    .convertToMqttTopic(route.getAddress())
                    .replace("+", "*")
                    .replace("#", "**");// 替换掉 MQTT形式的匹配符
 
                RouteKey key = RouteKey.of(httpMethod, addr);
                readyToRemove.remove(key);
                //这里应该就是 路由映射.. key 是地址, handleRequest 是处理表达式 (为验证)
                handlers.computeIfAbsent(key, _key -> handleRequest(_key.method, _key.url));
            }
        }
        //取消处理被移除的url信息
        for (Disposable value : readyToRemove.values()) {
            value.dispose();
        }
 
    }

org.jetlinks.pro.network.http.device.HttpServerDeviceGateway#handleRequest

private Disposable handleRequest(HttpMethod method, String url) {
        return httpServer
            .handleRequest(method, url)
            .filterWhen(exchange -> {
                if (!isStarted()) {//判断网关是否 启用
                    return Mono
                        .defer(() -> exchange
                            .error(HttpStatus.SERVICE_UNAVAILABLE)
                            .thenReturn(false))
                        .onErrorReturn(false);
                }
                return Mono.just(true);
            })
            .flatMap(this::handleHttpRequest, Integer.MAX_VALUE)//`org.jetlinks.pro.network.http.device.HttpServerDeviceGateway#handleHttpRequest` 到这里就是编解码器处理了
            .subscribe();
    }

同样是路由问题

support.addAuthenticator(DefaultTransport.HTTP, authenticator);
DefaultConfigMetadata httpMetaConfig = getHttpDefaultMetaConfig();
support.addConfigMetadata(DefaultTransport.HTTP, httpMetaConfig);
support.addRoutes(DefaultTransport.HTTP, Arrays.asList(
                Route.http("/**")//匹配所有
                        .method(HttpMethod.POST)//方法
                        .description("all")
                        .build()
        )
);
// 或者限定路径路由
// support.addRoutes(DefaultTransport.HTTP, Arrays.asList(
//                 Route.http("/SNY/**")
//                 .method(HttpMethod.POST)
//                 .description("/SNY")
//                 .build()
//         )
// );

WebSocket Server 接受不到推送?

看源码大体逻辑跟HTTP Server 差不多 org.jetlinks.pro.network.websocket.server.VertxWebSocketServer#setHttpServers

public void setHttpServers(Collection<HttpServer> httpServers) {
        if (this.httpServers != null && !this.httpServers.isEmpty()) {
            shutdown();
        }
        this.httpServers = httpServers;
        // httpServers 所有HTTP服务 (vertx 的API)
        for (HttpServer httpServer : this.httpServers) {
            httpServer
                .exceptionHandler(err -> {
                    log.error(err.getMessage(), err);
                    netMonitor.error(err);
                })
                .webSocketHandler(webSocket -> {
                    if (log.isDebugEnabled()) {
                        log.debug("handle websocket connection [{}]", webSocket.remoteAddress());
                    }
                    //客户端链接
 
                    String url = webSocket.path();
                    if (url.endsWith("/")) {
                        url = url.substring(0, url.length() - 1);
                    }
				    // <!>这里new的时候, 绑定了消息的处理器; org.jetlinks.pro.network.websocket.server.VertxWebSocketServerClient#doReceived
                    // 所以下面没有没有处理的代码, 纯粹遍历匹配了一下路由, 如果一个都没有匹配打印一下提示
                    VertxWebSocketServerClient serverClient = new VertxWebSocketServerClient(webSocket, netMonitor);
                    // 注意 route 路由, 包含所有协议的 Flux 路由列表
                    // 这里没有 就是协议没有注册到路由, <见下, 还是路由问题>
                    route.findTopic(url)
                         .flatMapIterable(Topic::getSubscribers)
                         .doOnNext(sink -> sink.next(serverClient))
                         .switchIfEmpty(Mono.defer(() -> {
 
                             log.warn("websocket server no handler for:[{}]", serverClient.getPath());
                             return serverClient
                                 .close(WebSocketCloseStatus.BAD_GATEWAY.code())
                                 .then(Mono.empty())
                                 ;
 
                         }))
                         .subscribe();
                });
        }
    }

路由的注册

在网关加载或重载

org.jetlinks.pro.network.websocket.gateway.device.WebSocketServerDeviceGateway#reload

final Mono<Void> reload() {
return protocol
	.flatMap(support -> support
		.getRoutes(DefaultTransport.WebSocket)//拿到协议包的路由信息
		.collectList()
		.doOnEach(ReactiveLogger
					  .onNext(routes -> {
						  //协议包里没有配置路由信息
						  if (CollectionUtils.isEmpty(routes)) {
							  log.warn("The protocol [{}] is not configured with url information", support.getId());
						  }
					  }))
		.doOnNext(this::doReloadRoute))//重载路由
	.then();
}
 
 
 

org.jetlinks.pro.network.websocket.gateway.device.WebSocketServerDeviceGateway#doReloadRoute

private void doReloadRoute(List<Route> routes) {
 
	Map<String, Disposable> readyToRemove = new HashMap<>(handlers);
 
	for (Route route : routes) {
		String address = route.getAddress();
		if (!address.startsWith("/")) {
			address = "/" + address;
		}
		readyToRemove.remove(address);
		handlers.computeIfAbsent(address, this::handleRequest);//这个, 见下
	}
 
	//取消处理被移除的url信息
	for (Disposable value : readyToRemove.values()) {
		value.dispose();
	}
}

org.jetlinks.pro.network.websocket.gateway.device.WebSocketServerDeviceGateway#handleRequest

protected Disposable handleRequest(String path) {
	return webSocketServer
		.handleConnection(path)//这里路由注册
		.filter(this::checkConnectable)
		.flatMap(client -> {
				WebSocketDeviceSession firstSession = new WebSocketDeviceSession( null, client.getRemoteAddress().orElse(null), client, getTransport(), null);
				monitor.connected();
				client.closeHandler(monitor::disconnected);
				return client  
				.receive()  //接受消息的Flux
				.filter(pb -> isStarted()) //过滤 启用状态 
				.flatMap(socketMessage -> protocol  //拿到协议
				.flatMap(protocol -> protocol.getMessageCodec(getTransport()))  //拿到协议对应的 
				.flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(firstSession, socketMessage, registry)))  
				.cast(DeviceMessage.class)  
				// 只处理当前网关下的设备消息  
				.filterWhen(deviceMessage -> helper.checkAccessId(getId(), deviceMessage))  
				.flatMap(deviceMessage -> handleDeviceMessage(firstSession, deviceMessage))  
				.onErrorResume((err) -> {  
					log.error("Handle websocket[{}] message failed:{}",  
					firstSession.getAddress(),  
					socketMessage, err);  
					return Mono.empty();  
				}))  
				.doOnCancel(client::close)  
				.onErrorResume((err) -> Mono.empty());  
				}, Integer.MAX_VALUE)  
				.contextWrite(ReactiveLogger.start("network", webSocketServer.getId()))  
				.subscribe();

最终路由注册

org.jetlinks.pro.network.websocket.server.VertxWebSocketServer#handleConnection

@Override
public Flux<WebSocketServerClient> handleConnection(String... urlPatterns) {
	return Flux.create(sink -> {
		Disposable.Composite disposable = Disposables.composite();
		String[] patterns = urlPatterns.length == 0 ? new String[]{"/**"} : urlPatterns;
 
		for (String urlPattern : patterns) {
			String pattern = Stream.of(urlPattern.split("/"))
								   .map(str -> {
									   //处理路径变量,如: /devices/{id}
									   if (str.startsWith("{") && str.endsWith("}")) {
										   return "*";
									   }
									   return str;
								   })
								   .collect(Collectors.joining("/"));
			if (pattern.endsWith("/")) {
				pattern = pattern.substring(0, pattern.length() - 1);
			}
			if (!pattern.startsWith("/")) {
				pattern = "/".concat(pattern);
			}
			log.debug("handle websocket request : {}", pattern);
			Topic<FluxSink<WebSocketServerClient>> sub = route.append(pattern);
			sub.subscribe(sink);
			disposable.add(() -> sub.unsubscribe(sink));
		}
		sink.onDispose(disposable);
	});
}

Open API 登陆 签名验证

用他的Open API 登陆要签名, 麻烦~

/token 接口验证签名的地方 org.jetlinks.pro.openapi.interceptor.OpenApiFilter#checkSign(java.lang.String, java.lang.String, org.jetlinks.pro.openapi.OpenApiClient, reactor.core.publisher.Flux<org.springframework.core.io.buffer.DataBuffer>)

private Mono<Boolean> checkSign(String sign, String timestamp, OpenApiClient client, Map<String, String> multiValueMap) {
 
    return Mono.fromSupplier(() -> {
        MessageDigest digest = client.getSignature().getMessageDigest();//其实是MD5
        byte[] param = new TreeMap<>(multiValueMap)
            .entrySet()
            .stream()
            .filter(e -> e.getKey() != null)
            .map(e -> e.getKey().concat("=").concat(e.getValue() == null ? "" : e.getValue()))
            .collect(Collectors.joining("&"))
            .getBytes();
        digest.update(param);//1. 入参 参数
        digest.update(timestamp.getBytes());//2. 时间戳
        digest.update(client.getSecureKey().getBytes());//3. 密钥
        return sign.equalsIgnoreCase(ByteBufUtil.hexDump(digest.digest()));//与自己计算的校验一下
    });
}

HSWEB (实体的CURD事件)

org.hswebframework.web.crud.events.EntityEventListener 监听实体事件修改实现对象 org.hswebframework.web.crud.events.CompositeEventListener 用于组合 org.hswebframework.web.crud.configuration.EasyormConfiguration::autoRegisterFeature

@Bean
public BeanPostProcessor autoRegisterFeature(RDBDatabaseMetadata metadata) {
    CompositeEventListener eventListener = new CompositeEventListener();//注册了 监听
    metadata.addFeature(eventListener);
    return new BeanPostProcessor() {
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            if (bean instanceof EventListener) {
                eventListener.addListener(((EventListener) bean));
            } else if (bean instanceof Feature) {
                metadata.addFeature(((Feature) bean));
            }
            return bean;
        }
    };
}

org.hswebframework.ezorm.rdb.metadata.TableOrViewMetadata::fireEvent

default void fireEvent(EventType eventType, Consumer<EventContext> contextConsumer) {
    EventListener eventListener = this.findFeatureOrElse(EventListener.ID, null);
    if (eventListener != null) {
        EventContext context = EventContext.create();
        context.set(ContextKeys.table, this);
        contextConsumer.accept(context);
        eventListener.onEvent(eventType, context);//调用
    }
}

TODO