IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 说说Flink的连接器connector有哪些怎么用? -> 正文阅读

[大数据]说说Flink的连接器connector有哪些怎么用?


标题: ‘说说Flink的连接器connector有哪些,怎么用?’
日期: 2021-07-31 10:26:51
标签: [flink,connector]
分类: 数据仓库

flink作为一个计算引擎,是缺少存储介质的,那么数据从哪儿来,到哪儿去,就需要连接器了,链接各种类型数据库,各种类型组件进行数据的抽取、计算、存储等,下面来看看flink都有哪些connector,怎么使用的?
flink

介绍

看看目前支持的connector:
这是官方给出的:
flink_connector

有些支持数据源,有些不支持数据源,有些支持无边界流式处理,有些不支持,具体看上图。

我们目前市面上用的比较多的数据库,大概是以下几种:

# 支持jdbc
mysql mongodb postgresql oracle db2 sybase sqlserver hive 
# 不支持jdbc
hbase es 文件 消息队列(kafka rabbitmq rocketmq)

使用

kafka

CREATE TABLE MyUserTable (
  -- declare the schema of the table
  `user` BIGINT,
  `message` STRING
) WITH (
  -- declare the external system to connect to
  'connector' = 'kafka',
  'topic' = 'topic_name',
  'scan.startup.mode' = 'earliest-offset', -- 还有可选从最近offset开始消费:latest-offset
  'properties.bootstrap.servers' = 'localhost:9092', --kafka broker连接串
  'format' = 'json'   -- declare a format for this system
)

hbase

注意hbase目前只支持1.4和2.2版本
hbase

-- register the HBase table 'mytable' in Flink SQL
CREATE TABLE hTable (
 rowkey INT,
 family1 ROW<q1 INT>,
 family2 ROW<q2 STRING, q3 BIGINT>,
 family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
 PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
 'connector' = 'hbase-1.4',
 'table-name' = 'mytable',
 'zookeeper.quorum' = 'localhost:2181'
);

-- use ROW(...) construction function construct column families and write data into the HBase table.
-- assuming the schema of "T" is [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
INSERT INTO hTable
SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;

jdbc

jdbc连接需要添加对应的driver到flink lib里
mysql:点这里
postgresql:点这里
oracle:点这里下载ojdbc8.jar
这是常用的,其他的在网上都能搜得到

-- register a MySQL table 'users' in Flink SQL
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users',
   'driver' = 'com.jdbc.mysql.Driver',
   'username' = 'xxx',
   'password' = 'xxx'
);

es

es只能做sink不能做source

CREATE TABLE myUserTable (
  user_id STRING,
  user_name STRING
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'users'
);

文件

可以是服务器本地文件,也可以是hdfs文件,区别就是文件路径描述符的区别:

CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  part_name1 INT,
  part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
  'connector' = 'filesystem',           -- required: specify the connector
  'path' = 'file:///path/to/whatever',  -- required: path to a directory
  'path' = 'hdfs:///path/to/whatever',  -- required: path to a directory
  'format' = '...',                     -- required: file system connector requires to specify a format,
                                        -- Please refer to Table Formats
                                        -- section for more details
  'partition.default-name' = '...',     -- optional: default partition name in case the dynamic partition
                                        -- column value is null/empty string

  -- optional: the option to enable shuffle data by dynamic partition fields in sink phase, this can greatly
  -- reduce the number of file for filesystem sink but may lead data skew, the default value is false.
  'sink.shuffle-by-partition.enable' = '...',
  ...
)

另外还有几种特殊的connector:

datagen

datagen会按照字段指定的类型,随机生成对应的数据

CREATE TABLE Orders (
    order_number BIGINT,
    price        DECIMAL(32,2),
    buyer        ROW<first_name STRING, last_name STRING>,
    order_time   TIMESTAMP(3)
) WITH (
  'connector' = 'datagen'
)

print

每一个写入该表的数据,都会标准输出到日志里

CREATE TABLE print_table (
  f0 INT,
  f1 INT,
  f2 STRING,
  f3 DOUBLE
) WITH (
  'connector' = 'print'
);

blackhole

这个connector会吞噬一切数据,往这个表里写的数据都会消失,主要用于测试性能。

CREATE TABLE blackhole_table (
  f0 INT,
  f1 INT,
  f2 STRING,
  f3 DOUBLE
) WITH (
  'connector' = 'blackhole'
);

参考官网链接:
flink connectors

其实每个connector都支持指定类型的format格式方式,下期文章介绍如何指定格式化,可以指定那些格式化。
点个关注呗。
alittleflower


不积跬步无以至千里,不积小流无以成江海。

欢迎关注我的微信公众号,比较喜欢分享知识,也喜欢宠物,所以做了这2个公众号:
程序员写书

喜欢宠物的朋友可以关注:【电巴克宠物Pets】
电巴克宠物
想知道狗狗怕蚊子吗?扫二维码查看,有惊喜。
扫二维码查看

一起学习,一起进步。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-01 14:34:51  更:2021-08-01 14:36:20 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/3 9:43:08-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码