N_JetlinksPro.md
JetlinksPro
N_Jetlinks1_0 源码分析 N_Reactor 响应式框架编程 JetlinksPro 插件开发 JetlinksPro 透传源码分析 官方协议包Git
网络组件
网络组件的重载 流程
org.jetlinks.pro.network.manager.web.NetworkConfigController#start
org.jetlinks.pro.network.manager.service.NetworkConfigService#start
ClusterNetworkManager
org.jetlinks.pro.network.ClusterNetworkManager#reload
org.jetlinks.pro.network.ClusterNetworkManager#doReload
private Mono<Void> doReload(String type, String id) {
return handleConfig(NetworkType.of(type), id, this::createOrUpdate)// 注意 这个`this::createOrUpdate` Lambda , 下面回调的
.then();
}
org.jetlinks.pro.network.ClusterNetworkManager#handleConfig
private Mono<Network> handleConfig(NetworkType type,
String id,
Function3<NetworkProvider<Object>, String, Object, Mono<Network>> handler) {
@SuppressWarnings("all")
NetworkProvider<Object> networkProvider = (NetworkProvider) this
.getProvider(type.getId())
.orElseThrow(() -> new UnsupportedOperationException("不支持的类型:" + type.getName()));
return configManager
.getConfig(type, id)
.filter(NetworkProperties::isEnabled)
.flatMap(networkProvider::createConfig)
.flatMap(config -> handler.apply(networkProvider, id, config));//这里 `handler.apply` 回调的是 `this::createOrUpdate`
}
org.jetlinks.pro.network.ClusterNetworkManager#createOrUpdate
public Mono<Network> createOrUpdate(NetworkProvider<Object> provider, String id, Object properties) {
ReactiveCacheContainer<String, Network> networkStore = getNetworkStore(provider.getType());
//这个 provider 视为网络的组件的类型 是 `org.jetlinks.pro.network.NetworkProvider` 的派生类
return networkStore.compute(id, (key, network) -> {
if (network == null) {
//不存在创建这个 网络组件
return provider.createNetwork(properties);
}
//存在重载这个 网络组件
return provider.reload(network, properties);
});
}
MqttClientProvider#reload
org.jetlinks.pro.network.mqtt.client.MqttClientProvider#reload
org.jetlinks.pro.network.mqtt.client.MqttClientProvider#initMqttClient
public Mono<Network> initMqttClient(VertxMqttClient mqttClient, MqttClientProperties properties) {//properties MQTT网络组件的配置
return convert(properties)
.map(options -> {
mqttClient.setTopicPrefix(properties.getTopicPrefix());//订阅主题前缀
mqttClient.setLoading(true);
MqttClient client = MqttClient.create(vertx, options);
mqttClient.setClient(client);
client.connect(properties.getRemotePort(), properties.getRemoteHost(), result -> {
mqttClient.setLoading(false);
if (!result.succeeded()) {
log.warn("connect mqtt [{}@{}:{}] error",
properties.getClientId(),
properties.getRemoteHost(),
properties.getRemotePort(),
result.cause());
mqttClient.getNetMonitor().error(result.cause());
} else {
log.debug("connect mqtt [{}] success", properties.getId());
}
});
return mqttClient;
});
}
网络组件的事件
org.jetlinks.pro.network.manager.service.NetworkConfigService#start
org.jetlinks.pro.network.ClusterNetworkManager#reload
/**
 * Reloads one network component through doReload and then publishes the
 * component id to the event-bus topic "/_sys/network/{typeId}/reload".
 *
 * @param type network type; its id is passed to doReload and embedded in the topic
 * @param id   id of the network component to reload
 * @return a Mono that completes after the reload and the publish have completed
 */
@Override
public Mono<Void> reload(NetworkType type, String id) {
return doReload(type.getId(), id)
.then(
// Asynchronous step: publish the reload event. Who subscribes, and where? See ClusterNetworkManager#run below.
eventBus.publish("/_sys/network/" + type.getId() + "/reload", id)
).then();
}
org.jetlinks.pro.network.ClusterNetworkManager#run
@Override
public void run(String... args) {
...
eventBus
.subscribe(Subscription
.builder()
.subscriberId("network-config-manager")
.topics("/_sys/network/*/*")//这里监听 /_sys/network/ 事件
.justBroker()
.build())
.flatMap(payload -> {
Map<String, String> vars = payload.getTopicVars("/_sys/network/{type}/{action}");
String id = payload.bodyToString(true);
log.debug("{} network [{}-{}]", vars.get("action"), vars.get("type"), id);
if ("reload".equals(vars.get("action"))) {//重载
return this
.doReload(vars.get("type"), id)//next
.onErrorResume(err -> {
log.error("reload network error", err);
return Mono.empty();
});
}
if ("shutdown".equals(vars.get("action"))) {//关闭
return doShutdown(vars.get("type"), id);
}
if ("destroy".equals(vars.get("action"))) {//销毁
return doDestroy(vars.get("type"), id);
}
return Mono.empty();
})
org.jetlinks.pro.network.ClusterNetworkManager#doReload
org.jetlinks.pro.network.ClusterNetworkManager#createOrUpdate
public Mono<Network> createOrUpdate(NetworkProvider<Object> provider, String id, Object properties) {
ReactiveCacheContainer<String, Network> networkStore = getNetworkStore(provider.getType());
return networkStore.compute(id, (key, network) -> {
if (network == null) {
return provider.createNetwork(properties);
}
return provider.reload(network, properties);
});
}示网络组件类型
org.jetlinks.pro.network.mqtt.client.MqttClientProvider#reload
规则引擎
关键对象
RuleNode(规则节点): 规则节点描述具体执行的逻辑
RuleLink(规则连线): 用于将多个节点连接起来,将上一个节点的输出结果作为下一个节点的输入结果.
org.jetlinks.rule.engine.api.model.RuleModel(规则模型):由多个RuleNode(规则节点),RuleLink(规则连线)组成 理解为规则的整个模型描述
Input(输入): 规则节点的数据输入
Output(输出): 规则节点的数据输出
Scheduler(调度器): 负责将模型转为任务(Job),并进行任务调度到 Worker
Worker(工作器): 负责执行,维护任务.
org.jetlinks.rule.engine.api.task.ExecutionContext(执行上下文): 启动任务时的上下文,通过上下文获取输入输出配置信息等进行任务处理.
org.jetlinks.rule.engine.api.task.TaskExecutor(任务执行器): 具体执行任务逻辑的实现
org.jetlinks.rule.engine.api.task.TaskExecutorProvider(任务执行器提供器): 用于根据模型配置以及上下文创建任务执行器.
TaskExecutor 分为两类
FunctionTaskExecutor: 功能性任务, 无状态? 例如: 消息通知
ExecutableTaskExecutor: 有状态任务, 例如: 定时任务, 延时任务, 场景联动(scene)
RuleData(规则数据): 任务执行过程中的数据实例
规则编排
实例组件数据是存在数据库的 rule_instance 表
前端界面主要基于 Node-RED 改造的;
源码流程
规则引擎启用接口: /rule-engine/instance/1610114117243383808/_start
org.jetlinks.pro.rule.engine.web.RuleInstanceController#start
public Mono<Boolean> start(@PathVariable String id) {
return Authentication
.currentReactive()
.flatMap(auth -> instanceService
.findById(id)
.doOnNext(instance -> {
//主要是校验一下配置是否正确
for (RuleNodeModel node : instance.toRule(modelParser).getNodes()) {
if (!properties.getExecutorFilter().test(auth, node.getExecutor())) {
throw new BusinessException("error.rule_nodes_cannot_be_used" , node.getExecutor() , 500 , node.getExecutor());
}
}
}))
.then(instanceService.start(id))//见下
.thenReturn(true)
;
}
构造 RuleModel
org.jetlinks.pro.rule.engine.service.RuleInstanceService#start
org.jetlinks.pro.rule.engine.service.RuleInstanceService#doStart
private Mono<Void> doStart(RuleInstanceEntity entity) {
return Mono
.defer(() -> {
RuleModel model = entity.toRule(modelParser);//解析为 RuleModel 但不包括输入/输出连接线, 其会在内部以属性表示
return ruleEngine
.startRule(entity.getId(), model)//开始运行
.then(createUpdate()//更新数据库 状态
.set(RuleInstanceEntity::getState, RuleInstanceState.started)
.where(entity::getId)
.execute()).then();
});
}
主要逻辑
org.jetlinks.rule.engine.cluster.ClusterRuleEngine#startRule
public Flux<Task> startRule(String instanceId, RuleModel model) {
log.debug("starting rule {}\n{}", instanceId, model.toString());
//编译
//<!> 调度任务编译器, 将规则模型的所有节点, 编译成调度任务
//<!> 最终是 一个节点一个 ScheduleJob;
Map</*nodeId*/String, /*Job*/ScheduleJob> jobs = new ScheduleJobCompiler(instanceId, model)
.compile()
.stream()
.collect(Collectors.toMap(ScheduleJob::getNodeId, Function.identity()));
List<Task> startedTask = new ArrayList<>(jobs.size());
//新增调度的任务
Map</*nodeId*/String, /*Job*/ScheduleJob> readyToStart = new ConcurrentHashMap<>(jobs);
//获取调度记录
return repository
.findByInstanceId(instanceId)
.doOnNext(snapshot -> readyToStart.remove(snapshot.getJob().getNodeId()))
.flatMap(snapshot -> {
ScheduleJob job = jobs.get(snapshot.getJob().getNodeId());
ScheduleJob old = snapshot.getJob();
//新的规则减少了任务,则尝试移除旧的任务
if (job == null || !Objects.equals(job.getExecutor(), old.getExecutor())) {
if (job != null && !Objects.equals(job.getExecutor(), old.getExecutor())) {
//移除了旧的,需要重新调度新的
readyToStart.put(job.getNodeId(), job);
log.debug("change job [{}] executor:{} -> {}", snapshot.getJob().getNodeId(),
snapshot.getJob().getExecutor(), job.getExecutor());
} else {
log.debug("shutdown removed job:{}", snapshot.getJob().getNodeId());
}
return this
.shutdown(snapshot)
.then(Mono.empty());
}
return this
.getTaskBySnapshot(snapshot)
.flatMap(task -> {
startedTask.add(task);
//重新加载任务
return task
.setJob(job)
.then(task.reload())
.thenReturn(task);
})
//没有worker调度此任务? 重新调度
.switchIfEmpty(Mono.fromRunnable(() -> readyToStart.put(job.getNodeId(), job)));
})
//<!>doStart 这里这里 见下:
.concatWith(Flux.defer(() -> doStart(readyToStart.values())).doOnNext(startedTask::add))
.collectList()
.map(Flux::fromIterable)
//保存快照信息
.flatMapMany(tasks -> repository
.saveTaskSnapshots(tasks.flatMap(Task::dump))
.thenMany(tasks))
.onErrorResume(err -> {
//失败时停止全部任务
return Flux
.fromIterable(startedTask)
.flatMap(Task::shutdown)
.then(Mono.error(err));
});
}
统一启动 Task
org.jetlinks.rule.engine.cluster.ClusterRuleEngine#doStart
/**
 * Turns the given jobs into tasks via scheduleTask, waits until all tasks have
 * been collected, and then starts each of them.
 *
 * @param jobs the schedule jobs to schedule and start
 * @return a Flux emitting each task after its start() has completed
 */
protected Flux<Task> doStart(Collection<ScheduleJob> jobs) {
return Flux
.defer(() -> Flux
.fromIterable(jobs)
//<!> this::scheduleTask: (1) find the Scheduler for the ScheduleJob, (2) let that Scheduler turn the ScheduleJob into Task(s)
.flatMap(this::scheduleTask)
// Start the tasks together only after all of them have been created
.collectList()
.flatMapIterable(Function.identity())
//<!> Finally start every task (org.jetlinks.rule.engine.defaults.DefaultTask)
.flatMap(task -> task.start().thenReturn(task)));
}
/**
 * Selects the scheduler(s) for a job and asks each one to schedule it.
 * Signals UnsupportedOperationException when no scheduler is selected.
 *
 * @param job the job to schedule
 * @return the tasks produced by the selected scheduler(s)
 */
private Flux<Task> scheduleTask(ScheduleJob job) {
// Find the Scheduler for the ScheduleJob
return selectScheduler(job)
.switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("no scheduler for " + job.getExecutor())))
// See below; there are two implementations: org.jetlinks.rule.engine.defaults.LocalScheduler and ClusterRemoteScheduler
.flatMap(scheduler -> scheduler.schedule(job));
}
private Flux<Scheduler> selectScheduler(ScheduleJob job) {
return schedulerRegistry
.getSchedulers()
.filterWhen(scheduler -> scheduler.canSchedule(job))
.as(supports -> schedulerSelector.select(supports, job));
}
LocalScheduler 调度器
<!> 停止旧的 Task
org.jetlinks.rule.engine.defaults.LocalScheduler#schedule
/**
 * Schedules a job on this scheduler: every task currently registered for the
 * job's instance id and node id is removed and shut down, then new task(s)
 * are created through createExecutor.
 *
 * @param job the job to schedule
 * @return the newly created task(s)
 */
public Flux<Task> schedule(ScheduleJob job) {
return Flux
// If old tasks exist for this instance/node, fetch them and stop them
.fromIterable(getExecutor(job.getInstanceId(), job.getNodeId()))
.flatMap(task -> {
// Stop the old task
removeTask(task);
return task.shutdown();
})
// <!> See below
.thenMany(createExecutor(job));
}
//停止旧任务 逻辑
private void removeTask(Task task) {
tasks.remove(task.getId());
//从 executors Map中移出这个任务
getExecutor(task.getJob().getInstanceId(), task.getJob().getNodeId()).remove(task);
}
这里是调度器(Scheduler)有个 private final Map<String/*规则实例ID*/, Map<String/*nodeId*/, List<Task>>> executors = new ConcurrentHashMap<>(); Map 存着所有运行着的任务(Task)
Worker 创建 Task
<!> 根据 ScheduleJob 找到 Worker 再用它创建 Task
org.jetlinks.rule.engine.defaults.LocalScheduler#createExecutor
private Flux<Task> createExecutor(ScheduleJob job) {
return findWorker(job.getExecutor(), job)
.switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("unsupported executor:" + job.getExecutor())))
// <!> 见下
.flatMap(worker -> worker.createTask(id, job))
.doOnNext(this::addTask);
}
创建上下文(ExecutionContext) 创建任务 (TaskExecutor)
org.jetlinks.rule.engine.defaults.LocalWorker#createTask
org.jetlinks.rule.engine.cluster.worker.ClusterWorker#createTask
@Override
public Mono<Task> createTask(String schedulerId, ScheduleJob job) {
return Mono.justOrEmpty(executors.get(job.getExecutor()))//找到对应的 TaskExecutorProvider
.switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("unsupported executor:" + job.getExecutor())))
.flatMap(provider -> {
//<!> 创建上下文(ExecutionContext)
DefaultExecutionContext context = createContext(job);
//<!> 创建任务 (TaskExecutor)
return provider.createTask(context)
.map(executor -> new DefaultTask(schedulerId, this.getId(), context, executor));
});
}
ExecutionContext 有哪些东西?
例 org.jetlinks.rule.engine.cluster.worker.AbstractExecutionContext
通过 ExecutionContext 可以拿到 ScheduleJob(任务信息), 向下游节点输出数据, 告知执行异常 事件等
在哪绑定线程?
绑定线程其实是在 Task 中, 使用 Reactor subscribeOn()给定了调度器
org.jetlinks.rule.engine.defaults.DefaultTask#start
public class DefaultTask implements Task {
@Override
public Mono<Void> start() {
log.debug("start task[{}]:[{}]", getId(), getJob());
return Mono.<Void>fromRunnable(executor::start)//调用 TaskExecutor (执行器)
.doOnSuccess((v) -> startTime = System.currentTimeMillis())
.subscribeOn(Schedulers.boundedElastic());//<!>注意 这里绑了个调度器
}
.....
}
如何保证顺序?
关键是 ExecutionContext 的 Output
见: org.jetlinks.rule.engine.defaults.AbstractExecutionContext
见: org.jetlinks.rule.engine.api.task.Output
Output 实例是: org.jetlinks.rule.engine.cluster.worker.QueueOutput
Input 实例是: org.jetlinks.rule.engine.cluster.worker.QueueInput
其实Reactor 天生就适合这种数据流的订阅的业务, 只要构造流时按顺序绑定订阅执行就行;
即 org.jetlinks.rule.engine.cluster.ClusterRuleEngine#doStart
节点的数据流
向下游输入
Map<String, Object> data = new HashMap<>();
long currentTime = System.currentTimeMillis();
data.put("timestamp", currentTime);
data.put("_now", currentTime);
context
.getOutput()
.write(context.newRuleData(data))
获取上游输出
context
.getInput()
.accept()
.filter((data) -> state == Task.State.running)
.flatMap(data -> doSomething...)
简略总结
- RuleInstanceEntity 将转换为 RuleModel
  org.jetlinks.pro.rule.engine.service.RuleInstanceService#doStart
- 用 ScheduleJobCompiler 编译 ScheduleJob 描述对象
  org.jetlinks.rule.engine.cluster.ClusterRuleEngine#startRule
- 将 ScheduleJob 转换为 Task
  org.jetlinks.rule.engine.cluster.ClusterRuleEngine#doStart
- 统一执行
  org.jetlinks.rule.engine.cluster.ClusterRuleEngine#doStart
其他关键的对象
org.jetlinks.rule.engine.api.RuleEngine 是规则统一执行/管理的入口, 它接受RuleModel启动规则
org.jetlinks.rule.engine.api.model.RuleModel 是由多个RuleNode(规则节点),RuleLink(规则连线)组成 理解为整个规则模型描述, 通过解析器(RuleEngineModelParser)解析数据库实体数据.
org.jetlinks.rule.engine.api.model.RuleEngineModelParser 是规则引擎模型解析器,支持将不同的模型格式转换为规则模型(不同类型? 因为前端基于node-red, 所以需要解析为node-red的node结构)
org.jetlinks.rule.engine.api.scheduler.ScheduleJob 一个节点会转为一个ScheduleJob 它作为整个节点任务信息的包装对象, 它持有执行器名称 ,执行器配置, 规则配置, 输入, 输出等信息; Scheduler(调度器) 可使用它找 TaskExecutorProvider 和创建 Task
org.jetlinks.rule.engine.api.task.Task Task也是个包装对象, 主要包装 TaskExecutor(执行器) , 实际上它更接近官方文档对 Worker 的描述?: 负责执行,维护任务
了解下几类内置节点(任务)
org.jetlinks.rule.engine.api.task.TaskExecutorProvider
周期性执行的节点 TimerTaskExecutor
定时器这种周期性的, 是如何执行的呢?
定时器其实现是: org.jetlinks.pro.rule.engine.executor.TimerTaskExecutorProvider.TimerTaskExecutor
private Disposable execute() {
Duration nextTime = nextDelay.get();
if (this.disposable != null) {
this.disposable.dispose();
}
return this.disposable =
Mono.delay(nextTime, scheduler)//延时执行
.flatMap(t -> {
if (!testSchedule()) {
return Mono.empty();
}
context.getLogger().debug("trigger timed task on {}", metadataManager.currentServerId());
Map<String, Object> data = new HashMap<>();
long currentTime = System.currentTimeMillis();
data.put("timestamp", currentTime);
data.put("_now", currentTime);
return context
.getOutput()
.write(context.newRuleData(data))
.then(context
.fireEvent(RuleConstants.Event.complete, context.newRuleData(System.currentTimeMillis()))
.thenReturn(1));
})
.onErrorResume(err -> context.onError(err, null).then(Mono.empty()))
.doFinally(s -> {
if (getState() == Task.State.running && s != SignalType.CANCEL) {
execute();//完成后, 在继续.. 周期性
}
})
.subscribe();
}
ReactorQL 节点 ReactorQLTaskExecutor
见:org.jetlinks.pro.rule.engine.executor.ReactorQLTaskExecutorProvider.ReactorQLTaskExecutor
通过 ExecutionContext 可以拿到 ScheduleJob(任务信息), 向下游节点输出数据, 告知执行异常 事件等
...
/**
 * Builds the data stream of the ReactorQL node and subscribes to it.
 * When the job has inputs, every incoming RuleData is run through reactorQL;
 * otherwise reactorQL is started with a table resolver that maps "dual"
 * (or null) to a single element and tables starting with "/" to event-bus
 * subscriptions. Every result is written to the output and fired as a
 * result event.
 *
 * @return the Disposable of the subscription
 */
@Override
protected Disposable doStart() {
Flux<Object> dataStream;
// There are upstream nodes
if (!CollectionUtils.isEmpty(context.getJob().getInputs())) {
dataStream = context
.getInput()
.accept()
.flatMap(ruleData -> reactorQL
.start(Flux.just(RuleDataHelper.toContextMap(ruleData)))
.map(ruleData::newData)
.onErrorResume(err -> {
context.getLogger().error(err.getMessage(), err);
return context.onError(err, null).then(Mono.empty());
}));
} else {
dataStream = reactorQL
.start(table -> {
// Virtual source table: dual
if (table == null || table.equalsIgnoreCase("dual")) {
return Flux.just(1);
}
if (table.startsWith("/**")) {
throw new UnsupportedOperationException("unsupported topic:" + table);
}
if (table.startsWith("/")) {
boolean clusterSubscriber =
context.getJob().getSchedulingRule() == null
|| SchedulerSelectorStrategy.isAll(context.getJob().getSchedulingRule());
Codec<JSONObject> codec = Codecs.lookup(JSONObject.class);
return this
.refactorTable(table)
.flatMapMany(topics -> {
// Subscribe from the event bus
return eventBus
.subscribe(Subscription.of(
"rule-engine:"
.concat(context.getInstanceId())
.concat(":")
.concat(context.getJob().getNodeId()),
topics.toArray(new String[0]),
clusterSubscriber
? new Subscription.Feature[]{Subscription.Feature.local}
: new Subscription.Feature[]{Subscription.Feature.local, Subscription.Feature.broker}
),
codec);
});
}
return Flux.just(1);
})
.cast(Object.class);
}
return dataStream
.flatMap(result -> {
RuleData data = context.newRuleData(result);
// Write to the next node
return context
.getOutput()
.write(Mono.just(data))
.then(context.fireEvent(RuleConstants.Event.result, data));
})
.onErrorResume(err -> context.onError(err, null))
.subscribe();
}
...
告警节点 org.jetlinks.pro.rule.engine.alarm.AlarmTaskExecutorProvider
org.jetlinks.pro.rule.engine.alarm.AlarmTaskExecutorProvider.AlarmTaskExecutor
/**
 * Creates the alarm task executor: passes the name "告警" and the context to
 * the superclass, stores the handler, and calls reload().
 *
 * @param context execution context of the node
 * @param handler alarm rule handler stored for later use
 */
public AlarmTaskExecutor(ExecutionContext context, AlarmRuleHandler handler) {
super("告警", context);
this.handler = handler;
reload();
}
/**
 * Returns the display name of this executor: "解除告警" when the configured
 * mode is AlarmMode.relieve, otherwise "触发告警".
 */
@Override
public String getName() {
return config.getMode() == AlarmMode.relieve
? "解除告警" : "触发告警";
}
/**
 * Passes the input to the current executor function, logs a warning on error,
 * and wraps each result map into new rule data derived from the input.
 *
 * @param input the incoming rule data
 * @return a publisher of the resulting rule data
 */
@Override
protected Publisher<RuleData> apply(RuleData input) {
return executor
.apply(input)// Invoke the executor lambda; see org.jetlinks.pro.rule.engine.alarm.DefaultAlarmRuleHandler
.doOnError(err -> log.warn("{} alarm error,rule:{}", config.mode, context.getInstanceId(), err))
.map(result -> context.newRuleData(input.newData(result.toMap())));
}
@Override
public void reload() {
config = FastBeanCopier.copy(context.getJob().getConfiguration(), new Config());
ValidatorUtils.tryValidate(config);
// <!> 注意这里, 根据配置来初始化 "解除告警"(relieved) 还是"触发告警"(triggered) lambda
if (config.mode == AlarmMode.relieve) {
executor = input -> handler.relieved(context, input);//<!> 见下, 注意这里时表达式, 即触发时会执行的逻辑
} else {
executor = input -> handler.triggered(context, input);
}
}
触发告警 “数据入库”
org.jetlinks.pro.rule.engine.alarm.DefaultAlarmRuleHandler#triggerAlarm
private Mono<AlarmInfo> triggerAlarm(AlarmInfo result) {
AlarmRecordEntity record = ofRecord(result);
//更新告警状态.
return alarmRecordService // 状态更新
.createUpdate()
.set(record)
.where(AlarmRecordEntity::getId, record.getId())
.and(AlarmRecordEntity::getState, AlarmRecordState.warning)
.execute()
//更新数据库报错,依然尝试触发告警!
.onErrorResume(err -> {
log.error("trigger alarm error", err);
return Reactors.ALWAYS_ZERO;
})
.flatMap(total -> {
AlarmHistoryInfo historyInfo = createHistory(record, result);
result.setAlarmTime(record.getAlarmTime());
//更新结果返回0 说明是新产生的告警数据
if (total == 0) {
result.setFirstAlarm(true);
result.setAlarming(false);
return alarmRecordService
.save(record)
.then(historyService.save(historyInfo))
.then(publishAlarmRecord(historyInfo, result))//<!> 注意, 第一次才会发布到事件总线; 见下: 触发告警 "发布事件总线"
.then(publishEvent(historyInfo))//Spring 体系的事件
.then(saveAlarmCache(result, record));
}
result.setFirstAlarm(false);
result.setAlarming(true);
return historyService
.save(historyInfo)
.then(publishEvent(historyInfo))//Spring 体系的事件
.then(saveAlarmCache(result, record));
});
}
org.jetlinks.pro.rule.engine.alarm.DefaultAlarmRuleHandler#publishEvent
org.springframework.context.support.AbstractApplicationContext#publishEvent(org.springframework.context.ApplicationEvent)
这个分支到 Spring 体系的事件了, 没啥重点, 盲猜数据入ES库
触发告警 “发布事件总线”
org.jetlinks.pro.rule.engine.alarm.DefaultAlarmRuleHandler#publishAlarmRecord
public Mono<Void> publishAlarmRecord(AlarmHistoryInfo historyInfo, AlarmInfo alarmInfo) {
String topic = Topics.alarm(historyInfo.getTargetType(), historyInfo.getTargetId(), historyInfo.getAlarmConfigId());
// topic = /alarm/device/TCTTST-tct001/1620309816492253184/record
return Flux
.concat(
Mono.just(topic),
ReactiveAuthenticationHolder
.get(alarmInfo.getOwnerId())
.flatMap(AssetsHolder::fromAuth)
.flatMapMany(holder -> holder.refactorTopic(topic))
)
.distinct()
.flatMap(assetTopic -> eventBus.publish(assetTopic, historyInfo))
.then();
}
解除告警
org.jetlinks.pro.rule.engine.alarm.DefaultAlarmRuleHandler#relieved
略..
场景联动的节点
设备触发: org.jetlinks.pro.rule.engine.scene.SceneTaskExecutorProvider
定时触发: org.jetlinks.pro.rule.engine.executor.TimerTaskExecutorProvider
见下: [场景联动]
脚本函数节点
脚本节点会转成 LambdaTask 表达式
见: org.jetlinks.pro.rule.engine.executor.ScriptTaskExecutorProvider
见: org.jetlinks.rule.engine.defaults.LambdaTaskExecutor
场景联动
场景联动是规则引擎中,一种业务逻辑的可视化编程方式,您可以通过可视化的方式定义设备之间联动规则。
in short 其实就是规则引擎, 在规则引擎中它是一个 TaskExecutorProvider, 目前有三类触发方式
设备触发: org.jetlinks.pro.rule.engine.scene.SceneTaskExecutorProvider
手动触发,也是: org.jetlinks.pro.rule.engine.scene.SceneTaskExecutorProvider
定时触发: org.jetlinks.pro.rule.engine.executor.TimerTaskExecutorProvider
场景联动数据存储在 rule_scene 表
启用接口
接口: /scene/1616415881994473472/_enable
实例组件数据是存在数据库的 rule_scene 表
org.jetlinks.pro.rule.engine.web.SceneController#enabledScene
org.jetlinks.pro.rule.engine.service.SceneService#enable
只调用 Service 层 更新了数据库;
@Transactional(rollbackFor = Throwable.class)
public Mono<Void> enable(String id) {
Assert.hasText(id, "id can not be empty");
long now = System.currentTimeMillis();
return this
.createUpdate()
.set(SceneEntity::getState, RuleInstanceState.started)
.set(SceneEntity::getModifyTime, now)
.set(SceneEntity::getStartTime, now)
.where(SceneEntity::getId, id)
.execute()
.then();
}
这里是它项目框架(hsweb-framework) Service CURD 会触发事件;
最终在事件 hook方法中 流程处理到规则引擎
org.jetlinks.pro.rule.engine.service.SceneService#handleEvent
private Mono<Void> handleEvent(Collection<SceneEntity> entities) {
return Flux
.fromIterable(entities)
.flatMap(scene -> {
//禁用时,停止规则
if (scene.getState() == RuleInstanceState.disable) {
return ruleEngine.shutdown(scene.getId());
}else if (scene.getState() == RuleInstanceState.started){
scene.validate();
//<!> 转换为 RuleModel 调用规则引擎 运行
return ruleEngine.startRule(scene.getId(), scene.toRule().getModel());
}
return Mono.empty();
})
.then();
}
场景任务 规则节点
两个对象
org.jetlinks.pro.rule.engine.scene.SceneTaskExecutorProvider
org.jetlinks.pro.rule.engine.scene.SceneTaskExecutorProvider.SceneTaskExecutor
规则入口
org.jetlinks.pro.rule.engine.scene.SceneTaskExecutorProvider.SceneTaskExecutor#init
/**
 * (Re)initializes the scene node: disposes the previous subscription if one
 * exists, builds the source stream (from the node input when the rule yields
 * no SQL, otherwise from a ReactorQL query), and then either hands the source
 * to the rule's branch handler or subscribes to it directly after optionally
 * applying the shake limit.
 *
 * @return the Disposable returned by the branch handler or by the subscription
 */
private Disposable init() {
if (disposable != null) {
disposable.dispose();
}
boolean useBranch = CollectionUtils.isNotEmpty(rule.getBranches());
SqlRequest request = rule.createSql(!useBranch);
Flux<Map<String, Object>> source;
// Data is not processed through SQL
if (request.isEmpty()) {
source = context
.getInput()
.accept()
.flatMap(RuleData::dataToMap);
} else {
if (log.isDebugEnabled()) {
log.debug("init scene [{}:{}], sql:{}", rule.getId(), rule.getName(), request.toNativeSql());
}
//<!> See below: data source (trigger source)
ReactorQLContext qlContext = createReactorQLContext();
// SQL parameters
for (Object parameter : request.getParameters()) {
qlContext.bind(parameter);
}
source = ReactorQL
.builder()
.sql(request.getSql())
.build()
.start(qlContext)
.map(ReactorQLRecord::asMap);
}
// Branch conditions // <!> Do all scene linkages go through this branch?
if (useBranch) {
return rule
.createBranchHandler(
source,
(idx, nodeId, data) -> {
if (log.isDebugEnabled()) {
log.debug("scene [{}] branch [{}] execute", rule.getId(), nodeId);
}
RuleData ruleData = context.newRuleData(data);
return eventBus
.publish("/scene/rule/" + rule.getId(), buildSceneData(data))// Event bus: publish the scene event
.then(context
.getOutput()//<!> See: alarm node org.jetlinks.pro.rule.engine.alarm.AlarmTaskExecutorProvider; it is defined at initialization
.write(nodeId, ruleData)
.onErrorResume(err -> context.onError(err, ruleData))
.as(tracer()));
});
}
// Debounce (shake limit)
Trigger.GroupShakeLimit shakeLimit = rule.getTrigger().getShakeLimit();
if (shakeLimit != null && shakeLimit.isEnabled()) {
ShakeLimitGrouping<Map<String, Object>> grouping = shakeLimit.createGrouping();
source = shakeLimit.transfer(
source,
(time, flux) -> grouping
.group(flux)
.flatMap(group -> group.window(time), Integer.MAX_VALUE),
(map, total) -> map.put("_total", total));
}
return source
.flatMap(this::handleOutput)
.subscribe();
}
数据来源 (触发来源)
org.jetlinks.pro.rule.engine.scene.SceneTaskExecutorProvider.SceneTaskExecutor#createReactorQLContext
private ReactorQLContext createReactorQLContext() {
return ReactorQLContext
.ofDatasource(table -> {
if (table.startsWith("/")) {// <!> 事件总线, 设备消息...
//来自事件总线
return this
.subscribe(table)
//有效期去重,同一个设备在多个部门的场景下,可能收到2条相同的数据问题
.as(FluxUtils.distinct(this::getDataId, Duration.ofSeconds(1)));
} else {
//来自上游(定时等) <!> 上游 手动触发, 定时器...
return context
.getInput()
.accept()
.flatMap(RuleData::dataToMap);
}
});
}
规则引擎 在事件总线中的消息
事件总线中, 具体事件主题可参考: org.jetlinks.rule.engine.api.RuleConstants.Topics
唉, 文档不全只能自己手动调试.
/rule-engine/1610114117243383808/*/logger/*
/rule-engine/debug
/rule-engine/notification/#
/rule-engine/status/#
/rule-engine/notification/node/#
/rule-engine/event-log/#
# 分支执行事件
/rule-engine/1610114117243383808/*/event/result
/rule-engine/1620993288546447360/branch_1_group_1_action_1/event/result
/rule-engine/1620993288546447360/branch_1_group_1_action_1/event/complete
# 分支错误事件
/rule-engine/1610114117243383808/*/event/error
WebSocket订阅 处理流程
在1.1版本后提供websocket方式订阅平台消息的功能。 可以通过websocket来订阅设备、规则引擎、设备告警等相关消息
TODO 实调试
配置入口
org.jetlinks.pro.gateway.external.socket.WebSocketMessagingHandlerConfiguration
注册Spring HandlerMapping
大概是 WebSocketMessagingHandler 会根据 websocket 请求, 向 MessagingManager 订阅 topic消息;
返回的 HandlerMapping 是Spring体系的; 处理 /messaging/**的url请求
@Bean
public HandlerMapping webSocketMessagingHandlerMapping(MessagingManager messagingManager,
ObjectProvider<WebSocketAuthenticationHandler> handlers) {
CompositeWebSocketAuthenticationHandler authenticationHandler = new CompositeWebSocketAuthenticationHandler();
handlers.forEach(authenticationHandler::addHandler);
WebSocketMessagingHandler messagingHandler = new WebSocketMessagingHandler(//见下
messagingManager,
authenticationHandler
);
final Map<String, WebSocketHandler> map = new HashMap<>(1);
map.put("/messaging/**", messagingHandler);
final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
mapping.setUrlMap(map);
return mapping;
}
解析请求&处理 MessagingRequest
解析请求, 根据不同的类型 分支处理
org.jetlinks.pro.gateway.external.socket.WebSocketMessagingHandler
org.jetlinks.pro.gateway.external.socket.WebSocketMessagingHandler#handle
@Override
@Nonnull
@SuppressWarnings("all")
public Mono<Void> handle(@Nonnull WebSocketSession session) {
//订阅信息
Map<String, Disposable> subs = new ConcurrentHashMap<>();
return authenticationHandler
//从session中获取权限信息
.handle(session)
//没有权限则返回认证错误并关闭连接
.switchIfEmpty(session
.send(Mono.just(session.textMessage(JSON.toJSONString(Message.authError()))))
.then(session.close(CloseStatus.BAD_DATA))
.then(Mono.empty()))
.flatMap(auth -> session
.receive()
.doOnNext(message -> {
try {
......
//解析request请求
MessagingRequest request = JSON.parseObject(message.getPayloadAsText(), MessagingRequest.class);
if (request == null) {
return;
}
......
if (request.getType() == MessagingRequest.Type.sub) {//订阅
//重复订阅,忽略
Disposable old = subs.get(request.getId());
if (old != null && !old.isDisposed()) {
return;
}
Map<String, String> context = new HashMap<>();
context.put("userId", auth.getUser().getId());
context.put("userName", auth.getUser().getName());
Disposable sub = messagingManager
.subscribe(SubscribeRequest.of(request, auth))//<!>见下, 向 MessagingManager 订阅(附带授权信息)
.doOnEach(ReactiveLogger.onError(err -> log.error("{}", err.getMessage(), err)))
......
} else if (request.getType() == MessagingRequest.Type.unsub) {//取消订阅
//取消订阅则释放订阅
Optional.ofNullable(subs.remove(request.getId()))
.ifPresent(Disposable::dispose);
} else {
sendToSession(session, Message.error(request.getId(), request.getTopic(), "不支持的类型:" + request.getType()));
}
} catch (Exception e) {
log.warn(e.getMessage(), e);
sendToSession(session, Message.error("illegal_argument", null, "消息格式错误"));
}
})
.then())
.doFinally(r -> {
//连接断开时,释放全部订阅
subs.values().forEach(Disposable::dispose);
subs.clear();
});
}
in short 就是附带用户信息, 从 MessagingManager 订阅
关于消息管理器 MessagingManager
基于SubscriptionProvider.
根据请求中的 topic (SubscribeRequest.getTopic()) 使用 AntPathMatcher 与 SubscriptionProvider.getTopicPattern() 进行匹配.
将调用满足条件的 SubscriptionProvider.subscribe(SubscribeRequest) 来获取数据.
in short 中介者模式; 1. 调用其 MessagingManager#subscribe 可以订阅消息, 其消息来源由 SubscriptionProvider 提供; 2. 调用其MessagingManager#register 可以注册 Provider;
org.jetlinks.pro.gateway.external.DefaultMessagingManager
/**
 * Subscribes through the first registered provider that has a topic pattern
 * matching the request topic. Values that are not already a Message are
 * wrapped with Message.success; when no pattern matches, the Flux signals
 * UnsupportedOperationException.
 *
 * @param request the subscribe request carrying the id and topic
 * @return the message stream of the matching provider
 */
@Override
public Flux<Message> subscribe(SubscribeRequest request) {
return Flux.defer(() -> {
for (SubscriptionProvider provider : subProvider) {
for (String pattern : provider.getTopicPattern()) {
if (matcher.match(pattern, request.getTopic())) {
return provider
.subscribe(request)
.map(v -> {
// Backward compatibility with the previous API (which returned Message directly)
if (v instanceof Message) {
return ((Message) v);
}
return Message.success(request.getId(), request.getTopic(), v);
});
}
}
}
return Flux.error(new UnsupportedOperationException("不支持的topic"));
});
}
public void register(SubscriptionProvider provider) {
subProvider.add(provider);
//重新排序
subProvider.sort(Comparator.comparingInt(SubscriptionProvider::getOrder));
}
消息来源
规则引擎 Provider
org.jetlinks.pro.rule.engine.messaging.RuleEngineSubscriptionProvider
/**
 * Returns the topic patterns handled by this provider: everything under "/rule-engine/".
 */
public String[] getTopicPattern() {
return new String[]{"/rule-engine/**"};
}
告警消息 Provider
org.jetlinks.pro.device.message.DeviceAlarmSubscriptionProvider
public String[] getTopicPattern() {
return new String[]{"/rule-engine/device/alarm/*/*/*"};
}
Pro 版 <消息上行> 处理流程
前面就略了 参考笔记: 消息上报 (MQTT Broker)
org.jetlinks.pro.gateway.DeviceGatewayHelper#handleDeviceMessage(org.jetlinks.core.message.DeviceMessage, java.util.function.Function<org.jetlinks.core.device.DeviceOperator,org.jetlinks.core.server.session.DeviceSession>, java.util.function.Consumer<org.jetlinks.core.server.session.DeviceSession>, java.lang.Runnable)
org.jetlinks.pro.gateway.DeviceGatewayHelper#handleDeviceMessage(org.jetlinks.core.message.DeviceMessage, java.util.function.Function<org.jetlinks.core.device.DeviceOperator,org.jetlinks.core.server.session.DeviceSession>, java.util.function.Consumer<org.jetlinks.core.server.session.DeviceSession>, java.util.function.Supplier<reactor.core.publisher.Mono<org.jetlinks.core.device.DeviceOperator>>)
org.jetlinks.pro.gateway.DeviceGatewayHelper#handleDeviceMessage(org.jetlinks.core.message.DeviceMessage, java.util.function.Function<org.jetlinks.core.device.DeviceOperator,reactor.core.publisher.Mono<org.jetlinks.core.server.session.DeviceSession>>, java.util.function.Function<org.jetlinks.core.server.session.DeviceSession,reactor.core.publisher.Mono<java.lang.Void>>, java.util.function.Supplier<reactor.core.publisher.Mono<org.jetlinks.core.device.DeviceOperator>>)
public Mono<DeviceOperator> handleDeviceMessage(DeviceMessage message,
Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder,
Function<DeviceSession, Mono<Void>> sessionConsumer,
Supplier<Mono<DeviceOperator>> deviceNotFoundCallback) {
......
if (doHandle) {
then = handleMessage(null, message) //见下
.then(then);
}
return this
.createOrUpdateSession(deviceId, message, sessionBuilder, deviceNotFoundCallback)
.flatMap(sessionConsumer)
.then(then)
.contextWrite(Context.of(DeviceMessage.class, message));
}
org.jetlinks.pro.gateway.DeviceGatewayHelper#handleMessage
private Mono<Void> handleMessage(DeviceOperator device, Message message) {
return messageHandler////见下 , 实例是 org.jetlinks.pro.device.message.DeviceMessageConnector
.handleMessage(device, message)
//转换为empty,减少触发discard
.flatMap(ignore -> Mono.empty());
}
子设备等消息处理
org.jetlinks.pro.device.message.DeviceMessageConnector#handleMessage
/**
 * Handles a message: picks a follow-up action by message type (child-device
 * reply, child-device message, device reply, or none), writes the trace
 * context into the message headers, passes the message to onMessage, and then
 * runs the follow-up action.
 *
 * @param device  the device operator (not read inside this method)
 * @param message the message to handle
 * @return the result of the follow-up action, or false when it completes empty
 */
public Mono<Boolean> handleMessage(DeviceOperator device, @Nonnull Message message) {
Mono<Boolean> then;
// Reply to a child-device message
if (message instanceof ChildDeviceMessageReply) {
then = this
// Reply to the gateway device
.doReply(((ChildDeviceMessageReply) message))
.then(
handleChildrenDeviceMessage(((ChildDeviceMessageReply) message).getChildDeviceMessage())
);
}
// Child-device message
else if (message instanceof ChildDeviceMessage) {
then = handleChildrenDeviceMessage(((ChildDeviceMessage) message).getChildDeviceMessage());
}
// Device reply message
else if (message instanceof DeviceMessageReply) {
then = doReply(((DeviceMessageReply) message));
}
// Other messages
else {
//do nothing
then = Reactors.ALWAYS_TRUE;
}
return TraceHolder
.writeContextTo(message, Message::addHeader)
.flatMap(this::onMessage)// <!> See below
.then(then)
.defaultIfEmpty(false);
}
推送到事件
org.jetlinks.pro.device.message.DeviceMessageConnector#onMessage
/**
 * Publishes a message to the event bus.
 *
 * <p>{@code null} messages and messages whose {@code Headers.ignore} header is {@code true}
 * are skipped. Every other message receives a {@code PropertyConstants.uid} header before
 * it is converted and published.
 *
 * @param message the message to publish, may be {@code null}
 * @return a {@code Mono} that completes when publishing has finished
 */
public Mono<Void> onMessage(Message message) {
    if (message == null) {
        return Mono.empty();
    }
    // Messages flagged as "ignore" are not processed.
    boolean flaggedAsIgnored = message.getHeader(Headers.ignore).orElse(false);
    if (flaggedAsIgnored) {
        return Mono.empty();
    }
    // Add a message id that downstream consumers can use for de-duplication.
    // Scenario: one message may be pushed to several topics, e.g. when tenant or organisation
    // bindings exist. If a user is in several tenants (A, B) and the device is in them too,
    // the user may receive the push more than once (once for tenant A, once for tenant B).
    // The uid allows de-duplicating those, see ReactorUtils.distinct.
    message.addHeader(PropertyConstants.uid, createUid());
    return this
        // Convert to a device message. <!> Note: this is where DeviceMessage types are
        // handled, DirectDeviceMessage included.
        .convertToDeviceMessage(message)
        .flatMap(this::publishToEventBus)
        // Ignore errors.
        .onErrorResume(doOnError);
}

/** Resolves the topics of a device message and pushes the message to each of them on the event bus. */
private Mono<Void> publishToEventBus(DeviceMessage deviceMessage) {
    return this
        .getTopic(deviceMessage)
        .flatMap(topic -> eventBus.publish(topic, deviceMessage))
        .then();
}处理 DeviceMessage 类型消息
org.jetlinks.pro.device.message.DeviceMessageConnector#convertToDeviceMessage
/**
 * Converts a generic message into a device message that is ready for publishing.
 *
 * @param message the incoming message
 * @return the intercepted device message, the original device message when the device lookup
 *         or the interceptor chain completes empty, or an empty {@code Mono} for non-device messages
 */
private Mono<DeviceMessage> convertToDeviceMessage(Message message) {
    // Only device messages are handled; everything else is ignored.
    if (!(message instanceof DeviceMessage)) {
        return Mono.empty();
    }
    DeviceMessage deviceMessage = (DeviceMessage) message;
    return registry
        .getDevice(deviceMessage.getDeviceId())
        // Rebuild the message headers with commonly used information so that downstream
        // consumers can read it from the headers instead of querying the registry again.
        .flatMap(device -> refactorHeader(device, deviceMessage)
            // Run the interceptor. <!> See below: the instance is
            // org.jetlinks.pro.device.message.CompositeDeviceMessagePublishInterceptor.
            .flatMap(rebuilt -> interceptor.beforePublish(device, (DeviceMessage) rebuilt)))
        .defaultIfEmpty(deviceMessage);
}消息拦截器, 计算属性…
org.jetlinks.pro.device.message.CompositeDeviceMessagePublishInterceptor
目前它有三个处理监听器
org.jetlinks.pro.device.service.MetadataMappingDeviceMessagePublishInterceptor//物模型映射拦截器,在将设备消息推送到事件总线前
org.jetlinks.pro.device.message.streaming.DeviceVirtualPropertyStreaming //设备虚拟属性实时计算处理类
org.jetlinks.pro.device.message.MergeLatestPropertyInterceptor//合并最新的数据到消息中
踩坑指南
前端菜单不显示?
2.0 前端菜单渲染有问题, 貌似是自定义渲染的问题
src\app.tsx
注释掉 menuItemRender 和 menuRender 属性
menuDataRender: () => {
return getMenus(extraRoutes);
},
// subMenuItemRender: MenuItemIcon,
// menuItemRender: (menuItemProps, defaultDom) => {
// return (menuItemProps, defaultDom, {
// onClick: () => {
// history.push(menuItemProps.path!);
// },
// });
// },
// menuRender:(props,defaultDom)=>{
// console.log(defaultDom,'11111111111111')
// return true
// },
// headerRender:false,
MQTT Broker 不订阅主题?
2.0 版本改为在网络组件中配置订阅主题, 并且增加了“路由”概念
但MQTT还是在网关组件中调用 网络组件 的 subscribe 定义主题, 逻辑还是网关组件中..
org.jetlinks.pro.network.mqtt.client.VertxMqttClient#subscribe
重载网关入口
略过前面..
org.jetlinks.pro.network.mqtt.gateway.device.MqttClientDeviceGateway#reload
/**
 * Reloads the MQTT subscriptions of this gateway from the routes declared by its protocol.
 *
 * <p>Collects the protocol's {@code DefaultTransport.MQTT} routes, keeps only the
 * {@code MqttRoute} instances, logs a warning when the collected list is empty and passes
 * the list to {@code doReloadRoute}.
 *
 * @return a {@code Mono} that completes when the chain has finished
 */
protected Mono<Void> reload() {
return this
.getProtocol()
.flatMap(support -> support
.getRoutes(DefaultTransport.MQTT)
// Keep MqttRoute entries only.
.filter(MqttRoute.class::isInstance)
.cast(MqttRoute.class)
.collectList()
.doOnEach(ReactiveLogger
.onNext(routes -> {
// The protocol package does not configure any MQTT topic information.
if (CollectionUtils.isEmpty(routes)) {
log.warn("The protocol [{}] is not configured with topics information", support.getId());
}
}))
.doOnNext(this::doReloadRoute))// process the routes
.then();
}路由有区分上行 还是下行
/**
 * Synchronizes the active MQTT subscriptions with the routes declared by the protocol.
 *
 * <p>Upstream routes are subscribed (or re-subscribed when their QoS changed); subscriptions
 * whose key is no longer declared are removed and disposed.
 *
 * @param routes the MQTT routes currently declared by the protocol
 */
protected void doReloadRoute(List<MqttRoute> routes) {
    // Snapshot of everything currently subscribed; entries left over at the end are obsolete.
    Map<RouteKey, Tuple2<Integer, Disposable>> obsolete = new HashMap<>(this.routes);
    for (MqttRoute route : routes) {
        // Not an upstream topic: do not subscribe (routes distinguish upstream and downstream!).
        if (!route.isUpstream()) {
            continue;
        }
        RouteKey key = RouteKey.of(convertToMqttTopic(route.getTopic()), route.getQos());
        obsolete.remove(key);
        // Try to update the subscription.
        this.routes.compute(key, (routeKey, current) -> {
            if (current != null) {
                // QoS unchanged: keep the existing subscription.
                if (current.getT1().equals(routeKey.qos)) {
                    return current;
                }
                current.getT2().dispose();
            }
            return Tuples.of(routeKey.qos, doSubscribe(routeKey.topic, routeKey.qos));
        });
    }
    // Unsubscribe the topics that the protocol package no longer declares.
    obsolete.forEach((routeKey, subscription) -> {
        this.routes.remove(routeKey);
        subscription.getT2().dispose();
    });
}原因: 是在协议里面配置路由 有个参数是否上行!!!
// Protocol-side registration for the MQTT transport: authenticator, config metadata and routes.
support.addAuthenticator(DefaultTransport.MQTT, authenticator);
DefaultConfigMetadata mqttMetaConfig = getMqttDefaultMetaConfig();
support.addConfigMetadata(DefaultTransport.MQTT, mqttMetaConfig);
support.addRoutes(DefaultTransport.MQTT, Arrays.asList(
Route.mqtt("/SNY/DEVICE/#")
.upstream(true)// upstream! The gateway only subscribes to routes flagged as upstream.
.description("all")
.build()
)
);
HTTP Server 接收不到推送?
org.jetlinks.pro.network.http.device.HttpServerDeviceGateway#doReloadRoute
/**
 * Synchronizes the registered HTTP handlers with the routes declared by the protocol.
 *
 * <p>Every route/method pair is turned into a {@code RouteKey}; keys that have no handler yet
 * get one through {@code handleRequest}. Handlers whose key is no longer declared are disposed
 * <em>and removed from {@code handlers}</em>. Without the removal a route that disappears and
 * later comes back would stay bound to its disposed handler, because {@code computeIfAbsent}
 * does not create a new one while the key is still present.
 *
 * @param routes the HTTP routes currently declared by the protocol
 */
private void doReloadRoute(List<HttpRoute> routes) {
    // Snapshot of every known handler; entries left over after the loop are obsolete.
    Map<RouteKey, Disposable> readyToRemove = new HashMap<>(handlers);
    for (HttpRoute route : routes) {
        for (HttpMethod httpMethod : route.getMethod()) {
            String addr = TopicUtils
                .convertToMqttTopic(route.getAddress())
                .replace("+", "*")
                .replace("#", "**"); // replace the MQTT-style wildcards
            RouteKey key = RouteKey.of(httpMethod, addr);
            readyToRemove.remove(key);
            // Presumably the route mapping: the key is the address and handleRequest builds
            // the handler (not verified by the note author).
            handlers.computeIfAbsent(key, _key -> handleRequest(_key.method, _key.url));
        }
    }
    // Stop handling the URLs that were removed and forget their handlers.
    for (Map.Entry<RouteKey, Disposable> stale : readyToRemove.entrySet()) {
        handlers.remove(stale.getKey());
        stale.getValue().dispose();
    }
}org.jetlinks.pro.network.http.device.HttpServerDeviceGateway#handleRequest
/**
 * Subscribes to the HTTP server for one method/URL pair and forwards accepted exchanges to
 * {@code handleHttpRequest}.
 *
 * @param method the HTTP method to handle
 * @param url    the URL pattern to handle
 * @return the subscription, to be disposed when the route is removed
 */
private Disposable handleRequest(HttpMethod method, String url) {
    return httpServer
        .handleRequest(method, url)
        .filterWhen(exchange -> {
            // Gateway started: let the exchange through.
            if (isStarted()) {
                return Mono.just(true);
            }
            // Gateway not started: answer with 503 and drop the exchange.
            return Mono
                .defer(() -> exchange
                    .error(HttpStatus.SERVICE_UNAVAILABLE)
                    .thenReturn(false))
                .onErrorReturn(false);
        })
        // From org.jetlinks.pro.network.http.device.HttpServerDeviceGateway#handleHttpRequest
        // onwards the message codec takes over.
        .flatMap(this::handleHttpRequest, Integer.MAX_VALUE)
        .subscribe();
}同样是路由问题
// Protocol-side registration for the HTTP transport: authenticator, config metadata and routes.
support.addAuthenticator(DefaultTransport.HTTP, authenticator);
DefaultConfigMetadata httpMetaConfig = getHttpDefaultMetaConfig();
support.addConfigMetadata(DefaultTransport.HTTP, httpMetaConfig);
support.addRoutes(DefaultTransport.HTTP, Arrays.asList(
Route.http("/**")// match every path
.method(HttpMethod.POST)// HTTP method
.description("all")
.build()
)
);
// 或者限定路径路由
// support.addRoutes(DefaultTransport.HTTP, Arrays.asList(
// Route.http("/SNY/**")
// .method(HttpMethod.POST)
// .description("/SNY")
// .build()
// )
// );
WebSocket Server 接收不到推送?
看源码大体逻辑跟HTTP Server 差不多
org.jetlinks.pro.network.websocket.server.VertxWebSocketServer#setHttpServers
/**
 * Replaces the HTTP servers backing this WebSocket server and installs the exception and
 * WebSocket handlers on each of them.
 *
 * <p>If servers were already set, {@code shutdown()} is called first. Each incoming WebSocket
 * is wrapped in a {@code VertxWebSocketServerClient} and offered to every subscriber of the
 * topics matching its path; when nothing is emitted the client is closed with
 * {@code BAD_GATEWAY}.
 *
 * @param httpServers the HTTP servers to attach to
 */
public void setHttpServers(Collection<HttpServer> httpServers) {
    boolean hasPreviousServers = this.httpServers != null && !this.httpServers.isEmpty();
    if (hasPreviousServers) {
        shutdown();
    }
    this.httpServers = httpServers;
    // httpServers: all HTTP servers (Vert.x API).
    for (HttpServer server : this.httpServers) {
        server
            .exceptionHandler(failure -> {
                log.error(failure.getMessage(), failure);
                netMonitor.error(failure);
            })
            .webSocketHandler(webSocket -> {
                if (log.isDebugEnabled()) {
                    log.debug("handle websocket connection [{}]", webSocket.remoteAddress());
                }
                // Client connection: strip one trailing slash from the requested path.
                String requestedPath = webSocket.path();
                String url = requestedPath.endsWith("/")
                    ? requestedPath.substring(0, requestedPath.length() - 1)
                    : requestedPath;
                // <!> The message handler is bound when the client object is created, see
                // org.jetlinks.pro.network.websocket.server.VertxWebSocketServerClient#doReceived.
                // That is why no processing code follows: the code below only matches the
                // routes and logs a hint when none of them matched.
                VertxWebSocketServerClient serverClient = new VertxWebSocketServerClient(webSocket, netMonitor);
                // Note: `route` holds the routes of all protocols.
                // Nothing found here means the protocol did not register a route
                // (see below, it is a routing problem again).
                route.findTopic(url)
                     .flatMapIterable(Topic::getSubscribers)
                     .doOnNext(subscriber -> subscriber.next(serverClient))
                     .switchIfEmpty(Mono.defer(() -> {
                         log.warn("websocket server no handler for:[{}]", serverClient.getPath());
                         return serverClient
                             .close(WebSocketCloseStatus.BAD_GATEWAY.code())
                             .then(Mono.empty());
                     }))
                     .subscribe();
            });
    }
}路由的注册
在网关加载或重载
org.jetlinks.pro.network.websocket.gateway.device.WebSocketServerDeviceGateway#reload
/**
 * Reloads the WebSocket routes of this gateway from the routes declared by its protocol.
 *
 * <p>Collects the protocol's {@code DefaultTransport.WebSocket} routes, logs a warning when
 * the collected list is empty and passes the list to {@code doReloadRoute}.
 *
 * @return a {@code Mono} that completes when the chain has finished
 */
final Mono<Void> reload() {
return protocol
.flatMap(support -> support
.getRoutes(DefaultTransport.WebSocket)// fetch the route information of the protocol package
.collectList()
.doOnEach(ReactiveLogger
.onNext(routes -> {
// The protocol package does not configure any route information.
if (CollectionUtils.isEmpty(routes)) {
log.warn("The protocol [{}] is not configured with url information", support.getId());
}
}))
.doOnNext(this::doReloadRoute))// reload the routes
.then();
}
org.jetlinks.pro.network.websocket.gateway.device.WebSocketServerDeviceGateway#doReloadRoute
/**
 * Synchronizes the registered WebSocket handlers with the routes declared by the protocol.
 *
 * <p>Each route address is normalised to start with {@code "/"}; addresses that have no
 * handler yet get one through {@code handleRequest}. Handlers whose address is no longer
 * declared are disposed <em>and removed from {@code handlers}</em>. Without the removal an
 * address that disappears and later comes back would stay bound to its disposed handler,
 * because {@code computeIfAbsent} does not create a new one while the key is still present.
 *
 * @param routes the WebSocket routes currently declared by the protocol
 */
private void doReloadRoute(List<Route> routes) {
    // Snapshot of every known handler; entries left over after the loop are obsolete.
    Map<String, Disposable> readyToRemove = new HashMap<>(handlers);
    for (Route route : routes) {
        String address = route.getAddress();
        if (!address.startsWith("/")) {
            address = "/" + address;
        }
        readyToRemove.remove(address);
        handlers.computeIfAbsent(address, this::handleRequest); // see handleRequest below
    }
    // Stop handling the URLs that were removed and forget their handlers.
    for (Map.Entry<String, Disposable> stale : readyToRemove.entrySet()) {
        handlers.remove(stale.getKey());
        stale.getValue().dispose();
    }
}org.jetlinks.pro.network.websocket.gateway.device.WebSocketServerDeviceGateway#handleRequest
/**
 * Registers a WebSocket connection handler for {@code path} and processes the messages of
 * every client that passes {@code checkConnectable}.
 *
 * <p>For each client a {@code WebSocketDeviceSession} is created, the monitor is notified,
 * and every received message is decoded with the protocol's codec for this transport.
 * Decoded device messages that pass {@code helper.checkAccessId} are forwarded to
 * {@code handleDeviceMessage}. Errors while handling a single message are logged and
 * swallowed; errors of the client's receive stream are swallowed without logging.
 *
 * <p>NOTE: the closing brace of this method is missing from the excerpt.
 *
 * @param path the URL pattern to accept connections for
 * @return the subscription, to be disposed when the route is removed
 */
protected Disposable handleRequest(String path) {
return webSocketServer
.handleConnection(path)// the route is registered here
.filter(this::checkConnectable)
.flatMap(client -> {
WebSocketDeviceSession firstSession = new WebSocketDeviceSession( null, client.getRemoteAddress().orElse(null), client, getTransport(), null);
monitor.connected();
client.closeHandler(monitor::disconnected);
return client
.receive() // Flux of received messages
.filter(pb -> isStarted()) // only while the gateway is started
.flatMap(socketMessage -> protocol // obtain the protocol
.flatMap(protocol -> protocol.getMessageCodec(getTransport())) // obtain the codec of the protocol for this transport
.flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(firstSession, socketMessage, registry)))
.cast(DeviceMessage.class)
// Only handle device messages that belong to the current gateway.
.filterWhen(deviceMessage -> helper.checkAccessId(getId(), deviceMessage))
.flatMap(deviceMessage -> handleDeviceMessage(firstSession, deviceMessage))
.onErrorResume((err) -> {
log.error("Handle websocket[{}] message failed:{}",
firstSession.getAddress(),
socketMessage, err);
return Mono.empty();
}))
.doOnCancel(client::close)
.onErrorResume((err) -> Mono.empty());
}, Integer.MAX_VALUE)
.contextWrite(ReactiveLogger.start("network", webSocketServer.getId()))
.subscribe();最终路由注册
在 org.jetlinks.pro.network.websocket.server.VertxWebSocketServer#handleConnection
/**
 * Subscribes to WebSocket connections whose path matches one of the given URL patterns.
 *
 * <p>Each pattern is normalised and appended to {@code route}; the sink of the returned
 * {@code Flux} is subscribed to the resulting topic and unsubscribed again when the sink is
 * disposed. Without any pattern, {@code "/**"} is used.
 *
 * @param urlPatterns the URL patterns to listen on
 * @return a {@code Flux} fed with the clients offered to the registered topics
 */
@Override
public Flux<WebSocketServerClient> handleConnection(String... urlPatterns) {
    return Flux.create(sink -> {
        Disposable.Composite registrations = Disposables.composite();
        String[] patterns = urlPatterns;
        if (patterns.length == 0) {
            patterns = new String[]{"/**"};
        }
        for (String urlPattern : patterns) {
            String pattern = toTopicPattern(urlPattern);
            log.debug("handle websocket request : {}", pattern);
            Topic<FluxSink<WebSocketServerClient>> topic = route.append(pattern);
            topic.subscribe(sink);
            registrations.add(() -> topic.unsubscribe(sink));
        }
        sink.onDispose(registrations);
    });
}

/**
 * Normalises a URL pattern into a topic pattern: path variables such as {@code {id}} become
 * {@code *}, one trailing slash is dropped and a leading slash is enforced.
 */
private static String toTopicPattern(String urlPattern) {
    String[] segments = urlPattern.split("/");
    for (int i = 0; i < segments.length; i++) {
        String segment = segments[i];
        // Path variable, e.g. /devices/{id}
        if (segment.startsWith("{") && segment.endsWith("}")) {
            segments[i] = "*";
        }
    }
    String pattern = String.join("/", segments);
    if (pattern.endsWith("/")) {
        pattern = pattern.substring(0, pattern.length() - 1);
    }
    return pattern.startsWith("/") ? pattern : "/".concat(pattern);
}Open API 登陆 签名验证
用它的 Open API 登录需要签名, 比较麻烦~
/token 接口验证签名的地方
org.jetlinks.pro.openapi.interceptor.OpenApiFilter#checkSign(java.lang.String, java.lang.String, org.jetlinks.pro.openapi.OpenApiClient, reactor.core.publisher.Flux<org.springframework.core.io.buffer.DataBuffer>)
/**
 * Verifies the signature of an Open API request.
 *
 * <p>The expected signature is the hex dump of the digest over, in this order: the request
 * parameters sorted by key and joined as {@code key=value} pairs with {@code &}, the
 * timestamp, and the client's secure key. Entries with a {@code null} key are skipped and
 * {@code null} values are treated as empty strings.
 *
 * <p>All text is encoded as UTF-8 explicitly, so the result no longer depends on the JVM's
 * default charset, and the comparison uses {@link MessageDigest#isEqual(byte[], byte[])},
 * whose running time does not depend on where the two values differ. The comparison stays
 * case-insensitive for the hexadecimal signature.
 *
 * @param sign          the signature sent by the caller
 * @param timestamp     the timestamp sent by the caller
 * @param client        the Open API client providing the digest and the secure key
 * @param multiValueMap the request parameters
 * @return a {@code Mono} emitting {@code true} when the signature matches
 */
private Mono<Boolean> checkSign(String sign, String timestamp, OpenApiClient client, Map<String, String> multiValueMap) {
    return Mono.fromSupplier(() -> {
        // According to the original note this is MD5 in practice.
        MessageDigest digest = client.getSignature().getMessageDigest();
        byte[] param = new TreeMap<>(multiValueMap)
            .entrySet()
            .stream()
            .filter(e -> e.getKey() != null)
            .map(e -> e.getKey().concat("=").concat(e.getValue() == null ? "" : e.getValue()))
            .collect(Collectors.joining("&"))
            .getBytes(java.nio.charset.StandardCharsets.UTF_8);
        digest.update(param); // 1. request parameters
        digest.update(timestamp.getBytes(java.nio.charset.StandardCharsets.UTF_8)); // 2. timestamp
        digest.update(client.getSecureKey().getBytes(java.nio.charset.StandardCharsets.UTF_8)); // 3. secret key
        // hexDump produces lower-case hex, so lower-casing the received value keeps the
        // check case-insensitive while allowing a constant-time byte comparison.
        String expected = ByteBufUtil.hexDump(digest.digest());
        return MessageDigest.isEqual(
            expected.getBytes(java.nio.charset.StandardCharsets.UTF_8),
            sign.toLowerCase(java.util.Locale.ROOT).getBytes(java.nio.charset.StandardCharsets.UTF_8));
    });
}HSWEB (实体的CURD事件)
org.hswebframework.web.crud.events.EntityEventListener 监听实体事件修改实现对象
org.hswebframework.web.crud.events.CompositeEventListener 用于组合
org.hswebframework.web.crud.configuration.EasyormConfiguration::autoRegisterFeature
/**
 * Registers a composite event listener on the database metadata and returns a bean
 * post-processor that wires matching beans into it after initialization.
 *
 * <p>Beans implementing {@code EventListener} are added to the composite listener; other
 * beans implementing {@code Feature} are added to the metadata directly.
 *
 * @param metadata the database metadata receiving the features
 * @return the post-processor performing the automatic registration
 */
@Bean
public BeanPostProcessor autoRegisterFeature(RDBDatabaseMetadata metadata) {
    // The composite listener is the single listener registered on the metadata.
    CompositeEventListener compositeListener = new CompositeEventListener();
    metadata.addFeature(compositeListener);
    return new BeanPostProcessor() {
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            if (bean instanceof EventListener) {
                compositeListener.addListener((EventListener) bean);
                return bean;
            }
            if (bean instanceof Feature) {
                metadata.addFeature((Feature) bean);
            }
            return bean;
        }
    };
}org.hswebframework.ezorm.rdb.metadata.TableOrViewMetadata::fireEvent
/**
 * Fires an event on this table or view when an event listener feature is registered.
 *
 * <p>A fresh context is created, this metadata is stored under {@code ContextKeys.table},
 * {@code contextConsumer} fills in the rest, and the listener is invoked.
 *
 * @param eventType       the type of the event to fire
 * @param contextConsumer populates the event context before the listener is called
 */
default void fireEvent(EventType eventType, Consumer<EventContext> contextConsumer) {
    EventListener listener = this.findFeatureOrElse(EventListener.ID, null);
    // No listener registered: nothing to do.
    if (listener == null) {
        return;
    }
    EventContext context = EventContext.create();
    context.set(ContextKeys.table, this);
    contextConsumer.accept(context);
    listener.onEvent(eventType, context); // invoke the listener
}TODO