概述
JAVA8的函数式编程我们都比较熟悉,将一段对数据的操作逻辑,通过map、reduce、group、join、foreach、limit、count等算子进行表达最终得出结果,类似于Spark中提供的算子,虽然长的一样,但原理不同,而且Hadoop中也有对应的MapReduce写法,感觉也差不多的样子,现在又有了Reactor,也是一长串函数式编程的代码风格,那区别在哪里; 代码虽然看起来很像,有两点区别:
1、核心实现上不同,JAVA8的流式计算,通过NextItemEvalProcess的惰性求值来搞的;Spark就是基于数据+计算模型创建出对应的DAG,通过RDD执行;Hadoop的MR也是自己实现的分组聚合逻辑;Reactor是今天我们要了解的部分; 2、从实现的模式上看,一种是Pull-based,一种是Push-based,不进行赘述
聚焦到Reactor上,先尝试用一张图说一下自己对周边技术栈的关系,理清大体脉络:
几个观点:
- 响应式编程语法上集成JAVA8的流式编程,也只是语法上集成
- Reactive-programming是编程范式,不是具体的一个组件实现集合
- RXJava、Reactor是实现Reactive-programming的两种实现集合
- 全称SpringWebflux,对应的技术栈就是SpringBoot,HTTP容器不同,tomcat和netty
- Flux/Mono在webflux中使用
Reactor的简介
Reactor是一种JAVA非阻塞的响应式编程,同时具备背压的能力,天然集成JAVA8的函数式编程,提供了Flux/Mono这两种Reactive-programming的实现;Reactor也支持异步的Reactor-Netty项目,在HTTP/TCP/UDP等多种协议下均支持背压; 在JAVA8上的Reactor的依赖是:org.reactivestreams:reactive-streams:1.0.3
Reactor是一种Reactor-Programming的范式实现,关于Ractive-Programming的说明可以了解一下:
Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s).
第一个实践者是微软搞的Rx在.NET中,后来RxJava实现了JAVA的支持,再后来基于对响应式范式的应用场景认可后,在JAVA9中也支持了,这就是我们平还在搜一些先关资料的时候,一会儿JAVA8的函数式,一会儿JAVA9才支持Flow,有点乱,其实就是在JAVA9将Reactive-Programming的范式合入了Flow里。Reactive-Programming一般比较常见在观察者模式中使用,可以对比一下Reactive-Programming和JAVA8的函数式编程,核心区别就是pull-based/push-based;众所周知Iterator我们用的非常多了,尽管对数据的访问仅仅是通过Iterable,实际上我们对数据的访问是通过next()方法,通过不断的next()调用,来不断驱动逻辑对数据的处理;而在Reactive-Programming中,对应Iterator/iteratable的一组概念编程了Publisher-Subscriber,不同的是,是由Publisher通知Subcriber最新的数据到来,这个通知可以理解为push,相比原有的JAVA8的函数式编程,非常重要的区别就在这里,这里是Push-based而不是Pull-based;不仅仅是数据获取的流程通过push,我们对数据的计算逻辑也从命令式转变为了生命式,这里如果不好理解,可以比对Spark的DAG生成逻辑,通过一组RDD的依赖和计算逻辑的描述,将这段描述按照依赖关系发送到各个Executor执行;此时我们在Reactive-Programming的计算逻辑,也被封装成了一个“DAG”,类似一个Transform算子,没有Action触发的情况下,不会触发执行;我们写代码的时候,关注的仅仅是计算逻辑而不是控制逻辑
计算逻辑:输入是什么,输出是什么、怎么算 控制逻辑:什么时候输出、什么时候输入、什么时候计算
除了通过push的方式通知数据到来,Reactor也支持异常/错误逻辑的处理,当我们希望一个Publisher通知一个Subscriber数据是,通过onNext()方法,当我们想通知Subscriber有异常时,通过onError方法,但我们想通知Subscriber数据结束时,通过onComplete方法,不管是Error还是completion都会让我们一个流结束;可以感受到的是,这种push的方法就很灵活,因为这种就会对没有数据、一条数据、多条数据、无限多的数据,均有一种通用的支持方式,那么这种push的流式编程方法的具体场景有哪些。
使用场景
性能提升,软件层面一般来说两种方法,并行化或者是更高效的算法,一般来说我们大部分场景都是在一个线程内阻塞式的执行我们的代码,一旦出现任何性能瓶颈,就会导致整个流程卡死,如果用多线程的方法,需要考虑线程安全问题,也比较麻烦,而且更无解的是,这种方式也解决不了计算机系统单一资源瓶颈的问题,比如一段时间内的所有逻辑都集中等待IO,此时线程waiting,CPU就是一种浪费,这种不是并行能够解决的问题。
既然并行解决不了,那我们就通过异步的方式,当IO资源耗尽,我们让线程从waiting状态编程idle状态,一般我们会通过callable和future的方式来异步获得结果,可以选择通过callable的回调或者future手动异步的方式实现释放资源,不过这种方法会带来大量的匿名函数以及嵌套深度,比较费劲。Reactor给了个例子非常好:
这个例子说明了,在我们追求性能(资源利用率)的情况下,肯定是得异步的,那异步的常规套路,就是上面这套代码,问题就是:
- 太长了,大量的方法定义,没啥意义
- 嵌套太深,分支太多了,看起来费劲
题外话:如果我们不追求性能,那么完全可以按照典型的面向过程写这段代码,而且可读性会很好,也很清晰,出了问题也好定位,但是为了追求性能,就得用一些技术手段,这时代码逻辑就和使用的性能提升手段耦合了,这个就很讨厌,业务还是以前的业务,代码面目全非了。
Reactor针对这种追求性能还想把代码写好的场景,给出了一个解决方案: 甚至还基于增量的业务开发给了一个案例(查询Favorites的时候最长800ms的超时):
那OK,实际上我们研究Reactor就是看它是如何通过这简单几行代码替代上面那一大堆代码的。虽然JAVA8的CompletableFuture也实现了一些基于函数式的计算编排入口,比如thenApply和thenAccept让我们传一个Functional Interface进去,但整体来看还是不够易用,而且还存在几个问题没解决:
1、虽然支持了异步,但是每一层计算逻辑的异步都要我自己实现,任意一层的逻辑如果出现阻塞,那么仍然存在资源浪费的问题,比如任意内部Future对象调用了get() 2、并不支持惰性求值 3、对多值计算以及异常处理逻辑不够友好
为了说明这一点,官网有给出了对应的例子,可以参考: 例子的大体意思就是要查出来一堆id,然后根据id查询name和对应的出现次数进行统计汇总输出,通过Reactor的方式能够让逻辑变的更加清晰。
题外话:OK,我们看到这里可能会觉得,都是一堆函数式编码,感觉并不是很友好的样子,没错,是的,因为想异步,因为想高效,所以看起来就不太好,很难看,代码不表达业务逻辑,这个是很蛋疼的问题,但是以为了异步,目前还没有好办法来解决这个问题,类似于Spark的算子,既然在MR的基础上通过DAG+RDD,为什么不直接扔掉MR呢,结果搞了一堆算子还是不那么易用,但是好就好在Spark是计算场景下的工具,就是通过算子表达计算逻辑,因此学起来就还好,本身代码的表达和业务也比较接近;但是通过计算的编码方式去表达非计算的业务场景,看起来就比较难看了,这个我估摸着应该是有解决办法的,毕竟Spark纯计算场景下仍然衍生出了SparkSQL,要相信人总是会偷懒的。
对比JAVA8原生的异步方式,Reactor做到了:
1、可读性好一些 2、数据的算子相对丰富一些 3、绝壁的惰性运算,subscribe调用之前,啥也不会干的 4、支持背压,背压能解决很多大流量的问题
题外话:但是个人认为最大的缺点是无法直观的看到异步逻辑,这种异步的透明让人感觉很没底,用这个的成本感觉跟用Spark差不多,说是屏蔽了异步的实现,其实还得搞明白,不然代码出了问题怎么办呢?就跟Spark一样,不可能真的只去了解算子怎么用,而不关心RDD怎么依赖和调度。
从以上例子中可以看出,大体的思路和JAVA8 Stream的惰性求值差不多,都可以认为构建在Reactor上的数据计算,类似在一个工厂加工车间的流水线传送带上,传送带的控制和传送带上的不同机床就是Reactor,Reactor控制工厂内多个传送带以及每哥传送带上的机床进行协同工作,当某个机床出现异常或者阻塞式,Reactor还可以及时的限流,也就是背压,在Reactor中的算子类似于一个机床,每个机床都会将自己的加工逻辑添加到一个Publisher上,我们认为一个数据的处理逻辑开始一定是一个Publisher,而不同的Publisher的串接一定会将前一个Publisher的计算逻辑进行wrap叠加,最终在Subscriber完成整个处理逻辑,当我们调用subscribe的时候,就将一个从Publisher到Subscriber的链路进行执行,内部实现是通过一个名为request的信号来实现的,让上游的Publisher停止onNext()的调用。对于ubscriber来说,数据分为冷热两种,如果是冷数据,那么每个订阅相同Publisher的Subscriber都会受到相同的数据(多副本,new 出来的);如果是热数据,那么只有在调用subscribe之后的数据才能收到,而且如果是热数据,及时没有Subscriber,Publisher也能发出数据(触发算子逻辑执行),这个和计算逻辑一定在subscribe之后有一点冲突。
Reactor的使用
我们对一个Reactive-programming的框架使用诉求,有两类诉求: 1、框架实现了Reactive-Programming范式的核心逻辑,支持publish和subscribe 2、框架最好提供一些常用的算子
关于Reactive-programming,Reactor通过JAVA进行了实现:Flux和Mono,Flux代表了响应式序列的0…N,Mono代表了0…1序列,这个多少有点区别,主要是在并发预期上的控制不同,比如我们如果是一个对象的处理到一个对象的返回,这种一对一的情况下,就没必要用Flux,类似一个HttpRequest和一个HttpResponse,用Mono就很好,需要注意的是算子本身对并发预期控制也有影响,比如我们在用一个Flux做了count之后,返回的是第一个Mono。
官网的这两种图大部分文章都在引用 Flux Mono 换种形式可能更好理解,Flux可能存在的三种情况: 多次onNext的调用均为异常,正常结束流 多次onNext调用但出现了异常,异常结束流 一次调用onNext后,计算出现异常,异常结束流 那么同理,Mono和Flux的这两个Publisher的区别是只会调用一次onNext,而且调用onNext之后理论上会直接调用onComplete,Mono.never()是个例外,通过never可以不发送任何信号,虽然这个应用场景不多,但是一旦使用,则无法onNext、onError都会被显示的禁止掉。需要注意的是,Mono返回的序列有时候就是个Flux,有时候要转一下才行,比如Mono.concatWith()返回的是Flux但是Mono.then()返回的是一个Mono,而且Mono还有个好处就是可以做纯异步的计算逻辑,就是无返回的,我们可以用一个Mono来创建一个无返回值的异步处理逻辑。
|