1. WebSocket Overview
WebSocket is a protocol, introduced with HTML5, that provides full-duplex communication over a single TCP connection. It saves server resources and bandwidth and enables communication in closer to real time.
WebSocket makes data exchange between client and server much simpler and allows the server to push data to the client on its own initiative.
With the WebSocket API, the browser and the server only need to complete one handshake to establish a persistent connection and transfer data in both directions.
The browser sends a WebSocket connection request to the server via JavaScript; once the connection is established, the client and server exchange data directly over the TCP connection.
Compared with HTTP, which is non-persistent, WebSocket is a persistent protocol.
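Server push is the key capability that plain HTTP lacks. As a minimal sketch of what it looks like on the server side (assuming an already-connected javax.websocket Session, the same type used in Section 4 below; the class and method names here are made up for illustration):

import javax.websocket.Session;
import java.io.IOException;

public class PushExample {
    // Push a text message to one connected client at any time,
    // without waiting for a request from that client
    public static void push(Session session, String message) {
        try {
            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}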
2. Creating the WebSocket Project
Choose a Spring Initializr project and click Next.
Fill in the group name for the project and select the Java version to use.
Select the Spring Web and WebSocket dependencies.
Fill in the project name and its location.
At this point, the following two dependencies have been added to pom.xml automatically:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
3. WebSocket Configuration
The WebSocket configuration can be done with the @Configuration annotation instead of an XML configuration file:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

// @Configuration marks this class as a source of bean definitions,
// taking the place of an XML configuration file
@Configuration
public class WebSocketConfig {
    // This bean automatically registers every WebSocket endpoint
    // declared with the @ServerEndpoint annotation
    // (an "endpoint" is a connection point of the service, not a port)
    @Bean
    public ServerEndpointExporter getServerEndPointExporter() {
        return new ServerEndpointExporter();
    }
}
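For completeness: the application also needs the usual Spring Boot entry point, so that component scanning picks up WebSocketConfig and the endpoint class in the next section. A minimal sketch (the class name WebSocServerApplication is an assumption; Spring Initializr generates an equivalent class for you):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Standard Spring Boot entry point; component scanning covers
// WebSocketConfig and the @Component endpoint below
@SpringBootApplication
public class WebSocServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(WebSocServerApplication.class, args);
    }
}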
4. Core Code
import cn.bigdata.shop12.websoc.websocserver.config.WebSocketConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.*;
// @ConditionalOnClass: the component is created, and this class takes effect,
// only when the class named in the condition is present on the classpath
@ConditionalOnClass(value = WebSocketConfig.class)
// @Component is the generic stereotype, used when a class does not clearly belong to one layer
// @Controller marks the controller layer
// @Service marks the service layer
// @Repository marks the data access layer
@Component
// When the condition is satisfied, create the WebSocket endpoint at this path
@ServerEndpoint(value = "/shop12_ws/user_behavior_event")
public class ShopWebSocket {
    // Class-level state: shared by all ShopWebSocket instances
    private static ConcurrentMap<String, ShopWebSocket> clients = null;
    // Class-level state: writes the log to Kafka one record at a time
    private static KafkaProducer<Integer, String> kafka = null;
    // Input Kafka topic
    private static final String INPUT_TOPIC = "input_user_behavior_event_topic";
    // Controls which partition the next Kafka message is written to
    private static int partition = 0;
    // Number of Kafka partitions
    private static final int MAX_PARTITION = 3;
    // Maximum number of retries on failure
    private static final int MAX_RETRY_TIMES = 3;
    // HDFS
    private static SimpleDateFormat dateFormat = null;
    private static final String LOCAL_DIR = "C:\\Users\\Administrator\\Desktop\\local_dir";
    private static final int LOCAL_BUFFER = 1024; // 1024*1024
    private static final int FLUSH_BUFFER = LOCAL_BUFFER;
    private static PrintWriter localWriter = null;
    private static FileSystem hdfsWriter = null;
    // Scheduled thread pool that periodically flushes local files to HDFS
    private static ScheduledExecutorService scheduler = null;
    private static File localFile = null;
    private static final String HDFS_ROOT = "hdfs://192.168.131.200:9820";
    private static final String HDFS_DIR = "/shop12/user_act_log";
//静态代码块初始化: 自动调用
static{
clients = new ConcurrentHashMap<>();
Properties kafkaConfig = new Properties();
kafkaConfig.setProperty("bootstrap.servers","192.168.131.200:9092");
kafkaConfig.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
kafkaConfig.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaConfig.setProperty("retries","3");
kafkaConfig.setProperty("acks","-1");
kafkaConfig.setProperty("batch.size","8192");
kafkaConfig.setProperty("linger.ms","30000");
kafka = new KafkaProducer<Integer, String>(kafkaConfig);
dateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
scheduler = Executors.newScheduledThreadPool(2);
Configuration hdfsConfig = new Configuration();
hdfsConfig.set("fs.defaultFS",HDFS_ROOT);
initHadoopFileSystem(hdfsConfig,0);
scheduler.scheduleAtFixedRate(()->{
writeToHdfs();
},1,1, TimeUnit.MINUTES);
}
    private static void writeToHdfs() {
        // Pick out the regular files under LOCAL_DIR that do not end with
        // .COMPLETED and have grown past FLUSH_BUFFER
        File[] files = new File(LOCAL_DIR).listFiles(
                file -> file.isFile() &&
                        !file.getName().endsWith(".COMPLETED") &&
                        file.length() > FLUSH_BUFFER);
        if (files != null && files.length > 0) { // listFiles returns null if LOCAL_DIR does not exist
            for (File file : files) {
                // File names look like "yyyy-MM-dd_HH-mm-ss": the date part becomes
                // the HDFS subdirectory, the time part becomes the file name
                String[] ns = file.getName().split("_");
                String dir = ns[0];
                String name = ns[1];
                try {
                    hdfsWriter.copyFromLocalFile(
                            new Path(file.getAbsolutePath()),
                            new Path(HDFS_DIR + "/" + dir + "/" + name));
                    file.renameTo(new File(file.getAbsolutePath() + ".COMPLETED"));
                    System.out.println("succeed to copy " + file.getName() + " to hdfs");
                } catch (IOException e) {
                    System.err.println("fail to copy " + file.getName() + " to hdfs : " + e.getMessage());
                }
            }
        }
    }
    private static void initHadoopFileSystem(Configuration hdfsConfig, int retryCount) {
        try {
            hdfsWriter = FileSystem.get(hdfsConfig);
            return;
        } catch (IOException e) {
            if (++retryCount < MAX_RETRY_TIMES) {
                // Recursive retry
                initHadoopFileSystem(hdfsConfig, retryCount);
            } else {
                // The Hadoop services may not be running, or HDFS is in safe mode
                System.err.println("fail to initialize hadoop file system for 3 times");
                System.exit(-1);
            }
        }
    }
    // A new local writer is needed when none exists yet,
    // or when the current file has grown past FLUSH_BUFFER
    private static boolean needInitLocalWriter() {
        if (null == localWriter) {
            return true;
        }
        localWriter.flush();
        return localFile.length() >= FLUSH_BUFFER;
    }
    // static synchronized locks on ShopWebSocket.class
    private static synchronized boolean initLocalWriter() {
        if (needInitLocalWriter()) {
            if (null != localWriter) {
                localWriter.close();
            }
            for (int i = 0; i < MAX_RETRY_TIMES; ) {
                try {
                    localFile = new File(LOCAL_DIR + "/" + dateFormat.format(new Date()));
                    if (localFile.exists() && localFile.length() >= FLUSH_BUFFER) {
                        // The file for this second is already full; wait one
                        // second so the next timestamped name is different
                        Thread.sleep(1000);
                        continue;
                    }
                    localWriter = new PrintWriter(
                            new BufferedWriter(
                                    new FileWriter(localFile, true), LOCAL_BUFFER));
                    return true;
                } catch (Exception e) {
                    i++;
                    continue;
                }
            }
            System.err.println("fail to initialize local file writer for 3 times");
            return false;
        }
        return true;
    }
    private static void localWrite(String log) {
        // Re-initialize the writer only when needed, then append one line
        if (!needInitLocalWriter() || initLocalWriter()) {
            localWriter.println(log);
        }
    }
    private static void kafkaWrite(String log) {
        // Round-robin across the MAX_PARTITION partitions;
        // the running counter doubles as the record key
        kafka.send(new ProducerRecord<>(INPUT_TOPIC, partition++ % MAX_PARTITION, partition, log));
    }
    // Instance-level field: assigned when the connection opens
    private Session session = null;

    // A client connects for the first time
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        // Every session has a unique hexadecimal id
        clients.put(session.getId(), this);
        System.out.println(session.getId() + " connected");
    }
    // A client disconnects
    @OnClose
    public void onClose() {
        clients.remove(session.getId());
        System.out.println(session.getId() + " disconnected");
    }
    // Connection error
    @OnError
    public void onError(Throwable e) {
        //clients.remove(session.getId());
        System.out.println(session.getId() + " error");
        e.printStackTrace();
    }
    // A message is received from the client
    @OnMessage
    public void onMessage(String msg) {
        // Write to Kafka: the real-time path
        kafkaWrite(msg);
        // Write to a local file, later flushed to HDFS: the offline path
        localWrite(msg);
    }
}
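To exercise the endpoint end to end, a small test client can connect and send one sample event. A minimal sketch using the standard javax.websocket client API (the class name and JSON payload are made up for illustration; the URI assumes the server runs locally on Spring Boot's default port 8080, and a client implementation such as the Tomcat one bundled with spring-boot-starter-websocket must be on the classpath):

import javax.websocket.*;
import java.net.URI;

// Hypothetical test client: connects to the endpoint and sends one event
@ClientEndpoint
public class ShopWebSocketTestClient {

    @OnOpen
    public void onOpen(Session session) throws Exception {
        // The payload format is made up for illustration; the server
        // forwards whatever text it receives to Kafka and the local file
        session.getBasicRemote().sendText("{\"userId\":1,\"event\":\"click\"}");
    }

    public static void main(String[] args) throws Exception {
        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
        Session session = container.connectToServer(
                ShopWebSocketTestClient.class,
                URI.create("ws://localhost:8080/shop12_ws/user_behavior_event"));
        Thread.sleep(1000); // give the send a moment to complete
        session.close();
    }
}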