IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> reactive stream与spring-webflux初体验 -> 正文阅读

[Java知识库]reactive stream与spring-webflux初体验

目录

写在前面

一、reactive stream(jdk9)

?发布-订阅

二、异步servlet

同步servlet

异步servlet?

三、reactor = jdk8 stream + jdk9 reactive stream

四、webflux初体验

五、SSE(server?sent?events)


写在前面

Spring Framework 中包含的原始 Web 框架 Spring Web MVC 是专门为 Servlet API 和 Servlet 容器构建的。反应式堆栈 Web 框架 Spring WebFlux 是在 5.0 版的后期添加的。它是完全非阻塞的,支持反应式流(Reactive Stream)背压,并在Netty,Undertow和Servlet 3.1 +容器等服务器上运行。

Spring WebFlux 是一个异步非阻塞式 IO 模型,通过少量的容器线程就可以支撑大量的并发访问。底层使用的是 Netty 容器,这点也和传统的 SpringMVC 不一样,SpringMVC 是基于 Servlet 的。

在此,咱们也简单体验一下spring-webflux到底是个什么东西。

学习webflux之前一定要将jdk8的stream学习明白了哟~

函数式编程-lambda函数与jdk8自带的函数接口_秃了也弱了。的博客-CSDN博客

jdk8-stream深入详解与运行机制_秃了也弱了。的博客-CSDN博客

一、reactive stream(jdk9)

1.Publisher<T>接口发布者
subscribe(Subscriber<? super T>):void:保证发布者和订阅者之间通过此方法建立订阅关系


2.Subscriber<T>订阅者
onSubscribe(Subscription):void:第一此签署订阅关系,输入就是Subscription对象
onNext(T):void:接收到一条数据
onError(Throwable):void:数据出错
onComplete():void:数据处理完了


3.Subscription发布者与订阅者之间的关系
request(long):void:告诉发布者需要多少资源
cancel():void


4.Processor<T,R>接口继承了Publisher和Subscriber,表示既可以做消费者,又可以做发布者,承担中间角色
defaultBufferSize():int
?

?发布-订阅

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FlowDemo {

    public static void main(String[] args) throws Exception {
        // 1. 定义发布者, 发布的数据类型是 Integer
        // 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();

        // 2. 定义订阅者
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存订阅关系, 需要用它来给发布者响应
                this.subscription = subscription;

                // 请求一个数据
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // 接受到一个数据, 处理
                System.out.println("接受到数据: " + item);

                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                // 处理完调用request再请求一个数据
                this.subscription.request(1);

                // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出现了异常(例如处理数据的时候产生了异常)
                throwable.printStackTrace();


                // 我们可以告诉发布者, 后面不接受数据了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部数据处理完了(发布者关闭了)
                System.out.println("处理完了!");
            }

        };

        // 3. 发布者和订阅者 建立订阅关系
        publiser.subscribe(subscriber);

        // 4. 生产数据, 并发布
        // 这里忽略数据生产过程
        for (int i = 0; i < 1000; i++) {
            System.out.println("生成数据:" + i);
            // submit是个block方法
            publiser.submit(i);
        }

        // 5. 结束后 关闭发布者
        // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
        publiser.close();

        // 主线程延迟停止, 否则数据没有消费就退出
        Thread.currentThread().join(1000);
        // debug的时候, 下面这行需要有断点
        // 否则主线程结束无法debug
        System.out.println();
    }

}
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

/**
* 带 process 的 flow demo
*/

/**
* Processor, 需要继承SubmissionPublisher并实现Processor接口
*
* 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去
*/
class MyProcessor extends SubmissionPublisher<String>
        implements Processor<Integer, String> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        // 保存订阅关系, 需要用它来给发布者响应
        this.subscription = subscription;

        // 请求一个数据
        this.subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        // 接受到一个数据, 处理
        System.out.println("处理器接受到数据: " + item);

        // 过滤掉小于0的, 然后发布出去
        if (item > 0) {
            this.submit("转换后的数据:" + item);
        }

        // 处理完调用request再请求一个数据
        this.subscription.request(1);

        // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
        // this.subscription.cancel();
    }

    @Override
    public void onError(Throwable throwable) {
        // 出现了异常(例如处理数据的时候产生了异常)
        throwable.printStackTrace();


        // 我们可以告诉发布者, 后面不接受数据了
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        // 全部数据处理完了(发布者关闭了)
        System.out.println("处理器处理完了!");
        // 关闭发布者
        this.close();
    }
}

public class FlowDemo2 {

    public static void main(String[] args) throws Exception {
        // 1. 定义发布者, 发布的数据类型是 Integer
        // 直接使用jdk自带的SubmissionPublisher
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();

        // 2. 定义处理器, 对数据进行过滤, 并转换为String类型
        MyProcessor processor = new MyProcessor();

        // 3. 发布者 和 处理器 建立订阅关系
        publiser.subscribe(processor);

        // 4. 定义最终订阅者, 消费 String 类型数据
        Subscriber<String> subscriber = new Subscriber<String>() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存订阅关系, 需要用它来给发布者响应
                this.subscription = subscription;

                // 请求一个数据
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                // 接受到一个数据, 处理
                System.out.println("接受到数据: " + item);

                // 处理完调用request再请求一个数据
                this.subscription.request(1);

                // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出现了异常(例如处理数据的时候产生了异常)
                throwable.printStackTrace();


                // 我们可以告诉发布者, 后面不接受数据了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部数据处理完了(发布者关闭了)
                System.out.println("处理完了!");
            }

        };

        // 5. 处理器 和 最终订阅者 建立订阅关系
        processor.subscribe(subscriber);

        // 6. 生产数据, 并发布
        // 这里忽略数据生产过程
        publiser.submit(-111);
        publiser.submit(111);

        // 7. 结束后 关闭发布者
        // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
        publiser.close();

        // 主线程延迟停止, 否则数据没有消费就退出
        Thread.currentThread().join(1000);
    }

}

二、异步servlet

普通的springmvc就是同步servlet,请求处理线程是阻塞的。

webflux就像是异步servlet,请求接收之后,业务处理交给后台的业务处理线程来执行。

同步servlet

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
* Servlet implementation class SyncServlet
*/
@WebServlet("/SyncServlet")
public class SyncServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    /**
     * @see HttpServlet#HttpServlet()
     */
    public SyncServlet() {
        super();
        // TODO Auto-generated constructor stub
    }

    /**
     * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doGet(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        long t1 = System.currentTimeMillis();

        // 执行业务代码
        doSomeThing(request, response);

        System.out.println("sync use:" + (System.currentTimeMillis() - t1));
    }

    private void doSomeThing(HttpServletRequest request,
            HttpServletResponse response) throws IOException {

        // 模拟耗时操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
        }

        //
        response.getWriter().append("done");
    }

    /**
     * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doPost(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        // TODO Auto-generated method stub
        doGet(request, response);
    }

}

异步servlet?

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
* Servlet implementation class AsyncServlet
*/
@WebServlet(asyncSupported = true, urlPatterns = { "/AsyncServlet" })
public class AsyncServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    /**
     * @see HttpServlet#HttpServlet()
     */
    public AsyncServlet() {
        super();
        // TODO Auto-generated constructor stub
    }

    /**
     * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doGet(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        long t1 = System.currentTimeMillis();

        // 开启异步
        AsyncContext asyncContext = request.startAsync();

        // 执行业务代码,为了方便使用jdk8的future
        CompletableFuture.runAsync(() -> doSomeThing(asyncContext,
                asyncContext.getRequest(), asyncContext.getResponse()));

        System.out.println("async use:" + (System.currentTimeMillis() - t1));
    }

    private void doSomeThing(AsyncContext asyncContext,
            ServletRequest servletRequest, ServletResponse servletResponse) {

        // 模拟耗时操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
        }

        //
        try {
            servletResponse.getWriter().append("done");
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 业务代码处理完毕, 通知结束
        asyncContext.complete();
    }

    /**
     * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doPost(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        // TODO Auto-generated method stub
        doGet(request, response);
    }

}

三、reactor = jdk8 stream + jdk9 reactive stream

import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

public class ReactorDemo {

   public static void main(String[] args) {
      // reactor = jdk8 stream + jdk9 reactive stream
      // Mono 0-1个元素
      // Flux 0-N个元素
      String[] strs = { "1", "2", "3" };

      // 2. 定义订阅者
      Subscriber<Integer> subscriber = new Subscriber<Integer>() {

         private Subscription subscription;

         @Override
         public void onSubscribe(Subscription subscription) {
            // 保存订阅关系, 需要用它来给发布者响应
            this.subscription = subscription;

            // 请求一个数据
            this.subscription.request(1);
         }

         @Override
         public void onNext(Integer item) {
            // 接受到一个数据, 处理
            System.out.println("接受到数据: " + item);

            try {
               TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
            
            // 处理完调用request再请求一个数据
            this.subscription.request(1);

            // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
            // this.subscription.cancel();
         }

         @Override
         public void onError(Throwable throwable) {
            // 出现了异常(例如处理数据的时候产生了异常)
            throwable.printStackTrace();


            // 我们可以告诉发布者, 后面不接受数据了
            this.subscription.cancel();
         }

         @Override
         public void onComplete() {
            // 全部数据处理完了(发布者关闭了)
            System.out.println("处理完了!");
         }

      };
      
      // 这里就是jdk8的stream
      Flux.fromArray(strs).map(s -> Integer.parseInt(s))
      // 最终操作
      // 这里就是jdk9的reactive stream
      .subscribe(subscriber);

   }
}

四、webflux初体验

?官方的比对,说是webflux是非阻塞的、支持reactive?stream、并且不支持关系型数据库只支持非关系型数据库。

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>

我们可以看到,get1就是普通的mvc,get2就是webflux模式,对于用户来说这两种模式体验是一样的。但是对服务器来说,get1是阻塞的,get2是非阻塞的(其实无非是将业务处理放到后台线程,我感觉最终消耗的系统资源其实并没有少,也没有网上吹嘘的那么好,只是并发量高了之后可以接收更多的用户请求,但是处理请求不见得会快多少)。

get3的形式就是会逐个输出每一条数据。

import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;


@RestController
@Slf4j
public class TestController {


   @GetMapping("/1")
   private String get1() {
      log.info("get1 start");
      String result = createStr();
      log.info("get1 end.");
      return result;
   }


   @GetMapping("/2")
   private Mono<String> get2() {
      log.info("get2 start");
      Mono<String> result = Mono.fromSupplier(() -> createStr());
      log.info("get2 end.");
      return result;
   }


   /**
    * Flux : 返回0-n个元素
    *
    * @return
    */
   @GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
   private Flux<String> flux() {
      Flux<String> result = Flux
            .fromStream(IntStream.range(1, 5).mapToObj(i -> {
               try {
                  TimeUnit.SECONDS.sleep(1);
               } catch (InterruptedException e) {
               }
               return "flux data--" + i;
            }));
      return result;
   }


   private String createStr() {
      try {
         TimeUnit.SECONDS.sleep(5);
      } catch (InterruptedException e) {
      }
      return "some string";
   }


}

五、SSE(server?sent?events)

webflux的Flux,就像是SSE一样,可以实现类似向服务器推送资源的功能。

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
* Servlet implementation class SSE
*/
@WebServlet("/SSE")
public class SSE extends HttpServlet {
    private static final long serialVersionUID = 1L;

    /**
     * @see HttpServlet#HttpServlet()
     */
    public SSE() {
        super();
        // TODO Auto-generated constructor stub
    }

    /**
     * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doGet(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("utf-8");

        for (int i = 0; i < 5; i++) {
            // 指定事件标识
            response.getWriter().write("event:me\n");
            // 格式: data: + 数据 + 2个回车
            response.getWriter().write("data:" + i + "\n\n");
            response.getWriter().flush();

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
            }
        }

    }

    /**
     * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doPost(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        // TODO Auto-generated method stub
        doGet(request, response);
    }

}
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
</head>
<body>
    <script type="text/javascript">
        // 初始化, 参数为url
        // 依赖H5
        var sse = new EventSource("SSE");

        sse.onmessage = function(e) {
            console.log("message", e.data, e);
        }

        // 监听指定事件, (就不会进入onmessage了)
        sse.addEventListener("me", function(e) {
            console.log("me event", e.data);
            // 如果不关闭,会自动重连
            if (e.data == 3) {
                sse.close();
            }
        });
    </script>
</body>
</html>

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-05-09 12:26:14  更:2022-05-09 12:26:31 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 22:54:41-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码