最近的一个项目中,来来回回折腾了几次技术方案,其中的改为Thrift的过程稍微记录一下。
本来是个业务过程很简单的项目,分为两个网络大区的两个服务端,其中的一个大区是传统的B/S架构,基于springboot开发,浏览器访问,没什么问题。另一个大区是C/S架构,我们一开始把服务端用SpringBoot实现了,用的是Springboot+JPA+MySQL,提供的是HTTP REST接口,客户端用的是CentOS+QT+libCurl实现,用libCurl对接服务端的REST接口。两个网络大区间是隔离的,通过正反向隔离装置可以进行文件的收发,因此两个服务端都有定时任务去扫描发送过来的文件。
问题出在CS的服务端,要去部署时,甲方大爷说了HTTP/HTTPS不符合网安的要求,不能用。没办法只能重写,但是又想尽量把之前的东西复用上。查了一下初步确定gRPC和Thrift两个方案,pRPC是HTTP2的,怕还是不行,最终选择了Thrift方案。
比较了一下gPRC和Thrift,其实定位还是有差别的。从最基础的TCP往上,Netty是把基础的TCP做了封装,而Thrift的最基础的网络通信部分和netty有点类似,在上面又加上了序列化和多语言支持的部分,而gRPC是工作在HTTP2协议之上的,主要是序列化和多语言支持的部分。
改造时想把数据库访问、网络大区消息同步这些重用起来,因此还是在原来的Springboot工程之上进行的修改。
首先进行Thrift接口的定义
namespace java com.sifang.ngauto.thrift
struct CommonResult {
1:bool suc,
2:string message
}
struct LoginResult {
1:CommonResult result,
2:i32 uid,
3:i32 rid,
4:string uname,
5:string nameCh,
6:i32 pid,
7:string pname,
8:string token
}
struct StationInfo {
1:i32 id,
2:string name,
4:string comment
}
//此处省略三千字……
service OpsService {
CommonResult deviceRegister(1:string devid, 2:string sign, 3:string cert),
LoginResult userLogin(1:string username, 2:string password, 3:string devid),
//此处省略三千字……
}
再用thrift工具生成java和C++代码,C++代码留给改造QT客户端用,
thrift-0.16.0.exe -r -gen cpp maintance_service.thrift
thrift-0.16.0.exe -r -gen java maintance_service.thrift
然后来改造Springboot工程,首先将生成的java代码放到工程里面。
新建java类,继承自生成的类,实现业务代码,主要是把之前Service层实现的代码拷贝过来,根据接口的传出类型修改,之前的Repository层的类都可以Autowired进来。
@Service
public class OpsServiceImpl implements OpsService.Iface{
@Autowired
InfoSyncUtil infoSyncUtil;
@Autowired
private UserRepository userRepository;
@Override
public LoginResult userLogin(java.lang.String username, java.lang.String password, java.lang.String devid) throws org.apache.thrift.TException {
//……业务代码,把之前的Service层的代码考过来
}
@Override
public CommonResult deviceRegister(java.lang.String devid, java.lang.String sign, java.lang.String cert) throws org.apache.thrift.TException {
}
再把之前的Controller层、Service层的代码都去掉,保留数据库访问Repository和网络消息同步的代码。
然后pom文件中去掉web、swagger等,加上thrift。
<!--<dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-web</artifactId>-->
<!--</dependency>-->
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.16.0</version>
</dependency>
application.yml中去掉web的端口server.port,加上thrift的端口
#server:
# port: 8080
thrift:
port: 8090
?添加Thrift Server,用PostConstruct启动
@Configuration
public class ThriftServer {
@Value("${thrift.port}")
private Integer port;
@Autowired
private OpsServiceImpl opsService;
@PostConstruct
public void start() {
TProcessor tprocessor = new OpsService.Processor<OpsService.Iface>(opsService);
// 传输通道 - 非阻塞方式
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
//多线程半同步半异步
TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
tArgs.processor(tprocessor);
tArgs.transportFactory(new TFramedTransport.Factory());
//二进制协议
tArgs.protocolFactory(new TBinaryProtocol.Factory());
int numThreads = Math.max(2, Runtime.getRuntime().availableProcessors());
int selectorThreads = Math.max(2, Runtime.getRuntime().availableProcessors() / 2);
tArgs.workerThreads(numThreads);
tArgs.selectorThreads(selectorThreads);
// 多线程半同步半异步的服务模型
TServer server = new TThreadedSelectorServer(tArgs);
log.info("Thrift TThreadedSelectorServer start at port: {}, workerThreads: {}, selectorThreads: {} ......", port, numThreads, selectorThreads);
server.serve(); // 启动服务
} catch (TTransportException e) {
log.error("Thrift TThreadedSelectorServer start fail : {}", e.getMessage());
}
}
}
服务端改造完成,写个java客户端测试一下,把生成的java代码包含进来,然后写个Client类。
public class TestClient {
public static void main(String[] args) {
try {
TFramedTransport transport =
new TFramedTransport(new TSocket("localhost", 8090,2000));
//TSocket m_transport = new TSocket("127.0.0.1", 8080,2000);//创建一个传输层对象(TTransport),具体采用的传输方式是TFramedTransport,要与服务器端保持一致,
TBinaryProtocol protocol = new TBinaryProtocol(transport);//创建一个通信协议对象(TProtocol),具体采用的通信协议是二进制协议
OpsService.Client testClient = new OpsService.Client(protocol);//创建一个Thrift客户端对象(TestThriftService.Client),Thrift的客户端类TestThriftService.Client已经在文件TestThriftService.java中,由Thrift编译器自动为我们生成
System.out.println("连接服务器");
transport.open();//打开socket,建立与服务器直接的socket连接
String token = null;
System.out.println("1、测试设备注册");
CommonResult res = testClient.deviceRegister("123456789", "", "");
if(res.isSuc()) {
System.out.println("--设备注册成功");
}
else {
System.out.println("--设备注册失败: " + res.getMessage());
}
System.out.println("2、用户登录");
LoginResult res1 = testClient.userLogin("nfdwzd", "fa376b17f37342c3e61d1e06a86c29306fe12bb743b2b1c677dc9ec157242dd4", "123456789");
if(res1.getResult().isSuc()) {
token = res1.getToken();
System.out.println("--用户登录成功,用户id:"+res1.getUid()+",用户单位:"+res1.getPname());
}
else {
System.out.println("--登录失败: " + res1.getResult().getMessage());
}
// 此处省略三千字……
transport.close();//使用完成关闭socket
}
catch (TException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
测试居然失败了!!!
跟踪进服务端,发现进了相应的服务端函数,但是Repository的数据库调用的时候就停止了。然后就是各种调试,分析,包冲突?版本?注入有问题?几个小时没结果,又把JPA换成Mybatis测试一下,这下可以了,但是原先的@Scheduled定时任务又没有执行,突然想到了ThriftServer开始了之后会一直accept导致线程阻塞,把Server修改成一个单独的线程执行。
@Configuration
public class ThriftServer {
@Value("${thrift.port}")
private Integer port;
@Autowired
private OpsServiceImpl opsService;
@PostConstruct
public void start() {
new Thread() {
@Override
public void run() {
try {
TProcessor tprocessor = new OpsService.Processor<OpsService.Iface>(opsService);
// 传输通道 - 非阻塞方式
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
//多线程半同步半异步
TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
tArgs.processor(tprocessor);
tArgs.transportFactory(new TFramedTransport.Factory());
//二进制协议
tArgs.protocolFactory(new TBinaryProtocol.Factory());
int numThreads = Math.max(2, Runtime.getRuntime().availableProcessors());
int selectorThreads = Math.max(2, Runtime.getRuntime().availableProcessors() / 2);
tArgs.workerThreads(numThreads);
tArgs.selectorThreads(selectorThreads);
// 多线程半同步半异步的服务模型
TServer server = new TThreadedSelectorServer(tArgs);
log.info("Thrift TThreadedSelectorServer start at port: {}, workerThreads: {}, selectorThreads: {} ......", port, numThreads, selectorThreads);
server.serve(); // 启动服务
} catch (TTransportException e) {
log.error("Thrift TThreadedSelectorServer start fail : {}", e.getMessage());
}
}
}.start();
}
}
测试通过!!!
还遗留了一些问题,单线程的时候JPA调用被阻塞了,Mybatis却没有,这个有空研究一下。
|