IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> FlinkSQL kafka流式写入hive -> 正文阅读

[大数据]FlinkSQL kafka流式写入hive

public class Kafka2Hive {
    public static void main(String[] args) {

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        environment.enableCheckpointing(10000L);
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment, settings);

        String name            = "myhive";
        String defaultDatabase = "mydatabase";
        String hiveConfDir     = "F:\\flink-demo\\src\\main\\resources";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog("myhive", hive);

        // set the HiveCatalog as the current catalog of the session
        tableEnv.useCatalog("myhive");

        // hive 方言
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        // 已创建kafka catalog 以下建表语句不要执行
//        String hive_sql = "CREATE TABLE hive_table (" +
//                "  user_id STRING," +
//                "  order_amount DOUBLE" +
//                ") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (" +
//                "  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'," +
//                "  'sink.partition-commit.trigger'='partition-time'," +
//                "  'sink.partition-commit.delay'='1 h'," +
//                "  'sink.partition-commit.policy.kind'='metastore,success-file'" +
//                ")";
//
//        tableEnv.executeSql(hive_sql);

        // default 方言
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

//        // 已创建kafka catalog 以下建表语句不要执行
//        String kafka_sql = "CREATE TABLE kafka_table (" +
//                "  user_id STRING," +
//                "  order_amount DOUBLE," +
//                "  log_ts BIGINT," +
//                "  ts AS TO_TIMESTAMP(FROM_UNIXTIME(log_ts / 1000, 'yyyy-MM-dd HH:mm:ss'))," +
//                "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
//                ") WITH (" +
//                "  'connector' = 'kafka'," +
//                "  'topic' = 'user_behavior'," +
//                "  'properties.bootstrap.servers' = 'hadoop01:9092'," +
//                "  'properties.group.id' = 'testGroup'," +
//                "  'scan.startup.mode' = 'earliest-offset'," +
//                "  'format' = 'csv')";
//
//        tableEnv.executeSql(kafka_sql);

        tableEnv.executeSql("INSERT INTO hive_table " +
                "SELECT user_id, order_amount, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH')" +
                "FROM kafka_table");

    }
}

hive-site.xml 添加以下配置

<property>
<name>hive.metastore.uris</name>
<value>thrift://hadoop01:9083</value>
</property>

启动 hive --service metastore

测试数据

1,10,1625886660000
2,3,1625886721000
1,5,1625887380000
1,6,1625887800000
2,30,1625886721000
1,4,1625889989000

在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-19 12:07:10  更:2021-08-19 12:09:20 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 17:59:57-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码