| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> Flink1.14.3 Table API读取HDFS -> 正文阅读 |
|
[大数据]Flink1.14.3 Table API读取HDFS |
1 需求需求:Flink Table API从HDFS分布式文件系统读取JSON?件。 2 添加Maven依赖Flink使?JSON和HDFS引?如下依赖: #引入json格式依赖 <dependency> ?<groupId>org.apache.flink</groupId> ?<artifactId>flink-json</artifactId> ?<version>${flink.version}</version> </dependency> #引入日志依赖,进行代码调试 <dependency> ??<groupId>org.slf4j</groupId> ??<artifactId>slf4j-simple</artifactId> ??<version>1.7.15</version> </dependency> #引入flink与hadoop兼容性依赖 <dependency> ??<groupId>org.apache.flink</groupId> ??<artifactId>flink-hadoop-compatibility_2.11</artifactId> ??<version>${flink.version}</version> </dependency> #引入Hadoop依赖 <dependency> ??<groupId>org.apache.hadoop</groupId> ??<artifactId>hadoop-client</artifactId> ??<version>${hadoop.version}</version> #排查冲突jar包 ??<exclusions> <exclusion> ??<groupId>org.apache.commons</groupId> ??<artifactId>commons-math3</artifactId> </exclusion> <exclusion> ??<groupId>org.apache.commons</groupId> ??<artifactId>commons-compress</artifactId> </exclusion> ??</exclusions> </dependency> 3 代码实现package com.bigdata.chap02; import org.apache.flink.table.api.*; import static org.apache.flink.table.api.Expressions.$; /** *@author-yangjun *@contact-wei xin:john_1125 */ public class FlinkTableAPIFromHDFS { public static void main(String[] args) { //1、创建TableEnvironment EnvironmentSettings settings = EnvironmentSettings .newInstance() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); //2、创建source table final Schema schema = Schema.newBuilder() .column("user", DataTypes.STRING()) .column("url", DataTypes.STRING()) .column("cTime", DataTypes.STRING()) .build(); tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("filesystem") .schema(schema) .format("json") .option("path","hdfs://mycluster/data/clicklog/input/click.log") .build()); //3、创建sink table tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("print") .schema(schema) .build()); //5、输出(包括执行,不需要单独在调用tEnv.execute("job")) tEnv.from("sourceTable") .select($("user"), $("url"),$("cTime")) .executeInsert("sinkTable"); } } 4 配置Hadoop环境变量在Windows本地配置Hadoop环境变量,同时Windows下的Hadoop安装目录中需要包含HDFS集群配置文件hdfs-site.xml和core-site.xml。否则代码无法识别路径hdfs://mycluster。 5 添加log4j文件在项目的src/main/目录下,创建resources目录并标记为Mark Directory as—>Resources Root,然后将log4j.properties文件拷贝到resources目录中,log4j.properties具体内容如下所示。 log4j.rootLogger=error,CONSOLE,info log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender ???? log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout ???? log4j.appender.CONSOLE.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n ??? log4j.logger.info=info log4j.appender.info=org.apache.log4j.DailyRollingFileAppender log4j.appender.info.layout=org.apache.log4j.PatternLayout ???? log4j.appender.info.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n ? log4j.appender.info.datePattern='.'yyyy-MM-dd log4j.appender.info.Threshold = info ?? log4j.appender.info.append=true log4j.appender.info.File=G:/tmp/log/info.log log4j.logger.error=error ? log4j.appender.error=org.apache.log4j.DailyRollingFileAppender log4j.appender.error.layout=org.apache.log4j.PatternLayout ???? log4j.appender.error.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n ? log4j.appender.error.datePattern='.'yyyy-MM-dd log4j.appender.error.Threshold = error ?? log4j.appender.error.append=true log4j.appender.error.File=G:/tmp/log/error.log log4j.logger.DEBUG=DEBUG log4j.appender.DEBUG=org.apache.log4j.DailyRollingFileAppender log4j.appender.DEBUG.layout=org.apache.log4j.PatternLayout ???? log4j.appender.DEBUG.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n ? log4j.appender.DEBUG.datePattern='.'yyyy-MM-dd log4j.appender.DEBUG.Threshold = DEBUG ?? log4j.appender.DEBUG.append=true log4j.appender.DEBUG.File=G:/tmp/log/dubug.log 6 准备数据集准备测试数据集click.log: {"user":"Mary","url":"./home","cTime":"2022-02-02 12:00:00"} {"user":"Mary","url":"./prod?id=1","cTime":"2022-02-02 12:00:05"} {"user":"Liz","url":"./home","cTime":"2022-02-02 12:01:00"} {"user":"Bob","cTime":"2022-02-02 12:01:30"} {"user":"Mary","url":"./prod?id=7","cTime":"2022-02-02 12:01:45"} 创建hdfs目录 bin/hdfs dfs -mkdir -p /data/clicklog/input 将click.log文件上传至/data/clicklog/input目录 bin/hdfs dfs -put click.log /data/clicklog/input 7 测试运行在idea工具中,右键项目选择Run运行Flink Table,如果能在控制台看到打印如下结果,说明Flink Table API能成功读取HDFS中的JSON格式数据。 4> +I[Mary, ./home, 2022-02-02 12:00:00] 4> +I[Mary, ./prod?id=1, 2022-02-02 12:00:05] 4> +I[Liz, ./home, 2022-02-02 12:01:00] 4> +I[Bob, null, 2022-02-02 12:01:30] 4> +I[Mary, ./prod?id=7, 2022-02-02 12:01:45] |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 | -2025/1/16 12:54:07- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |