IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 基于Java开发Flink篇 -> 正文阅读

[大数据]基于Java开发Flink篇

package com.hj.flink;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;



public class FlinkStreamJavaExample {

	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		// 读取文本路径信息,并用逗号隔开
		final String[] filePaths = ParameterTool.fromArgs(args).get("filePath", "/opt/log1.txt,/opt/log2.txt").split(",");
		
		assert filePaths.length > 0;
		
		// WindowTime是设置窗口时间大小,默认情况下2分钟一个从窗口足够读取文本内容的所有信息
		final int windowTime = ParameterTool.fromArgs(args).getInt("windowTime", 2);
		
		// 构建运行环境,使用Environment处理窗口数据
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.setParallelism(1); // 并行度
		
		// 读取文本的数据流
		DataStream<String> unionStream = env.readTextFile(filePaths[0]); // 初始值
		if(filePaths.length > 1){
			for (int i = 1; i < filePaths.length; i++){
				unionStream = unionStream.union(env.readTextFile(filePaths[i]));
			}
		}
		
		// 数据转换,构造整个数据处理的逻辑,计算出结果并打印出来
		unionStream.map(new MapFunction<String, UserRecord>() {

			@Override
			public UserRecord map(String value) throws Exception {
				// TODO Auto-generated method stub
				return getRecord(value);
			}
		})
		.assignTimestampsAndWatermarks(new Record2TimeStampsExecuter()) // 判断新水印产生时间是否大于上一次水印产生时间,如果大于选择新水印,小于则选择上一次水印
		.filter(new FilterFunction<UserRecord>() {
			
			@Override
			public boolean filter(UserRecord value) throws Exception {
				// TODO Auto-generated method stub
				return value.gender.equals("female");
			}
		})
		.keyBy(new UserRecordSelector()) // 将<<name,gender>,shoppingTime>
		.window(TumblingEventTimeWindows.of(Time.milliseconds(windowTime)))
		.reduce(new ReduceFunction<UserRecord>() {
			
			@Override
			public UserRecord reduce(UserRecord value1, UserRecord value2) throws Exception {
				// TODO Auto-generated method stub
				value1.shoppingTime += value2.shoppingTime;
				return value1;
			}
		})
		.filter(new FilterFunction<UserRecord>() {

			@Override
			public boolean filter(UserRecord value) throws Exception {
				// TODO Auto-generated method stub
				return value.shoppingTime > 120;
			}
			
		})
		.print()
		;
		
		env.execute("FlinkStreamJavaExample");
		
	}
	
	// 构建KeyBy的关键字作为分组依据
	public static class UserRecordSelector implements KeySelector<UserRecord, Tuple2<String, String>>{

		@Override
		public Tuple2<String, String> getKey(UserRecord value) throws Exception {
			// TODO Auto-generated method stub
			return Tuple2.of(value.name, value.gender);
		}
		
	}
	
	// 定义水印方法
	public static class Record2TimeStampsExecuter implements AssignerWithPunctuatedWatermarks<UserRecord>{

		@Override
		public long extractTimestamp(UserRecord arg0, long arg1) {
			// TODO Auto-generated method stub
			return System.currentTimeMillis();
		}

		@Override
		public Watermark checkAndGetNextWatermark(UserRecord element, long extractedTimestamp) {
			// TODO Auto-generated method stub
			return new Watermark(extractedTimestamp - 1); 
		}
		
	}
	
	// 解析文本数据,构造UserRecord数据结构
	public static UserRecord getRecord(String line){
		String[] elems = line.split(",");
		assert elems.length == 3;
		return new UserRecord(elems[0], elems[1], Integer.parseInt(elems[2]));
	}
	
	// 定义UserRecord数据结构的定义,并重写toString函数
	public static class UserRecord{
		public String name;
		public String gender;
		public int shoppingTime;
		public UserRecord(){};
		public UserRecord(String n, String g, int s){
			this.name = n;
			this.gender = g;
			this.shoppingTime = s;
		}
		
		public String toString(){
			return "name: " + name + ", gender: " + gender + ", shoppingTime: " + shoppingTime; 
		}
	}

}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-29 09:09:56  更:2021-08-29 09:27:50 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 16:29:30-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码