从Vertx提供的通信方式开始
第一个集群节点启动
package org.example;
import com.hazelcast.config.Config;
import com.hazelcast.config.FileSystemXmlConfig;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.EventBusOptions;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;
import java.net.InetAddress;
import java.net.UnknownHostException;
public class AppCluster {
public static void main(String[] args) throws UnknownHostException {
final VertxOptions vertxOptions = new VertxOptions();
EventBusOptions eventBusOptions = new EventBusOptions();
String hostAddress = InetAddress.getLocalHost().getHostAddress();
vertxOptions.setEventBusOptions(eventBusOptions).getEventBusOptions().setHost(hostAddress);
HazelcastClusterManager clusterManager = new HazelcastClusterManager();
vertxOptions.setClusterManager(clusterManager);
Vertx.clusteredVertx(vertxOptions, res -> {
Vertx result = res.result();
result.deployVerticle(new MainClusterVerticle(), r -> {
if (r.succeeded()) {
System.out.println(MainClusterVerticle.class.getName() + " --> 部署成功");
} else {
r.cause().printStackTrace();
System.err.println(MainClusterVerticle.class.getName() + " --> 部署失败, " + r.cause().getMessage());
}
});
});
}
}
发布verticle服务
package org.example;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.json.JsonObject;
public class MainClusterVerticle extends AbstractVerticle {
public void start() {
System.out.println("start thread" + Thread.currentThread().getName());
vertx.eventBus().consumer("com.xiaoniu.bus", msg -> {
System.out.println("read thread" + Thread.currentThread().getName());
System.out.println("收到消息");
System.out.println(msg != null ? ((JsonObject) msg.body()).encodePrettily() : "没有消息");
JsonObject j = new JsonObject();
j.put("info", "我是Main");
msg.reply(j);
});
}
}
第二个集群节点启动
package org.example;
import com.hazelcast.config.Config;
import com.hazelcast.config.FileSystemXmlConfig;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.EventBusOptions;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;
import java.io.FileNotFoundException;
import java.net.InetAddress;
import java.net.UnknownHostException;
public class AppCluster2 {
public static void main(String[] args) throws UnknownHostException {
final VertxOptions vertxOptions = new VertxOptions();
EventBusOptions eventBusOptions = new EventBusOptions();
String hostAddress = InetAddress.getLocalHost().getHostAddress();
vertxOptions.setEventBusOptions(eventBusOptions).getEventBusOptions().setHost(hostAddress);
HazelcastClusterManager clusterManager = new HazelcastClusterManager();
vertxOptions.setClusterManager(clusterManager);
Vertx.clusteredVertx(vertxOptions, res -> {
Vertx result = res.result();
result.deployVerticle(new MainClusterVerticle2(), r -> {
if (r.succeeded()) {
System.out.println(MainClusterVerticle2.class.getName() + " --> 部署成功");
} else {
r.cause().printStackTrace();
System.err.println(MainClusterVerticle2.class.getName() + " --> 部署失败, " + r.cause().getMessage());
}
});
});
}
}
发布verticle服务
package org.example;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
public class MainClusterVerticle2 extends AbstractVerticle {
public void start() {
System.out.println("start thread" + Thread.currentThread().getName());
HttpServer server = vertx.createHttpServer();
Router router = Router.router(vertx);
router.route("/index").handler(request -> {
System.out.println("haha" + Thread.currentThread().getName());
System.out.println("时间到了,发送消息");
JsonObject json = new JsonObject().put("info", "我是另主");
vertx.eventBus().request("com.xiaoniu.bus", json, msg -> {
System.out.println("read thread" + Thread.currentThread().getName());
if (msg.succeeded()) {
System.out.println(msg.result() != null
? ((JsonObject)msg.result().body()).encodePrettily()
: "没有信息");
} else {
System.err.println(msg.cause().getMessage());
msg.cause().printStackTrace();
}
});
request.response().end("INDEX SUCCESS");
});
server.requestHandler(router);
server.listen(7777);
}
}
上面提供了两种通信方式,一种是通过HttpServer对外提供http服务,一种式集群间的基于EventBus的通信,后面我将基于这两种通信方式来构造一个小型的微服务框架。
集群启动及原理分析
- 58331端口用于集群中别的vertx服务通过eventbus向它发数据。
- 5700端口,实验中观察发现集群中启动n个服务,每个服务都会与其他n-1个服务建立连接,并且通过心跳保持连接,这样就可以在一个服务挂掉以后,别的服务会有断开连接提示进而知道集群中某节点挂掉。
- 该进程一个重要UDP端口,组播端口,用于互相告知连接端口,如果上面的5700,进而建立连接形成集群。
第一个vertx服务启动界面
第二个vertx服务启动界面
第三个vertx服务启动界面 可以看出每个服务都保存一份集群节点
从第一个进程(4980)的套接字看 可以看出第一个进程通过它的集群端口(5700端口)和其他两个集群中的服务保持了TCP连接从而建立集群
从第二个进程的套接字看(3376) 可以看出第二个进程通过50674端口和第一个进程集群端口(5700端口)保持TCP连接建立集群,通过50787端口连接第三个进程的集群端口(5702端口)保持TCP连接建立集群
看图可能会直观一点
效果展示
定义controller对外提供服务
package com.koala.controller;
import java.util.Date;
import com.koala.common.dto.User;
import com.koala.common.exception.DemoException;
import com.koala.core.annotion.*;
import com.koala.service.DemoService;
@BusController
@BusMapping("demo")
public class DemoController {
@Autowired
private DemoService demoService;
@BusMapping(value = "resource",method = RequestMethod.GET)
public User test1(@RequestParam("user") User msg,@RequestParam("num") long num){
demoService.serviceMethod();
System.out.println(msg.getA());
User user = new User();
user.setA("adf32");
user.setB(2);
user.setDate(new Date());
return user;
}
@BusMapping(value = "resource1",method = RequestMethod.GET)
public User test2(@RequestParam("user") User msg,@RequestParam("num") long num) throws DemoException {
throw new DemoException("您报错了");
}
}
支持Bean配置类:
package com.koala.config;
import com.koala.core.annotion.Value;
import com.koala.core.annotion.VertxBean;
import com.koala.core.annotion.VertxConfiguration;
import com.koala.pojo.A;
import com.koala.pojo.B;
@VertxConfiguration
public class TestConfiguration {
@Value("${my.value}")
private String value;
@VertxBean
public B getB() {
B b = new B();
b.setA(getA());
return b;
}
@VertxBean
public A getA() {
A a = new A();
a.setValue(value);
return a;
}
}
支持IOC及服务启动时执行组件自定义初始化方法
package com.koala.service;
import com.koala.core.annotion.Autowired;
import com.koala.core.annotion.Value;
import com.koala.core.annotion.VertxComponent;
import com.koala.core.ioc.VertxInitializer;
import com.koala.core.ioc.aware.VertxAware;
import com.koala.core.log.LogUtil;
import com.koala.pojo.A;
import com.koala.pojo.B;
import io.vertx.core.Vertx;
@VertxComponent
package com.koala.service;
import com.koala.core.annotion.Autowired;
import com.koala.core.annotion.Value;
import com.koala.core.annotion.VertxComponent;
import com.koala.core.ioc.VertxInitializer;
import com.koala.core.ioc.aware.VertxAware;
import com.koala.core.log.LogUtil;
import com.koala.pojo.A;
import com.koala.pojo.B;
import io.vertx.core.Vertx;
@VertxComponent
public class DemoService implements VertxInitializer, VertxAware {
@Value("${my.url}")
private String url;
@Autowired
private A a;
@Autowired
private B b;
private Vertx vertx;
public void serviceMethod(){
System.out.println(a);
System.out.println(b);
}
@Override
public void init() {
System.out.println(a);
System.out.println(b.getA());
System.out.println(a.getValue());
System.out.println(url);
}
@Override
public void setVertx(Vertx vertx) {
this.vertx = vertx;
}
}
支持接口配置式的远程调用并可注入
package com.koala.client;
import com.koala.common.dto.User;
import com.koala.common.exception.DemoException;
import com.koala.core.annotion.BusMapping;
import com.koala.core.annotion.RequestMethod;
import com.koala.core.annotion.RequestParam;
import com.koala.core.annotion.VertxClient;
@VertxClient
@BusMapping("koala.demo")
public interface MyClient {
@BusMapping(value = "resource", method = RequestMethod.GET)
public User test1(@RequestParam("user") User msg, @RequestParam("num") long num);
@BusMapping(value = "resource1", method = RequestMethod.GET)
public User test2(@RequestParam("user") User msg, @RequestParam("num") long num) throws DemoException;
}
package com.koala.controller;
import com.koala.client.MyClient;
import com.koala.common.dto.User;
import com.koala.common.exception.DemoException;
import com.koala.core.annotion.*;
@BusController
@BusMapping("demo")
public class DemoController {
@Autowired
private MyClient myClient;
@BusMapping(value = "resource",method = RequestMethod.GET)
public User test1(@RequestParam("user") User msg,@RequestParam("num") long num) {
try {
System.out.println(myClient.test2(msg,num));
}catch (DemoException e){
e.printStackTrace();
}
return new User();
}
@BusMapping(value = "resource1",method = RequestMethod.GET)
public User test2(@RequestParam("user") User msg,@RequestParam("num") long num) {
try {
myClient.test2(msg,num);
}catch (DemoException e){
e.printStackTrace();
}
return new User();
}
}
实现概览
源码解析
。。。
适用场景
服务之间传输轻量,单Reactor多线程模型,使用于IO密集型应用。
|