IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink所有支持的catalog详解 -> 正文阅读

[大数据]flink所有支持的catalog详解

1. 版本说明

本文档介绍的各种flink sql的语法基于flink-1.13.x,flink版本低于1.13.x的用户,在sql运行出错误时,需要自行去flink官网查看对应版本的语法支持。

另外,flink新版本支持的语法,文档中会进行特殊标注,说明对应语法在 flink 哪个版本开始支持,但凡是没有特殊标注的,均支持flink-1.13.x及以上版本。

2. hive catalog

sql

CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'mydatabase',
    'hive-conf-dir' = '/opt/hive-conf'
);
-- SQL
USE CATALOG myhive;

下表列出了通过 DDL 定义 HiveCatalog 时所支持的参数。

参数必选默认值类型描述
type(无)StringCatalog 的类型。 创建 HiveCatalog 时,该参数必须设置为 hive
hive-conf-dir(无)String指向包含 hive-site.xml 目录的 URI。 该 URI 必须是 Hadoop 文件系统所支持的类型。 如果指定一个相对 URI,即不包含 scheme,则默认为本地文件系统。如果该参数没有指定,我们会在 class path 下查找 hive-site.xml。
default-databasedefaultString当一个catalog被设为当前catalog时,所使用的默认当前database。
hive-version(无)StringHiveCatalog 能够自动检测使用的 Hive 版本。我们建议不要手动设置 Hive 版本,除非自动检测机制失败。
hadoop-conf-dir(无)StringHadoop 配置文件目录的路径。目前仅支持本地文件系统路径。我们推荐使用 HADOOP_CONF_DIR 环境变量来指定 Hadoop 配置。因此仅在环境变量不满足您的需求时再考虑使用该参数,例如当您希望为每个 HiveCatalog 单独设置 Hadoop 配置时。

3. postGres catalog

注意,该特性只支持到 flink-1.14.x ,在 flink-1.15.x 中,将 postgremysql 合并到一起实现了。

JdbcCatalog 允许用户通过 JDBC 协议将 Flink 连接到关系数据库。

目前,PostgresCatalog 是 JDBC Catalog 的唯一实现,PostgresCatalog 只支持有限的 Catalog 方法,包括:

// Postgres Catalog 支持的方法
PostgresCatalog.databaseExists(String databaseName)
PostgresCatalog.listDatabases()
PostgresCatalog.getDatabase(String databaseName)
PostgresCatalog.listTables(String databaseName)
PostgresCatalog.getTable(ObjectPath tablePath)
PostgresCatalog.tableExists(ObjectPath tablePath)

其他的 Catalog 方法现在还是不支持的。

3.1. PostgresCatalog 的使用

请参阅 Dependencies 部分了解如何配置 JDBC 连接器和 Postgres 驱动。

Postgres catalog 支持以下参数:

  • name:必填,catalog 的名称。
  • default-database:必填,默认要连接的数据库。
  • username:必填,Postgres 账户的用户名。
  • password:必填,账户的密码。
  • base-url:必填,应该符合 jdbc:postgresql://<ip>:<port> 的格式,同时这里不应该包含数据库名。

SQL

CREATE CATALOG mypg WITH(
    'type' = 'jdbc',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'base-url' = '...'
);

USE CATALOG mypg;

3.2. PostgresSQL 元空间映射

除了数据库之外,postgresSQL 还有一个额外的命名空间 schema。一个 Postgres 实例可以拥有多个数据库,每个数据库可以拥有多个 schema,其中一个 schema 默认名为 public,每个 schema 可以包含多张表。

在 Flink 中,当查询由 Postgres catalog 注册的表时,用户可以使用 schema_name.table_name 或只写 table_name,其中 schema_name 是可选的,默认值为 public

因此,Flink Catalog 和 Postgres 之间的元空间映射如下:

Flink Catalog 元空间结构Postgres 元空间结构
catalog 名称 (只能在flink中定义)N/A
database 名称database 名称
table 名称[schema_name.]table_name

Flink 中的 Postgres 表的完整路径应该是:

<catalog>.<db>.`<schema.table>`

如果指定了 schema,请注意需要转义<schema.table>,也就是使用转义字符 ` 将其括起来。

这里提供了一些访问 Postgres 表的例子:

-- 扫描 'public' schema(即默认 schema)中的 'test_table' 表,schema 名称可以省略
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;

-- 扫描 'custom_schema' schema 中的 'test_table2' 表,
-- 自定义 schema 不能省略,并且必须与表一起转义。
SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;

4. jdbc catalog

从 flink-1.15.x 开始支持,并且将 postgre 和 mysql 合并到了一起实现。

JdbcCatalog 允许用户使用 flink 通过 JDBC 协议去连接关系型数据库。

目前已经有两个 JDBC catalog 的实现,Postgres CatalogMySQL Catalog 。他们支持下面的 catalog 函数,其他函数目前还不支持。

// Postgres 和 MySQL Catalog 支持的函数
databaseExists(String databaseName);
listDatabases();
getDatabase(String databaseName);
listTables(String databaseName);
getTable(ObjectPath tablePath);
tableExists(ObjectPath tablePath);

其他的 catalog 函数目前还不支持。

4.1. 使用 JDBC catalog

该章节描述怎么创建并使用 Postgres Catalog 或 MySQL Catalog 。怎么添加 JDBC 连接器和相关的驱动,请参考上面的依赖章节。

JDBC catalog 支持下面的选项配置:

  • name:必选,catalog的名称。
  • default-database:必选,连快将诶的默认数据库。
  • username:必选,Postgres/MySQL 账户的用户名。
  • password:必选,账户的密码。
  • base-url:必选,不要包含数据库名称。
    • 对于 Postgres Catalog ,该参数应该为:jdbc:postgresql://<ip>:<port>
    • 对于 MySQL Catalog ,该参数应该为:jdbc:mysql://<ip>:<port>

SQL:

CREATE CATALOG my_catalog WITH(
    'type' = 'jdbc',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'base-url' = '...'
);

USE CATALOG my_catalog;

4.2. JDBC Catalog for PostgreSQL

PostgreSQL 元空间映射。

PostgreSQL 基于数据库有一个额外的命名空间作为 schema 。一个 Postgres 示例可以有多个数据库,每个数据可以有多个 schema ,默认的 schema 名为 public ,每个 schema 可以有多张表。

在 flink 中,当查询注册到 Postgres catalog 中的表时,用户可以使用 schema_name.table_name 或者是只使用 table_name 。schema_name 是可选的,默认为 public

在 flink catalog 和 Postgres 之间的元空间映射如下:

Flink Catalog 元空间结构Postgres 元空间结构
catalog name (只能在 flink 中定义)N/A
database namedatabase name
table name[schema_name.]table_name

Flink 中的 Postgres 表的完整路径应该是:

<catalog>.<db>.`<schema.table>`

如果指定了 schema,请注意需要转义<schema.table>,也就是使用转义字符 ` 将其括起来。

下面是一些访问 Postgres 表的案例:

-- 浏览名为 'public' 的 schema 下的 'test_table' 表,使用默认的 schema,schema 名称可以被省略。
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;

-- 浏览名为 'custom_schema' 的 schema 下的 'test_table2' 表,自定义 schema 不能被省略,而且必须和表一起被转义。
SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;

4.3. JDBC Catalog for MySQL

MySQL 元空间映射。

MySQL 实例中的数据库和注册到 MySQL catalog 中的数据库有相同的映射级别。一个 MySQL 实例可以有多个数据库,每个数据库可以有多张表。

在 flink 中,当查询注册到 MySQL catalog 中的表时,用户可以使用 database.table_name 或者只指定 table_name 。默认的数据库名为创建 MySQL Catalog 时指定的默认数据库。

flink Catalog 和 MySQL Catalog 之间的元空间映射关系如下:

Flink Catalog Metaspace StructureMySQL Metaspace Structure
catalog name (只能在 flink 中定义)N/A
database namedatabase name
table nametable_name

flink 中的 MySQL 表的完全路径应该为:

`<catalog>`.`<db>`.`<table>`

下面是一些访问 MySQL 表的案例:

-- 浏览 'test_table' 表,默认数据库为 'mydb'
SELECT * FROM mysql_catalog.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;

-- 浏览指定数据库下的 'test_table' 表。
SELECT * FROM mysql_catalog.given_database.test_table2;
SELECT * FROM given_database.test_table2;
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-09-04 01:18:22  更:2022-09-04 01:22:22 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 0:19:32-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码