IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink整合hive -> 正文阅读

[大数据]flink整合hive

本地集群flinksql客户端

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

介绍

这里和sparksql、hivesql一样,都可以创建表,执行sql语句,这里尝试创建一张关联了Kafka的表,从官网直接拿模板
在这里插入图片描述
在这里插入图片描述
可以看到这边正在运行,并且已经出来了数据
在这里插入图片描述
由于这是一个动态表,在原表直接修改数据,这里经过了SQL变换的数据也会发生相应的改变
在这里插入图片描述

写入到print表

CREATE TABLE print_table (
 clazz STRING,
 c BIGINT
) WITH (
 'connector' = 'print'
)
insert into print_table
select clazz,count(1) as c from student group by clazz

在这里插入图片描述

在task manager中都可以看到任务运行情况
在这里插入图片描述

写入到MySQL表

CREATE TABLE students (
  clazz STRING,
  c BIGINT,
  PRIMARY KEY (clazz) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://192.168.5.201:3306/student?useUnicode=true&characterEncoding=utf-8',
   'table-name' = 'student1',
   'username'='root',
   'password'='123456'
)
insert into students
select clazz,count(1) as c from student group by clazz

这里也可以设置跳过null值
在这里插入图片描述

这些都只是测试用的,这里都没法加checkpoint,只是测试用一下

问题

但是这里都有一些致命的问题,就是在我们每一次重新打开flinksql客户端的时候,这里所有的表都会消失,因为我们每一次打开,数据表都是存在内存中的(默认是保存在当前会话),没有地方存储它的元数据,导致我们的flink找不到数据的位置,这里我们就尝试着使用flink整合hive,将元数据放在hive中,本质上还是存储在MySQL,但是flink不能整合MySQL,所以要这样过度一下。

整合

三种存储位置

在这里插入图片描述

在这里插入图片描述
1、默认是当前会话
2、存储在关系型数据库,但是当前只支持postgres数据库
3、hive

将元数据放到hive,整合hive

我们其实完全可以直接查看官网找到所需的一切东西:
https://nightlies.apache.org/flink/flink-docs-release-1.11/zh/dev/table/hive/index.html

在这里插入图片描述
里面有我们所需要的各种jar包的依赖,maven的依赖,以及基础的语法如何写,这里我们直接拿来:

1、将整合需要的jar上传到flink的lib目录
flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar
hive-exec-1.2.1.jar
hive-metastore-1.2.1.jar

上传jar 之后需要重新启动yarn-session.sh
yarn application -kill appid
yarn-session.sh -jm 1024m -tm 1096



2、启动hive元数据服务
 nohup hive --service metastore >> metastore.log 2>&1 &

3、如果在sql-client中使用hive的catalog
修改sql-client-defaults.yaml

catalogs: 
  - name: myhive
    type: hive
    hive-conf-dir: /usr/local/soft/hive-1.2.1/conf
    default-database: default

4、在sql-client中使用hive的catalog
sql-client.sh embedded

这里要首先切换元数据的位置
show catalogs;
USE CATALOG myhive;

use databases;
show tables;
 
就可以直接通过flink操作hive了

在flink中创建的表在hive中就可以查看,不能查询数据
hive中的表在flink中可以查询

在这里就可以直接通过命令行来通过flinksql操作hive了,这些也可以在idea中操作,首先也是先导入依赖

<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.11.2</version>
</dependency>

官网这里也有直接连接到hive的模板
在这里插入图片描述

package com.shujia.SQL

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object Demo7FlinkOnHive {
  def main(args: Array[String]): Unit = {
    //如果主键为null,自动删除
    val configuration: Configuration = new Configuration()
    configuration.setString("table.exec.sink.not-null-enforcer","drop")
    configuration.setString("table.dynamic-table-options.enabled", "true")

    //flinksql的环境
    val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val bsSettings: EnvironmentSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner() //使用blink的计划器
      .inStreamingMode() //使用流处理模型
      .build()
    //创建table的环境
    val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
    bsTableEnv.getConfig.addConfiguration(configuration)

    /**
      * 注册hive的元数据
      * 可以直接读取hive中的表
      * 这没法在本地运行,因为找不到本地的配置文件,除非把配置文件放到idea,或者上传到集群运行
      */
    val name = "myhive"
    val defaultDatabase = "flink"
    val hiveConfDir = "/usr/local/soft/hive-1.2.1/conf"

    val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir)
    //注册catalog
    bsTableEnv.registerCatalog("myhive", hive)

    // 切换catalog
    bsTableEnv.useCatalog("myhive")

    /**
      * 编写sql,使用hive中的表
      */
    bsTableEnv.executeSql(
      """
        |insert into mysql_clazz_num
        |select clazz,count(1) from student group by clazz;
      """.stripMargin)

  }
}

这里的代码没法在本地运行,因为本地没法使用hive的配置文件,除非将配置文件放到本地,或者可以提交到集群上运行

感谢阅读,我是啊帅和和,一位大数据专业大四学生,祝你快乐。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-30 15:41:10  更:2021-11-30 15:42:03 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 13:50:26-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码