2022-06-27

Java

Vertx

官网 官文 - 英文 官文 - 翻译中文 官方-示例仓库

Eclipse Vert.x™ Reactive applications on the JVM

介绍

Vert.x 是一个开源的、反应式的、事件驱动的工具包,用于在Java虚拟机(JVM)上构建高性能和可扩展的应用程序。它被设计为多用途的,可以用于各种类型的应用程序,包括Web应用程序、微服务和物联网(IoT)系统。Vert.x提供了一组库和API,使开发人员能够创建非阻塞的、异步的应用程序,能够高效地处理大量并发连接。

Vert.x最初于2011年诞生,最初被命名为”node.x”,受到Node.js的启发,但后来由于与Node.js存在混淆的可能性以及法律上的一些问题,项目的名称在2012年被更改为”Vert.x”。这个新名称是从”vertical”(垂直)一词中衍生而来,强调了项目在构建垂直应用程序方面的能力。这个名称变更使Vert.x能够以自己独特的身份发展,不再与Node.js产生混淆。

Vert.x的主要特点包括:

  1. 响应式编程模型: Vert.x围绕着响应式编程原则构建,重点是处理异步事件和数据流。这使得应用程序更加响应迅速,更好地利用系统资源。

  2. 事件循环: Vert.x使用基于单线程事件循环的事件驱动架构。这个事件循环能够高效处理异步I/O操作和事件处理。 利用Netty4的EventLoop来做单线程的事件循环 每个 Vertx 实例维护的是 多个Event Loop 线程。默认情况下,我们会根据机器上可用的核数量来设置 Event Loop 的数量, 我们将这种模式称为 Multi-Reactor 模式

即使一个 Vertx 实例维护了多个 Event Loop,任何一个特定的处理器永远不会被并发执行。大部分情况下(除了 Worker Verticle 以外)它们总是在同一个 Event Loop 线程中被调用。 in short 千万不要阻塞 Vert.x的主事件循环线程!

  1. 多语言支持: 虽然Vert.x主要与Java相关,但它支持多种编程语言,包括JavaScript、Groovy、Ruby和Kotlin。这使得它适用于具有不同语言偏好的团队。 支持7种编程语言,需要糙快猛时可以用脚本语言(如 Ruby/Groovy,可惜暂不支持 Python),需要专注数据处理则可以用函数式语言(如 Scala/Kotlin,后面有望加入 Haskell 和 Clojure),需要兼顾前后端时可以用 Javascript(最近还推出了基于 GraalVM 的 es4x).

  2. 多协议支持: 支持HTTP、WebSocket、Socket、UDP、MQTT、AMQP等多种协议,用于构建高性能、响应式的应用程序。

核心组件介绍

Vert.x 的核心 Java API 被我们称为 Vert.x Core

Vert.x Core 提供了下列功能:

  • 编写 TCP 客户端和服务端
  • 编写支持 WebSocket 的 HTTP 客户端和服务端
  • 事件总线
  • 共享数据 —— 本地的Map和分布式集群Map
  • 周期性、延迟性动作
  • 部署和撤销 Verticle 实例
  • 数据报套接字
  • DNS客户端
  • 文件系统访问
  • 高可用性
  • 集群

Vert.x Core中的功能相当底层 —— 您在此不会找到诸如数据库访问、授权或高层Web应用的功能。您可以在Vert.x ext (扩展包)(译者注:Vert.x的扩展包是Vert.x的子项目集合,类似Web、Web Client、Data Access等)中找到这些功能。

Maven 依赖引用

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-core</artifactId>
  <version>3.x.x</version>
</dependency>

Vert.x 对象

除非您拿到 Vertx 对象,否则在Vert.x领域中您做不了太多的事情。它是 Vert.x 的控制中心,也是您做几乎一切事情的基础,包括创建客户端和服务器、获取事件总线的引用、设置定时器等等。

//创建 Vertx 实例对象
Vertx vertx = Vertx.vertx();
 
//指定配置项
Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(40));
 

VertxOptions 对象有很多配置,包括集群、高可用、池大小等。在Javadoc中描述了所有配置的细节。

大部分应用将只会需要一个Vert.x实例 重量级对象

Don’t call us, we’ll call you

骚俚语, 观察者模式?

Vert.x 的 API 大部分都是事件驱动的。这意味着当您感兴趣的事情发生时,它会以事件的形式发送给您

  • 触发一个计时器
vertx.setPeriodic(1000, id -> {
  // This handler will get called every second  // 这个处理器将会每隔一秒被调用一次
  System.out.println("timer fired!");
});
  • Socket 收到了一些数据
server.requestHandler(request -> {
  // This handler will be called every time an HTTP request is received at the server // 服务器每次收到一个HTTP请求时这个处理器将被调用
  
  request.response().end("hello world!");
});
  • 从磁盘中读取了一些数据
  • 发生了一个异常
  • HTTP 服务器收到了一个请求

Don’t block me!

除了很少的特例(如以 “Sync” 结尾的某些文件系统操作),Vert.x中的所有API都不会阻塞调用线程

如果可以立即提供结果,它将立即返回,否则您需要提供一个处理器(Handler)来接收稍后回调的事件。

因为Vert.x API不会阻塞线程,所以通过Vert.x您可以只使用少量的线程来处理大量的并发。

当使用传统的阻塞式API做以下操作时,调用线程可能会被阻塞:

  • 从 Socket 中读取数据
  • 写数据到磁盘
  • 发送消息给接收者并等待回复
  • 其他很多情况

在上述所有情况下,当您的线程在等待处理结果时它不能做任何事,此时这些线程并无实际用处。这意味着如果您使用阻塞式API处理大量并发,您需要大量线程来防止应用程序逐步停止运转。所需的内存(例如它们的栈)和上下文切换都是线程的开销。这意味着,阻塞式的方式对于现代应用程序所需要的并发级别来说是难于扩展的。

如何运行 block 代码?

可以通过调用 executeBlocking 方法来指定阻塞式代码的执行以及阻塞式代码执行后处理结果的异步回调。

vertx.executeBlocking(future -> {
  // 调用一些需要耗费显著执行时间返回结果的阻塞式API
  String result = someAPI.blockingMethod("hello");
  future.complete(result);
}, res -> {
  System.out.println("The result is: " + res.result());
});

默认的阻塞式代码会在 Vert.x 的 Worker Pool 中执行,通过 setWorkerPoolSize 配置。 若您不需要关心您调用 executeBlocking 的顺序,可以将 ordered 参数的值设为 false。这样任何 executeBlocking 都会在 Worker Pool 中并行执行。

Worker Executor 在不需要的时候必须被关闭: executor.close(); 当使用同一个名字创建了许多 worker 时,它们将共享同一个 pool。当所有的 worker executor 调用了 close 方法被关闭过后,对应的 worker pool 会被销毁。

in short 如果需要运行耗时长的任务, 可以委托给 Worker Executor(类似Netty的工作组)执行

WorkerExecutor 创建

WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool");
executor.executeBlocking(future -> {
  // 调用一些需要耗费显著执行时间返回结果的阻塞式API
  String result = someAPI.blockingMethod("hello");
  future.complete(result);
}, res -> {
  System.out.println("The result is: " + res.result());
});
 

Worker Executor 在不需要的时候必须被关闭: executor.close();

Verticle

Verticle 是什么?

  • verticle相当于1个执行模块,是vertx的部署单元。
  • vertx可以部署多个verticle,且verticle之间可以互相通信。
  • vertx主要是个网络框架,所以在verticle中通常会启动server,如http或tcp。

Verticle 种类

这儿有三种不同类型的 Verticle:

  1. Stardand Verticle:这是最常用的一类 Verticle —— 它们永远运行在 Event Loop 线程上。事件主线程
  2. Worker Verticle:这类 Verticle 会运行在 Worker Pool 中的线程上。一个实例绝对不会被多个线程同时执行
  3. Multi-Threaded Worker Verticle:这类 Verticle 也会运行在 Worker Pool 中的线程上。一个实例可以由多个线程同时执行, 因此需要开发者自己确保线程安全

Verticle 部署

Verticle 的实现类必须实现 Verticle 接口。如果您喜欢的话,可以直接实现该接口,但是通常直接从抽象类 AbstractVerticle 继承更简单。

您可以指定一个 Verticle 名称或传入您已经创建好的 Verticle 实例,使用任意一个 deployVerticle 方法来部署Verticle。

Verticle myVerticle = new MyVerticle();
vertx.deployVerticle(myVerticle);
 
// 允许您部署用任何使用Vert.x支持的语言编写的Verticle实例。**牛鼻!**
// 部署 以全限定类名部署
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle");
// 部署JavaScript的Verticle
vertx.deployVerticle("verticles/myverticle.js");
 
// 部署Ruby的Verticle
vertx.deployVerticle("verticles/my_verticle.rb");

如果部署成功,这个完成处理器的结果中将会包含部署ID的字符串。这个部署 ID可以在之后您想要撤销它时使用。

当 Vert.x 部署 Verticle 时,它的 start 方法将被调用,这个方法执行完成后 Verticle 就变成已启动状态。当Vert.x 撤销一个 Verticle 时 stop 会被调用,这个方法执行完成后 Verticle 就变成已停止状态了。

不需要在一个 Verticle 的 stop 方法中手工去撤销启动时部署的子 Verticle,当父 Verticle 在撤销时 Vert.x 会自动撤销任何子 Verticle

配置

向 Verticle 传入配置 可在部署时传给 Verticle 一个 JSON 格式的配置

JsonObject config = new JsonObject().put("name", "tim").put("directory", "/blah");
DeploymentOptions options = new DeploymentOptions().setConfig(config);
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);

这个配置可以在内部Verticle , 通过 Context 对象或使用 config 方法访问。

实例 MQTT 客户端

Vert.x MQTT client

 
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.messages.MqttPublishMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
 
/**
 * @author yangfh
 * @date 2022/12/14 19:58
 **/
public class IotSubscribeClient {
    private static final Logger log = LoggerFactory.getLogger(IotSubscribeClient.class);
 
    protected MqttClient client = null;
    protected final Handler<MqttPublishMessage> publishHandler;
    protected final IotPlfConfig.MqttConfig mqttConfig;
    protected final Set<String> topics = new HashSet<>();
 
    protected final Vertx vertx = Vertx.vertx();
    public static final int TIME_OUT_OF_SECONDS = 3000;
 
    /**
     * 构造之后未订阅主题, 需 addSubscribe 订阅
     * @author yangfh
     * @time: 2023/1/16 9:34
     */
    public IotSubscribeClient(Handler<MqttPublishMessage> publishHandler, IotPlfConfig.MqttConfig mqttConfig){
        this.publishHandler = publishHandler;
        this.mqttConfig = mqttConfig;
    }
 
    public void init(){
        MqttClientOptions mqttClientOptions = getMqttClientOptions(mqttConfig);
        this.client = MqttClient.create(vertx, mqttClientOptions);
        client.publishHandler(publishHandler);
        client
//                .exceptionHandler( thr->{
//                    log.error("消费 IOT MQTT exceptionHandler Reconnecting..");
//                    client.disconnect();
//                    connect();
//                })
                .closeHandler( (v)->{
                    log.error("消费 IOT MQTT closeHandler {} ", v);
                    client.disconnect();
                    reconnect();//无限重连
                });
        connect();
    }
    private void connect(){
        if(client.isConnected()){
            log.error("连接正常? ");
            return ;
        }
        client.connect(mqttConfig.getPort(), mqttConfig.getHost(), result ->{
            if(result.succeeded()){
                subscribe();
            }else{
                log.error("消费 IOT {},MQTT 连接失败! {}", mqttConfig.getHost(), result);
                reconnect();//无限重连
            }
        });
    }
    private void reconnect(){
        //避免高频重连
        vertx.setTimer(TIME_OUT_OF_SECONDS, timerId -> {
            connect();
        });
    }
 
    private void subscribe() {
        log.info("消费 IOT MQTT 订阅 {}", topics);
        for (Iterator<String> iterator = topics.iterator(); iterator.hasNext(); ) {
            String next =  iterator.next();
            client.subscribe(next, 0);
        }
    }
 
    public static MqttClientOptions getMqttClientOptions(IotPlfConfig.MqttConfig mqttConfig){
        MqttClientOptions mqttClientOptions = new MqttClientOptions();
        mqttClientOptions.setClientId(mqttConfig.getClientId());
        mqttClientOptions.setUsername(mqttConfig.getUsername());
        mqttClientOptions.setPassword(mqttConfig.getPassword());
        mqttClientOptions.setTcpKeepAlive(true);
        mqttClientOptions.setAutoKeepAlive(true);
        mqttClientOptions.setWillRetain(true);
        mqttClientOptions.setWillQoS(0);
        mqttClientOptions.setCleanSession(true);
        mqttClientOptions.setKeepAliveInterval(30);
        mqttClientOptions.setMaxMessageSize( 1024 * 1024 * 10);//10m
        return mqttClientOptions;
    }
 
    public boolean addSubscribe(String topic){
        return topics.add(topic);
    }
 
 
    public void destroy(){
        if(client != null){
            client.disconnect();
        }
        client= null;
    }
}
 
 

实例 MQTT 服务端

(Vert.x MQTT server)[https://vertx.io/docs/vertx-mqtt/java/#_vert_x_mqtt_server]

 
MqttServer mqttServer = MqttServer.create(vertx);
mqttServer.endpointHandler(endpoint -> {
 
  // shows main connect info
  System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());
 
  if (endpoint.auth() != null) {
    System.out.println("[username = " + endpoint.auth().getUsername() + ", password = " + endpoint.auth().getPassword() + "]");
  }
  System.out.println("[properties = " + endpoint.connectProperties() + "]");
  if (endpoint.will() != null) {
    System.out.println("[will topic = " + endpoint.will().getWillTopic() + " msg = " + new String(endpoint.will().getWillMessageBytes()) +
      " QoS = " + endpoint.will().getWillQos() + " isRetain = " + endpoint.will().isWillRetain() + "]");
  }
 
  System.out.println("[keep alive timeout = " + endpoint.keepAliveTimeSeconds() + "]");
 
  // accept connection from the remote client
  endpoint.accept(false);
 
})
  .listen()
  .onComplete(ar -> {
 
    if (ar.succeeded()) {
 
      System.out.println("MQTT server is listening on port " + ar.result().actualPort());
    } else {
 
      System.out.println("Error on starting the server");
      ar.cause().printStackTrace();
    }
  });
 

The same endpoint instance provides the disconnectMessageHandler for specifying the handler called when the remote client sends a DISCONNECT message in order to disconnect from the server; this handler takes MqttDisconnectMessage as a parameter.

endpoint.disconnectMessageHandler(disconnectMessage -> {
 
  System.out.println("Received disconnect from client, reason code = " + disconnectMessage.code());
});
 

实例 WEB服务

public class MainVerticle extends AbstractVerticle {
  @Override
  public void start() throws Exception {
    // Create a Router
    Router router = Router.router(vertx);
 
    // Mount the handler for all incoming requests at every path and HTTP method
    router.route().handler(context -> {
      // Get the address of the request
      String address = context.request().connection().remoteAddress().toString();
      // Get the query parameter "name"
      MultiMap queryParams = context.queryParams();
      String name = queryParams.contains("name") ? queryParams.get("name") : "unknown";
      // Write a json response
      context.json(
        new JsonObject()
          .put("name", name)
          .put("address", address)
          .put("message", "Hello " + name + " connected from " + address)
      );
    });
 
    // Create the HTTP server
    vertx.createHttpServer()
      // Handle every request using the router
      .requestHandler(router)
      // Start listening
      .listen(8888)
      // Print the port
      .onSuccess(server ->
        System.out.println(
          "HTTP server started on port " + server.actualPort()
        )
      );
  }
}