IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 系统运维 -> java nio 实现一个高性能im服务器 基于reactor模型 -> 正文阅读

[系统运维]java nio 实现一个高性能im服务器 基于reactor模型

package com.HyChat.server;

import com.HyChat.server.Handle.MegHandel;
import com.HyChat.server.Handle.MegHandelimpl;
import com.HyChat.server.untity.LoggerUntity;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 启动类 基于rector模型
 */
public class HyChatServer {

    private Selector selector;

    /**
     * cpu 核心的两倍
     */
    private final int MaxFollwer=Runtime.getRuntime().availableProcessors()*2;;

    private ServerSocketChannel socketChannel;

    private MegHandel megHandel;

    private int keys=0;

    private FollowerServer[] followerServer=new FollowerServer[MaxFollwer];

    private ExecutorService factory= Executors.newFixedThreadPool(3);
    /**
     * 启动server
     */
    public void Start() throws IOException {
        init();
        HandelConn();
    }

    /**
     * 初始化
     * @throws IOException
     */
    private void init() throws IOException {
       socketChannel= ServerSocketChannel.open();
       selector=Selector.open();
       //设置为非阻塞
       socketChannel.configureBlocking(false);
       socketChannel.socket().bind(new InetSocketAddress("127.0.0.1",8888));

       socketChannel.register(selector, SelectionKey.OP_ACCEPT);//注册链接事件

        megHandel=new MegHandelimpl();
        //初始化子rector
        for (int i=0;i<MaxFollwer;i++){
            followerServer[i]=new FollowerServer();
        }
    }

    /**
     * 处理链接请求
     */
    private void HandelConn() throws IOException {
        System.out.println("链接启动");
        int index=0;
        while (true) {
            keys = selector.select();//会阻塞直到有链接请求进来
            if (keys>0){
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while(iterator.hasNext()){
                    SelectionKey currKey=iterator.next();

                    iterator.remove();

                    if (currKey.isAcceptable()){

                        socketChannel= (ServerSocketChannel) currKey.channel();

                        SocketChannel channel = socketChannel.accept();

                        channel.configureBlocking(false);

                        //注册读事件

                        followerServer[0].Regist(channel);


                        LoggerUntity.LogInfo("收到链接");
                    }
                }
            }
        }

    }
    public static void main(String[] args) {
        try {
            new HyChatServer().Start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

package com.HyChat.server;

import com.HyChat.server.Handle.MegHandel;
import com.HyChat.server.Handle.MegHandelimpl;
import com.HyChat.server.Message.ReqMessage;
import com.HyChat.server.ThreadPool.TaskPool;
import com.HyChat.server.untity.LoggerUntity;
import lombok.SneakyThrows;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.*;
import java.util.concurrent.*;

/**
 * reactor模型 的子reactor
 * 监听读写事件
 */
public class FollowerServer {

    private Selector selector;

    /**
     * 消息转发器
     */
    private MegHandel megHandel;

    private ExecutorService factory;

    public  FollowerServer() throws IOException {
        this.selector = Selector.open();
        factory= Executors.newCachedThreadPool();
        megHandel=new MegHandelimpl();
        new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                ReadPoll();
            }
        }).start();
    }
    public  void Regist(SocketChannel channel) throws ClosedChannelException {
        try {
            LoggerUntity.LogInfo(String.format("收到新的链接%s", channel.getRemoteAddress().toString()));
        } catch (IOException e) {
            e.printStackTrace();
        }
        channel.register(selector, SelectionKey.OP_READ);
    }
    private void ReadPoll() throws IOException {
        while (true){
            int selectLen=0;
            //设置一个超时时间不然无法注册事件
            selectLen=selector.select(1000);

            if (selectLen>0) {
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey currKey = iterator.next();
                    iterator.remove();
                    if (currKey.isReadable()){

                        SocketChannel channel = (SocketChannel) currKey.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int length = channel.read(buffer);
                        if (length==-1){
                            //下线了
                            LoggerUntity.LogInfo("用户下线了");
                            MegHandelimpl.Offline(currKey);
                            currKey.cancel();
                            //selector.keys().remove(currKey);
                            continue;
                        }
                        try {
                            TaskPool.Sumit(new Runnable() {
                                @SneakyThrows
                                @Override
                                public void run() {
                                    ReqMessage.MegBody megBody = ReqMessage.MegBody.parseFrom(Arrays.copyOfRange(buffer.array(),0,length));
                                    LoggerUntity.LogInfo(String.format("发送给 %s 的消息",megBody.getTarget()));
                                    try {
                                        megHandel.DoHandel(megBody,currKey);
                                    } catch (IOException e) {
                                        e.printStackTrace();
                                    }
                                }
                            });

                        }
                        catch (Exception e){
                            LoggerUntity.LogWaring("消息类型错误");

                        }
                    }

                }
            }
        }
    }

}

  系统运维 最新文章
配置小型公司网络WLAN基本业务(AC通过三层
如何在交付运维过程中建立风险底线意识,提
快速传输大文件,怎么通过网络传大文件给对
从游戏服务端角度分析移动同步(状态同步)
MySQL使用MyCat实现分库分表
如何用DWDM射频光纤技术实现200公里外的站点
国内顺畅下载k8s.gcr.io的镜像
自动化测试appium
ctfshow ssrf
Linux操作系统学习之实用指令(Centos7/8均
上一篇文章      下一篇文章      查看所有文章
加:2022-05-09 13:09:20  更:2022-05-09 13:11:03 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/4 18:12:21-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码