Stream
Stream流是jdk1.8对集合对象功能的增强,可以通过将集合转换为流模型,通过声明的方式对集合中的每个元素进行一系列并行或者串行的流水线操作
举例:对数据进行排序取值
从数据源中获取一组用户信息,查找年满18周岁以上的所有用户,然后针对用户信息按照工资值进行倒序排序,获取所有用户的id编号
public class User{
private Long id;
private String name;
private Double salary;
private int age;
}
随机生成用户信息,并存储在List中
List<User> userList = new ArrayList<>();
Random r=new Random();
for(int i=1;i<11;i++){
User tmp=new User();
tmp.setId(i+0L);
tmp.setName("name-"+i);
tmp.setAge(r.nextInt(10)+18);
tmp.setSalary(r.nextDouble()*9000+1000);
userList.add(tmp);
}
System.out.println(userList);
常规编写方法
List<User> ulist=new ArrayList<>();
for(User tmp:userList)
if(tmp.getAge()>18) ulist.add(tmp);
Collections.sort(ulist,new Comparator<User>(){
public int compare(User u1, User u2){
return u2.getSalary().compareTo(u1.getSalary());
}
});
List<Long> idList=new ArrayList<>();
for(User tmp:ulist)
idList.add(tmp.getId());
System.out.println(idList);
使用Stream的方法
使用Stream可以简化代码,使代码简洁易读;如果使用parallelStream可以启动并发模式,使程序执行速度更快
userList.parallelStream().filter(t->t.getAge()>18).sorted(u1,u2)->u2.getSalary().compareTo(u1.getSalary()).map(User::getId).forEach(Sysytem.out::println);
概述Stream
Stream不是集合元素,不是数据结构并不保存数据,Stream是有关算法和计算的,象是一个高级版本的迭代器Iterator。Stream只要给出对其包含的元素执行什么操作,Stream就会隐式地在内部进行遍历,并给出响应的数据转换。单向不可往复、数据只能遍历一次,遍历过后就使用完毕
Stream可以并行化操作,Stream的并行操作是依赖Java7中引入的Fork/Join框架拆分任务和加速处理过程,并且允许数据源是无限的
基本的执行流程
使用Stream通常包括3个基本步骤:获取一个数据源source—转换为流—执行操作—获取所想要的结果。每次转换原有的Stream对象,执行结果还是Stream,可以使用串式写法定义多个操作
-
数据源就是原始的数据集合 -
将 List<T> 集合转换为 Stream<T> -
针对Stream进行一系列操作,每次操作结束返回还是Stream -
可以将Stream转换回集合类型,也可以直接对其中的元素进行处理
Integer ids = roomlist.stream().filter(b->b.getLength==10).sort((x,y)->x.getHigh()-y.getHigh()).mapToInt(Room::getWidth).sum();
- 创建Stream
- 转换Stream,每次执行转换Stream对象类型不改变,返回一个新的Stream对象
- 对Stream进行聚合操作,获取需要的结果
创建Stream
流Stream既可以是串行顺序的stream,也可以是并行的parallelStream。
顺序流的操作是在单线程上执行,而并行流是在多线程上并发执行的
方法一:Arrays.stream()/Collection.parallelStream
1、可以使用Arrays.stream将一组数据转换为Stream
Integer[] arr = new Integer[]{3,4,5,6,78,4};
long count = Arrays.stream(arr).filter(i->i>20).count();
2、使用Collection.parallelStream使用并行流,处理任务并行执行。前提是硬件支持
也可以使用Collection.stream使用串行流
List<Integer> list = Arrays.asList(1,2,3,4,5,6);
Stream<Integer> stream = list.parallelStream();
stream.foreach(System.out::println);
- parallelStream在使用上和stream没有区别,本质上返回的都是一个流,只不过底层处理时会根据执行环境的条件判断时并行还是串行
- 并行流并不会按照原本的顺序轨迹执行,而是随机执行
方法二:IntStream/LongStream/DoubleStream
对于基本类型目前有3中对应的包装类型Stream:IntStream、LongStream和DoubleStream。如果不使用对应的Stream类型,也可以使用 Stream<Integer> 、Stream<Long> 和Stream<Double> ,但是针对元素进行装拆箱操作比较耗时,所以才有了常用的基本数据类型的对应Stream
IntStream.of(3,5,18,1,4).foreach(System.out::println);
IntStream.range(1,10).foreach(System.out.println);
IntStream.rangeClosed(10,20).foreach(System.out::println);
方法三:从输入流中获取数据的方法
可以使用BufferedReader.lines()生成stream,把流中的内容一行一行的读取出来
BufferedReader br = new BufferedReader(new FileReader("data.txt"));
Stream<String> stream = br.lines;
stream.foreach(System.out::prinln);
方法四:创建无限流的方法 Stream接口中定义的静态方法generate和iterate()
generate方法可以接收一个参数函数用于创建无限stream
Stream<String> stream = Stream.generate(() -> "test");
String[] arr = stream..limit(10).toArray(String[]::new);
System.out.println(Arrays.toString(arr));
iterate方法可以接收一个参数函数用于创建无限的stream
Stream<BigInteger> stream1 = Stream.iterate(BigInteger.ZERO, n->n.add(BigInteger.TEN));
BigInteger[] arr = stream1.limit(10).toArray(BigInteger[]::new);
System.out.println(Arrays.toString(arr));
转换操作
Stream的操作符基本可以分为中间操作符和终止操作符两大类,中间操作符会继续向下传递,终止操作符直接对数据进行消费或者收集,不会继续向下传递
中间操作符
filter方法
filter方法用于对传入的数据流进行过滤处理,只返回满足条件的数据组成的新的数据流
List<Integer> list = new ArrayList<Integer>();
list.add(15);
list.add(32);
list.add(67);
list.add(232);
System.out.println(list);
List<Integer> result = list.stream().filter(i->i>50).collect(Collectors.toList());
System.out.println(result);
map方法
map方法用于对流中的数据进行某种形式的转换,转换操作函数当做参数传入方法
List<Integer> list = new ArrayList<Integer>();
list.add(15);
list.add(32);
list.add(67);
list.add(232);
System.out.println(list);
List<String> result = list.stream().map(i->String.valueOf(i)).collect(Collectors.toList());
System.out.println(result);
List<String> result2 = list.stream().map(i->String.valueOf(i)).filter(bb->bb.length()>2).collect(Collectors.toList());
System.out.println(result2);
flatMap方法
flatMap可以对每个元素应用一个函数,并将返回的值收集到一个新的流中
List<Integer> list1 = new ArrayList<>();
list1.add(34);
list1.add(25);
list1.add(35);
List<Integer> list2 = new ArrayList<>();
list2.add(999);
list2.add(888);
list2.add(999);
list2.add(666);
Map<String, List<Integer>> testMap = new HashMap<>();
testMap.put("aa", list1);
testMap.put("bb", list2);
List<Integer> result = testMap.values()
.stream().flatMap(num -> num.stream()).collect(Collectors.toList());
System.out.println(result);
limit方法
limit方法会返回一个包含指定个数元素的新stream,如果原始数据总长大小不足则返回原始流
skip方法
skip方法的含义是跳过多少个元素,继续处理后续元素
List<Integer> list = new ArrayList<Integer>();
for (int i = 0; i < 10; i++) {
list.add(i);
}
List<Integer> result = list.stream().limit(5).collect(Collectors.toList());
System.out.println(result);
List<Integer> result2 = list.stream().skip(5).collect(Collectors.toList());
System.out.println(result2);
distinct方法
distinct会根据原始流中的元素返回一个具有相同顺序,但是剔除了重复值的流
List<Integer> list = new ArrayList<Integer>();
list.add(10);
list.add(39);
list.add(10);
list.add(78);
list.stream().forEach(obj->System.out.print(obj+"\t"));
System.out.println("");
List<Integer> list2 = list.stream().distinct().collect(Collectors.toList());
System.out.println(list2);
sorted方法
sorted方法是遍历整个流的所有数据,并且在产生任何数据元素之前对它进行排序。注意Comparable接口和Comparator接口
List<Integer> list = new ArrayList<Integer>();
list.add(10);
list.add(39);
list.add(10);
list.add(78);
List<Integer> result =list.stream().sorted(Integer::compareTo).collect(Collectors.toList());
System.out.println(result);
Random r = new Random();
r.ints().limit(10).sorted().forEach(System.out::println);
聚合操作
将流中的数据进行汇聚为一个值,一般都是终止操作
终止操作
- collect收集操作,使用官方的Collectors提供的收集器
- count统计操作,统计数据个数
- findFirst/findAny查找操作,返回的类型为Optional
- noneMatch、AllMatch和anyMatch匹配操作,检查数据流中是否存在符合条件的数据
- min和max最值操作,需要比较器
- reduce规约操作,将数据流的值规约为一个值,例如count/min/max底层实际上就是使用reduce
- forEach遍历操作,可以对最终的数据进行消费
- toArray数组操作,用于将数据流的元素转换为数组
min和max用于获取最小值和最大值,注意Comparator接口
count用于统计元素个数
List<Integer> list = new ArrayList<>();
list.add(10);
list.add(39);
list.add(10);
list.add(78);
list.add(39);
Integer maxInteger = list.stream().max(Integer::compareTo).get();
System.out.println(maxInteger);
long len = list.stream().count();
System.out.println(len);
利用收集器Collectors实现的聚合操作
List<Integer> list = new ArrayList<>();
list.add(10);
list.add(39);
list.add(10);
list.add(78);
list.add(39);
long sum = list.stream().collect(Collectors.summarizingInt(Integer::intValue)).getSum();
System.out.println(sum);
Double avg = list.stream().collect(Collectors.averagingInt(Integer::intValue));
System.out.println(avg);
Integer maxInteger = list.stream().collect(Collectors.maxBy(Integer::compare)).get();
System.out.println(maxInteger);
Integer min = list.stream().collect(Collectors.minBy(Integer::compareTo)).get();
System.out.println(min);
Collectors类实现了很多的聚合操作,例如将流转换为集合或者聚合元素,Collector可以返回列表List或者字符串
List<String> list1 = Arrays.asList("abc","","bcd","","efg","abcd","jklm");
List<String> list2 = list1.stream().filter(str->!str.isEmpty()).collect(Collectors.toList());
String result = list1.stream().filter(str->!str.isEmpty()).collect(Collectors.joining(","));
System.out.println(list2);
统计
Random r = new Random();
List<Integer> list = r.ints().limit(10).mapToObj(Integer::valueOf).collect(Collectors.toList());
int[] arr = new int[list.size()];
for (int i = 0; i < list.size(); i++) {
arr[i] = list.get(i);
}
IntStream is = IntStream.of(arr);
is.forEach(t->System.out.print(t+"\t"));
System.out.println();
int sum = IntStream.of(arr).sum();
System.out.println(sum);
double avg = IntStream.of(arr).average().getAsDouble();
System.out.println(avg);
long count = IntStream.of(arr).count();
System.out.println(count);
int max = IntStream.of(arr).max().getAsInt();
System.out.println(max);
int min = IntStream.of(arr).min().getAsInt();
System.out.println(min);
使用统计结果收集器产生统计结果值,主要用于int、long、double等基本类型
List<Integer> nums = Arrays.asList(3, 2, 2, 3, 7, 3, 5);
IntSummaryStatistics stats = nums.stream().mapToInt((x) -> x).summaryStatistics();
System.out.println("最大值:"+stats.getMax());
System.out.println("最小值:"+stats.getMin());
System.out.println("所有数据之和:"+stats.getSum());
System.out.println("所有数据的平均值:"+stats.getAverage());
System.out.println("所有数据的个数:"+stats.getCount());
查找元素
findFirst返回非空集合中的第一个元素值,一般会与filter方法结合使用
List<Integer> list = new ArrayList<>();
list.add(10);
list.add(10);
Optional<Integer> op = list.stream().filter(i -> i > 20).findFirst();
if (op.isPresent())
System.out.println(op.get());
else
System.out.println("没有数据");
findAny可以在stream中找到任意一个所匹配的元素就直接返回,在流的平行处理十分有效
List<Integer> list = new ArrayList<>();
list.add(10);
list.add(39);
list.add(10);
list.add(78);
list.add(39);
Integer any = list.parallelStream().filter(i->i>20).findAny().get();
System.out.println(any);
anyMatch判定是否还有匹配的元素,返回一个boolean值
List<Integer> list = new ArrayList<>();
list.add(10);
list.add(39);
list.add(10);
list.add(78);
list.add(39);
boolean res = list.parallelStream().anyMatch(i -> i > 100);
System.out.println(res);
类似的方法allMatch和noneMatch分别是判断所有的元素都满足条件或者没有元素满足条件返回true。这些方法用来检查整个流,所以可以通过使用并行流提高速度
reduce归并
reduce方法用于将流中的元素进行进一步的对并计算
List<Integer> nums = Arrays.asList(13, 2, 2, 3, 7, 3, 5);
Integer sum = nums.stream().reduce((x,y)->x+y).get();
System.out.println(sum);
Integer sum1 = nums.stream().reduce(Integer::sum).get();
System.out.println(sum1);
Integer sum2 = nums.stream().reduce(10, Integer::sum);
System.out.println(sum2);
int sum3 = nums.stream().map(Object::toString).mapToInt(String::length).sum();
System.out.println(sum3);
遍历操作
Stream提供了forEach可以迭代流中的每个数据,forEach方法可以接收一个lambda表达式,并且在每个元素上执行该表达式,但是注意不能修改本地变量值,也不能break/return之类的关键字提前结束循环
Random r = new Random();
r.ints().limit(10).forEach(System.out::println);
System.out.println("---------------");
r.ints().limit(10).forEach((tt)->{
if (tt>10) {
System.out.println(tt);
}
});
收集结果
当数据经过souce–transform–sink处理后进行处理结果的收集。一般使用Collectors提供的常见的收集器
集合
Random r = new Random();
List<Integer> intList = r.ints().limit(10).mapToObj(Integer::valueOf).collect(Collectors.toList());
Set<Integer> set = r.ints().limit(10).mapToObj(Integer::valueOf).collect(Collectors.toSet());
System.out.println(intList);
System.out.println(set);
字符串
String res = r.ints().limit(10).mapToObj((x)->""+x).collect(Collectors.joining(","));
System.out.println(res);
Map集合
可以使用Collectors.toMap方法将集合中的元素收集到Map中,但是要求有2个参数,分别用来生成Map中的key值和value值
List<Person> list = new ArrayList<>();
Random r = new Random();
for (int i = 0; i < 10; i++) {
Person p = new Person();
p.setId(i + 1L);
p.setName("name-" + i);
p.setAge(r.nextInt(8) + 14);
list.add(p);
}
Map<Long, String> map1 = list.stream().collect(Collectors.toMap(Person::getId, Person::getName));
System.out.println(map1);
Map<Long, Person> map2 = list.stream().collect(Collectors.toMap(Person::getId, Function.identity()));
System.out.println(map2);
分组操作
按照年龄分组操作
List<Person> list = new ArrayList<>();
Random r = new Random();
for (int i = 0; i < 10; i++) {
Person p = new Person();
p.setId(i + 1L);
p.setName("name-" + i);
p.setAge(r.nextInt(8) + 14);
list.add(p);
}
Map<Integer, List<Person>> map1 = list.stream().collect(Collectors.groupingBy(Person::getAge));
System.out.println(map1);
当分组条件是一个返回boolean值的函数时,流元素可以分为2组列表,一个是返回true的元素集合,一个是返回false的元素集合
List<Person> list = new ArrayList<>();
Random r = new Random();
for (int i = 0; i < 10; i++) {
Person p = new Person();
p.setId(i + 1L);
p.setName("name-" + i);
p.setAge(r.nextInt(8) + 14);
list.add(p);
}
Map<Boolean, List<Person>> map1 = list.stream().collect(Collectors.partitioningBy(p -> p.getAge() > 20));
System.out.println(map1);
并行流中的问题
stream使并行计算比较容易,在数据量较大的集合中可以体现出并行流的优势,利用多核CPU资源提升执行效率。但是使用需要注意一些问题。
首先必须是并行流。parallel方法可以将任意的串行流转换为并行流
其次要确保传递给并行流的操作函数是线程安全的
转换结构
转换为数组
Object[] arr = Stream.of(1, 2, 2, 5, 2, 7).toArray();
System.out.println(Arrays.toString(arr));
Integer[] arr2 = Stream.of(1, 2, 2, 5, 2, 7).toArray(Integer[]::new);
System.out.println(Arrays.toString(arr2));
转换为集合
List<Integer> list = Stream.of(1, 2, 2, 5, 2, 7).collect(Collectors.toList());
System.out.println(list);
ArrayList<Integer> list2 = Stream.of(1, 2, 3, 4,5).collect(Collectors.toCollection(ArrayList::new));
System.out.println(list2); Set<Integer> set1 = Stream.of(1, 2, 2, 4, 2, 5).collect(Collectors.toSet());
System.out.println(set1);
Stack<Integer> stack = Stream.of(1, 2, 2, 4, 2, 5).collect(Collectors.toCollection(Stack::new));
System.out.println(stack);
转换为String
String res = Stream.of("1", "2", "2", "5", "2", "7").collect(Collectors.joining());
System.out.println(res);
练习题
读取一个文本文件,查找最长一行内容的字符个数
BufferedReader br = new BufferedReader(new FileReader("input/abc.logs"));
int maxLength=br.lines().mapToInt(String::length).max().getAsInt();
br.close();
System.out.println(maxLength);
读取一个英文的文本文件,查找所有的英文单词,转换为全小写,并排序
BufferedReader br = new BufferedReader(new FileReader("input/bbb.logs"));
List<String> words = br.lines().flatMap( line -> Stream.of(line.split(" "))).filter(word -> word.length() > 1) .map(String::toLowerCase).distinct().sorted().collect(Collectors.toList());
br.close();
System.out.println(words);
总结
- stream不是数据结构,没有内存内部存储,只是用操作管道从source中抓取数据,不修改数据的底层结构
- 一般stream的操作都是使用lambda表达式为参数
- 不支持索引访问
- stream具有惰性化特征,很多的stream都是向后延迟,直到搞清楚有多少数据后进行计算
- 并行化操作
|