IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 大数据——WebSocket埋点实现离线+实时数据处理 -> 正文阅读

[网络协议]大数据——WebSocket埋点实现离线+实时数据处理

一、WebSocket概述

????????WebSocket是HTML5开始提供的一种在单个TCP连接上进行全双工通讯的协议。能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。

? ? ? ? WebSocket使得客户端和服务器之间的数据缓缓变得更加简单,允许服务端主动向客户端推送数据。

? ? ? ? 在WebSocketAPI中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

? ? ? ? 浏览器通过JavaScript向服务器发出建立WebSocket连接的请求,连接建立以后,客户端和服务端就可以改通过TCP连接直接交换数据。

? ? ? ? 相对于HTTP这种非持久性的协议来说,WebSocket是一个持久化的协议。

二、WebSocket 工程的创建

选择Spring Initializr工程,点击下一步

?填写工程的组名,并选择所使用的java版本号

?选择Spring Web 和WebSocket工具

?

?

?填写工程名及所在路径

?

?这时,pom.xml文件中自动打入了如下两个依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

三、WebSocket配置

WebSocket配置可以使用@Configuration注解来代替xml文件配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

//@Configuration注解等同于xml文件
@Configuration
public class WebSocketConfig {
    //这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
    //endpoint 端口的意思
    @Bean
    public ServerEndpointExporter getServerEndPointExporter(){

        return new ServerEndpointExporter();
    }
}

四、核心代码

import cn.bigdata.shop12.websoc.websocserver.config.WebSocketConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.*;

//@ConditionalOnClass表示当有条件中的类时,自动创建组件,配置类里面内容才会生效
@ConditionalOnClass(value= WebSocketConfig.class)
//@Component 通用,当不明确属于哪一层时使用
//@Controller 控制层注解
//@Service 服务层注解
//@Repository 数据访问层注解
@Component
//当满足条件时,创建WebSocket端点
@ServerEndpoint(value="/shop12_ws/user_behavior_event")
public class ShopWebSocket {
    //类型属性: 所有ShopWebSocket对象共享
    private static ConcurrentMap<String,ShopWebSocket> clients = null;
    //类型属性: 负责向kafka中按条写入日志
    private static KafkaProducer<Integer,String> kafka = null;
    //输入kafka主题
    private static final String INPUT_TOPIC = "input_user_behavior_event_topic";
    //控制kafka消息写入的分区号
    private static int partition = 0;
    //kafka最大分区数
    private static final int MAX_PARTITION = 3;
    //失败次数
    private static final int MAX_RETRY_TIMES = 3;

    //HDFS
    private static SimpleDateFormat dateFormat = null;
    private static final String LOCAL_DIR = "C:\\Users\\Administrator\\Desktop\\local_dir";
    private static final int LOCAL_BUFFER = 1024;   //1024*1024
    private static final int FLUSH_BUFFER = LOCAL_BUFFER;
    private static PrintWriter localWriter = null;
    private static FileSystem hdfsWriter = null;

    //定时线程调度执行hdfs写入
    private static ScheduledExecutorService scheduler = null;
    private static File localFile = null;
    private static final String HDFS_ROOT = "hdfs://192.168.131.200:9820";
    private static final String HDFS_DIR = "/shop12/user_act_log";

    //静态代码块初始化: 自动调用
    static{
        clients = new ConcurrentHashMap<>();
        Properties kafkaConfig = new Properties();

        kafkaConfig.setProperty("bootstrap.servers","192.168.131.200:9092");
        kafkaConfig.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
        kafkaConfig.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        kafkaConfig.setProperty("retries","3");
        kafkaConfig.setProperty("acks","-1");
        kafkaConfig.setProperty("batch.size","8192");
        kafkaConfig.setProperty("linger.ms","30000");
        kafka = new KafkaProducer<Integer, String>(kafkaConfig);

        dateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");

        scheduler = Executors.newScheduledThreadPool(2);
        Configuration hdfsConfig = new Configuration();
        hdfsConfig.set("fs.defaultFS",HDFS_ROOT);
        initHadoopFileSystem(hdfsConfig,0);

        scheduler.scheduleAtFixedRate(()->{
            writeToHdfs();
        },1,1, TimeUnit.MINUTES);
    }

    private static void writeToHdfs(){
        //将local_DIR目录下非空,不以.COMPLETED结尾的文件过滤出来
        File[] files = new File(LOCAL_DIR).listFiles(
                file->file.isFile() &&
                        !file.getName().endsWith(".COMPLETED") &&
                        file.length()>FLUSH_BUFFER);
        if (files.length>0){
            for (File file : files) {
               String[] ns = file.getName().split("_");
               String dir = ns[0];
               String name = ns[1];
                try {
                    hdfsWriter.copyFromLocalFile(
                            new Path(file.getAbsolutePath()),
                            new Path(HDFS_DIR+"/"+dir+"/"+name));
                    file.renameTo(new File(file.getAbsolutePath()+".COMPLETED"));
                    System.out.println("succeed to copy "+file.getName()+" to hdfs");
                } catch (IOException e) {
                    System.err.println("fail to copy "+file.getName()+" to hdfs : "+e.getMessage());
                }

            }
        }

    }
    private static void initHadoopFileSystem(Configuration hdfsConfig,int retryCount){
        try {
            hdfsWriter = FileSystem.get(hdfsConfig);
            return;
        } catch (IOException e){
            if(++retryCount<MAX_RETRY_TIMES){
                //递归
                initHadoopFileSystem(hdfsConfig,retryCount);
            }else{
                //没有启动hadoop服务,安全模式
                System.err.println("fail to initialize hadoop file system for 3 times");
                System.exit(-1);
            }
        }
    }

    private static boolean needInitLocalWriter(){
        if (null==localWriter){
            return true;
        }
        localWriter.flush();
        if (localFile.length()>=FLUSH_BUFFER){
            return true;
        }
        return false;
    }


    //static synchronized 的锁对象为shopWebSocket.class
    private static synchronized boolean initLocalWriter(){
        if (needInitLocalWriter()){
            if(null!=localWriter){
                localWriter.close();
            }
            for (int i = 0;i<MAX_RETRY_TIMES;){
                try{
                    localFile = new File(LOCAL_DIR+"/"+dateFormat.format(new Date()));
                    if (localFile.exists() && localFile.length()>=FLUSH_BUFFER){
                        Thread.sleep(1000);
                        continue;
                    }
                    localWriter = new PrintWriter(
                            new BufferedWriter(
                                    new FileWriter(localFile,true),LOCAL_BUFFER));
                    return true;
                }catch (Exception e){
                    i++;
                    continue;
                }
            }
            System.err.println("fail to initialize local file writer for 3 times");
            return false;
        }
        return true;
    }

    private static void localWriter(String log){
        boolean needInit = needInitLocalWriter();
        if(needInit ? initLocalWriter() : true){
            localWriter.println(log);
        }
    }

    private static void kafkaWrite(String log){
        kafka.send(new ProducerRecord(INPUT_TOPIC,partition++%MAX_PARTITION,partition,log));
    }

    //对象级属性: 构造方法或setter初始化
    private Session session = null;

    //客户端首次连接服务器
    @OnOpen
    public void Open(Session session){
        this.session = session;
        //每个会话都有唯一的十六进制id
        clients.put(session.getId(),this);
        System.out.println(session.getId()+" connected");
    }

    //客户端断开连接
    @OnClose
    public void Close(){
        clients.remove(session.getId());
        System.out.println(session.getId()+" disconnected");
    }

    //连接异常
    @OnError
    public void onError(Throwable e){
        //clients.remove(session.getId());
        System.out.println(session.getId()+" disconnected");
        e.printStackTrace();
    }

    //收到消息
    @OnMessage
    public void onMessage(String msg){
        //kafka写入: 实时
        kafkaWrite(msg);
        //HDFS写入: 离线
        localWriter(msg);

    }
}

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-08-24 15:54:17  更:2021-08-24 15:54:41 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/28 20:25:07-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计