IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink1.14.3 Table API读取HDFS -> 正文阅读

[大数据]Flink1.14.3 Table API读取HDFS

1 需求

需求:Flink Table API从HDFS分布式文件系统读取JSON?件。

2 添加Maven依赖

Flink使?JSON和HDFS引?如下依赖:

#引入json格式依赖

<dependency>

?<groupId>org.apache.flink</groupId>

?<artifactId>flink-json</artifactId>

?<version>${flink.version}</version>

</dependency>

#引入日志依赖,进行代码调试

<dependency>

??<groupId>org.slf4j</groupId>

??<artifactId>slf4j-simple</artifactId>

??<version>1.7.15</version>

</dependency>

#引入flink与hadoop兼容性依赖

<dependency>

??<groupId>org.apache.flink</groupId>

??<artifactId>flink-hadoop-compatibility_2.11</artifactId>

??<version>${flink.version}</version>

</dependency>

#引入Hadoop依赖

<dependency>

??<groupId>org.apache.hadoop</groupId>

??<artifactId>hadoop-client</artifactId>

??<version>${hadoop.version}</version>

#排查冲突jar包

??<exclusions>

<exclusion>

??<groupId>org.apache.commons</groupId>

??<artifactId>commons-math3</artifactId>

</exclusion>

<exclusion>

??<groupId>org.apache.commons</groupId>

??<artifactId>commons-compress</artifactId>

</exclusion>

??</exclusions>

</dependency>

3 代码实现

package com.bigdata.chap02;

import org.apache.flink.table.api.*;

import static org.apache.flink.table.api.Expressions.$;

/**

*@author-yangjun

*@contact-wei xin:john_1125

*/

public class FlinkTableAPIFromHDFS {

public static void main(String[] args) {

//1、创建TableEnvironment

EnvironmentSettings settings = EnvironmentSettings

.newInstance()

.build();

TableEnvironment tEnv = TableEnvironment.create(settings);

//2、创建source table

final Schema schema = Schema.newBuilder()

.column("user", DataTypes.STRING())

.column("url", DataTypes.STRING())

.column("cTime", DataTypes.STRING())

.build();

tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("filesystem")

.schema(schema)

.format("json")

.option("path","hdfs://mycluster/data/clicklog/input/click.log")

.build());

//3、创建sink table

tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("print")

.schema(schema)

.build());

//5、输出(包括执行,不需要单独在调用tEnv.execute("job"))

tEnv.from("sourceTable")

.select($("user"), $("url"),$("cTime"))

.executeInsert("sinkTable");

}

}

4 配置Hadoop环境变量

在Windows本地配置Hadoop环境变量,同时Windows下的Hadoop安装目录中需要包含HDFS集群配置文件hdfs-site.xml和core-site.xml。否则代码无法识别路径hdfs://mycluster。

5 添加log4j文件

在项目的src/main/目录下,创建resources目录并标记为Mark Directory as—>Resources Root,然后将log4j.properties文件拷贝到resources目录中,log4j.properties具体内容如下所示。

log4j.rootLogger=error,CONSOLE,info

log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender ????

log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout ????

log4j.appender.CONSOLE.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n ???

log4j.logger.info=info

log4j.appender.info=org.apache.log4j.DailyRollingFileAppender

log4j.appender.info.layout=org.apache.log4j.PatternLayout ????

log4j.appender.info.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n ?

log4j.appender.info.datePattern='.'yyyy-MM-dd

log4j.appender.info.Threshold = info ??

log4j.appender.info.append=true

log4j.appender.info.File=G:/tmp/log/info.log

log4j.logger.error=error ?

log4j.appender.error=org.apache.log4j.DailyRollingFileAppender

log4j.appender.error.layout=org.apache.log4j.PatternLayout ????

log4j.appender.error.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n ?

log4j.appender.error.datePattern='.'yyyy-MM-dd

log4j.appender.error.Threshold = error ??

log4j.appender.error.append=true

log4j.appender.error.File=G:/tmp/log/error.log

log4j.logger.DEBUG=DEBUG

log4j.appender.DEBUG=org.apache.log4j.DailyRollingFileAppender

log4j.appender.DEBUG.layout=org.apache.log4j.PatternLayout ????

log4j.appender.DEBUG.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n ?

log4j.appender.DEBUG.datePattern='.'yyyy-MM-dd

log4j.appender.DEBUG.Threshold = DEBUG ??

log4j.appender.DEBUG.append=true

log4j.appender.DEBUG.File=G:/tmp/log/dubug.log

6 准备数据集

准备测试数据集click.log:

{"user":"Mary","url":"./home","cTime":"2022-02-02 12:00:00"}

{"user":"Mary","url":"./prod?id=1","cTime":"2022-02-02 12:00:05"}

{"user":"Liz","url":"./home","cTime":"2022-02-02 12:01:00"}

{"user":"Bob","cTime":"2022-02-02 12:01:30"}

{"user":"Mary","url":"./prod?id=7","cTime":"2022-02-02 12:01:45"}

创建hdfs目录

bin/hdfs dfs -mkdir -p /data/clicklog/input

将click.log文件上传至/data/clicklog/input目录

bin/hdfs dfs -put click.log /data/clicklog/input

7 测试运行

在idea工具中,右键项目选择Run运行Flink Table,如果能在控制台看到打印如下结果,说明Flink Table API能成功读取HDFS中的JSON格式数据。

4> +I[Mary, ./home, 2022-02-02 12:00:00]

4> +I[Mary, ./prod?id=1, 2022-02-02 12:00:05]

4> +I[Liz, ./home, 2022-02-02 12:01:00]

4> +I[Bob, null, 2022-02-02 12:01:30]

4> +I[Mary, ./prod?id=7, 2022-02-02 12:01:45]

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-22 18:43:14  更:2022-04-22 18:44:10 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 12:54:07-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码