IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> FlinkSQL消费Kafka写入Hive表 -> 正文阅读

[大数据]FlinkSQL消费Kafka写入Hive表

环境版本:

hadoop-3.1.0

hive-3.1.2

flink-1.13.2

一、开发

Maven引入依赖项:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-hive_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>${hive.version}</version>
    </dependency>
    <!--用于向hdfs写paruqet-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-parquet_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro</artifactId>
      <version>${flink.version}</version>
    </dependency>

java代码示例:

package teld;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import java.time.Duration;

/**
 * @Auther: lixz
 * @Date: 2022/10/13/9:38
 * @Description:  有hive依赖冲突问题暂停
 */
public class Kafka2Hive {
    public static void main( String[] args ) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,settings);
        /**
         * hive环境
         */
//        System.setProperty("HADOOP_USER_NAME","hdfs");
        String name            = "myhive";
        String defaultDatabase = "test";
        //这里版本号一定要与hive-exec包版本一致,否则报错:NoSuchMethodException: org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(org.apache.hadoop.hive.conf.HiveConf)
        String hive_version = "3.1.2";
        String hiveConfDir     = "/opt/hive-3.1.2/conf";
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,hive_version);
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        //接入kafka
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.78.1:9092")
                .setTopics("test4")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        //接入kafka流
        DataStreamSource<String> stream = env.fromSource(source,
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)), "Kafka Source");
        DataStream<MyUser> dataStream = stream.map(new MapFunction<String, MyUser>() {
            @Override
            public MyUser map(String s) throws Exception {
                String[] arr = s.split(",");
                return new MyUser(arr[0], arr[1], Integer.valueOf(arr[2]));
            }
        }).returns(MyUser.class);
        //创建动态表
        tEnv.createTemporaryView("MyUser",dataStream);
        //创建hive表(如果hive中该表不存在会自动在hive上创建,也可以提前在hive中建好该表,flinksql中就无需再执行建表SQL,因为用了hive的catalog,flinksql运行时会找到表)
        tEnv.executeSql("CREATE TABLE IF NOT EXISTS `myhive`.`test`.`useroplog` \n" +
                "(\n" +
                "`ID` STRING,\n" +
                "`NAME` STRING,\n" +
                "`AGE` INT\n" +
                ") \n" +
                "partitioned by(`DAY` STRING)\n" +
                "STORED AS parquet TBLPROPERTIES(\n" +
                "'auto-compaction'='true',\n" +
                "'format' = 'parquet',\n"+
                "'parquet.compression'='GZIP',\n"+
                "'sink.partition-commit.delay'='30 s',\n" +
                "'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
                ")");
        //写hive表
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql("insert into useroplog select *,'2022-10-13' as `DAY` from MyUser");
        //打印
//        tEnv.executeSql("select * from MyUser").print();
        env.execute();
    }
}

如果要输出的hive没有创建,执行任务后会自动创建,我们到hive下看看自定创建出来的表格式是什么样:

CREATE TABLE `useroplog`(
  `id` string, 
  `name` string, 
  `age` int)
PARTITIONED BY ( 
  `day` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://dss0:8020/user/hive/warehouse/test.db/useroplog'
TBLPROPERTIES (
  'auto-compaction'='true', 
  'bucketing_version'='2', 
  'format'='parquet', 
  'parquet.compression'='GZIP', 
  'sink.partition-commit.delay'='1min', 
  'sink.partition-commit.policy.kind'='success-file', 
  'transient_lastDdlTime'='1665629249')

打包代码,注意不要包含依赖避免依赖重读,我们用到的依赖都放到集群上

提交作业:

flink run-application \
-t yarn-application \
-c teld.Kafka2Hive \
-Dyarn.provided.lib.dirs="hdfs://dss0:8020/user/flink/flink-dependency-1.13.2;hdfs://dss0:8020/user/flink/flink-dependency-1.13.2/lib;hdfs://dss0:8020/user/flink/flink-dependency-1.13.2/plugin
s" \-Dyarn.application.name=flink2hivetest \
flink2hivetest-1.0-SNAPSHOT.jar \

提交成功截图:

当我们向kafka发送数据后就会写入到hive中,我们看下hive表生成的文件结构

实时写入时,分区会自动创建;我们来查询下

?

?

二、注意事项?

1、代码中创建的HiveCatalog中要指定hive版本,并且该版本一定要与依赖hive-exec的版本一致

2、集群HDFS上的依赖如下:

3、hive要开启metastore

bin/hive --service metastore >/dev/null 2>&1 &

开启后可以看9083端口是否存在

4、hive-site.xml配置

需要指定metastore uri

<property>
? ? <name>hive.metastore.uris</name>
? ? <value>thrift://192.168.78.12:9083</value>
? </property>
?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-10-17 12:41:07  更:2022-10-17 12:41:30 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年3日历 -2025/3/4 7:50:39-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码