
[Big Data] A Tale of Two Squirrels: flink-connector-opengauss


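Thanks to the folks at the Squirrel Club for their repeated invitations. For me this is an assigned-topic essay, so what is my answer? As it happens, I am also fond of another squirrel community, so let two squirrels be the answer. That's right, Flink and openGauss are my answer:

  1. A hands-on installation of openGauss

  2. A hands-on build of a Flink connector for openGauss

Installing openGauss

The recommended operating system is openEuler 20.03 LTS. Avoid the SP (service pack) releases: their upgraded dependencies may cause the installation to fail. Use the install script below. I made a few changes to the official script so that it suits single-node experiments: you no longer set the IP by hand, because the script uses the IP of eth0 as the database IP.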

#!/bin/bash

## Author:      贾军锋
## Update:      dafei1288 @ 2021-09-27
## Date:        2021-04-15
## OS:          openEuler20.03LTS [minimum hardware: 2c/4G]
## Database:    openGauss 2.0.0
## Description: One-step OS environment configuration, openGauss package download, and openGauss installation, to make installing the openGauss database more efficient
## Tips:        Make sure the operating system can reach the internet

## 0. Disable the virbr0 NIC [a standard openEuler install on a local VM creates virbr0 by default; remove it so it does not interfere with the database installation]
## virsh net-destroy default
## virsh net-list
## echo "Net device virbr0 is disabled."


## 1. Define host information [adjust to your environment]
export MY_HOSTNAME=node1           ## host name
export MY_HOSTIP=$(ifconfig eth0 | grep 'inet' | awk '{print $2}' | head -1)     ## IP address (first inet entry of eth0)
export MY_SOFTWARE_DIRECTORY=/soft/openGauss      ## directory holding the software package
export MY_XML=/soft/openGauss/clusterconfig.xml   ## cluster configuration XML file
export openGauss_Download_url=https://opengauss.obs.cn-south-1.myhuaweicloud.com/2.0.0/x86_openEuler/openGauss-2.0.0-openEuler-64bit-all.tar.gz  ## openGauss package download URL

## 1. Set the host name and configure the hosts file
hostnamectl set-hostname $MY_HOSTNAME
## double quotes so that $MY_HOSTIP is expanded by the shell before sed sees it
sed -i "/$MY_HOSTIP/d" /etc/hosts
echo "$MY_HOSTIP  $MY_HOSTNAME   #Gauss OM IP Hosts Mapping" >> /etc/hosts
cat /etc/hosts
echo "1.Configure /etc/hosts completed."
echo -e "\n"

## 2. Disable the firewall
systemctl disable firewalld.service
systemctl stop firewalld.service
echo "Firewalld " `systemctl status firewalld|grep Active`
echo "2.Disable firewalld service completed."
echo -e "\n"

## 3. Disable SELinux
sed -i '/^SELINUX=/d' /etc/selinux/config
echo "SELINUX=disabled" >> /etc/selinux/config
cat /etc/selinux/config|grep "SELINUX=disabled"
echo "3.Disable SELINUX completed."
echo -e "\n"


## 4. Set the OS character encoding
echo "LANG=en_US.UTF-8" >> /etc/profile
source /etc/profile
echo $LANG
echo "4.Configure encoding completed."
echo -e "\n"

## 5. Set the OS time zone
rm -fr /etc/localtime
ln -s /usr/share/zoneinfo/Asia/Shanghai  /etc/localtime
date -R
hwclock
echo "5.Configure Timezone completed."
echo -e "\n"

## 6. Disable the swap partition [on machines with 2G of memory, consider re-enabling swap after installation to indirectly "extend" the memory]
sed -i '/swap/s/^/#/' /etc/fstab
swapoff -a
free -m
echo "6.Close swap partition completed."
echo -e "\n"


## 7. Configure the SSH service: disable the banner, allow remote root login
sed -i '/Banner/s/^/#/'  /etc/ssh/sshd_config
sed -i '/PermitRootLogin/s/^/#/'  /etc/ssh/sshd_config
echo -e "\n" >> /etc/ssh/sshd_config
echo "Banner none " >> /etc/ssh/sshd_config
echo "PermitRootLogin yes" >> /etc/ssh/sshd_config
cat /etc/ssh/sshd_config |grep -v ^#|grep -E 'PermitRoot|Banner'
echo "7.Configure SSH Service completed."
echo -e "\n"

## 8. Configure the YUM repository, install dependencies, make Python 3 the default python
mkdir /etc/yum.repos.d/bak
mv /etc/yum.repos.d/*.repo  /etc/yum.repos.d/bak/
wget -O /etc/yum.repos.d/openEulerOS.repo https://repo.huaweicloud.com/repository/conf/openeuler_x86_64.repo
yum clean all
yum install -y bzip2 python3
yum install -y libaio-devel libnsl flex bison ncurses-devel glibc-devel patch readline-devel net-tools tar
mv /usr/bin/python  /usr/bin/python2_bak
ln -s /usr/bin/python3 /usr/bin/python
python -V
echo "8.Configure Install Packages and change default Python version completed."
echo -e "\n"


## 9. Configure sysctl.conf and performance.sh
cat >> /etc/sysctl.conf << EOF
net.ipv4.tcp_retries1 = 5
net.ipv4.tcp_syn_retries = 5
net.sctp.path_max_retrans = 10
net.sctp.max_init_retransmits = 10
EOF
sysctl -p

sed -i '/vm.min_free_kbytes/s/^/#/' /etc/profile.d/performance.sh   ## Only for openEuler
cat /etc/profile.d/performance.sh|grep vm.min_free_kbytes

echo "9.Configure sysctl.conf and performance.sh completed."
echo -e "\n"


## 10. Configure resource limits
echo "* soft stack 3072" >> /etc/security/limits.conf
echo "* hard stack 3072" >> /etc/security/limits.conf
echo "* soft nofile 1000000" >> /etc/security/limits.conf
echo "* hard nofile 1000000" >> /etc/security/limits.conf
echo "* soft nproc unlimited" >> /etc/security/limits.d/90-nproc.conf
tail -n 4 /etc/security/limits.conf
tail -n 1 /etc/security/limits.d/90-nproc.conf
echo "10.Configure resource limits completed."
echo -e "\n"

## 11. Disable transparent huge pages [Only for CentOS]
cat >>/etc/rc.d/rc.local<<EOF
if test -f /sys/kernel/mm/transparent_hugepage/enabled; then
   echo never > /sys/kernel/mm/transparent_hugepage/enabled
fi
if test -f /sys/kernel/mm/transparent_hugepage/defrag; then
   echo never > /sys/kernel/mm/transparent_hugepage/defrag
fi
EOF
chmod +x /etc/rc.d/rc.local
/usr/bin/sh /etc/rc.d/rc.local
cat /sys/kernel/mm/transparent_hugepage/enabled
cat /sys/kernel/mm/transparent_hugepage/defrag
echo "11.Close transparent_hugepage completed."
echo -e "\n"


## 12. Disable RemoveIPC [Only for openEuler]
sed -i '/^RemoveIPC/d' /etc/systemd/logind.conf
sed -i '/^RemoveIPC/d' /usr/lib/systemd/system/systemd-logind.service
echo "RemoveIPC=no"  >> /etc/systemd/logind.conf
echo "RemoveIPC=no"  >> /usr/lib/systemd/system/systemd-logind.service
systemctl daemon-reload
systemctl restart systemd-logind
loginctl show-session | grep RemoveIPC
systemctl show systemd-logind | grep RemoveIPC
echo "12.Disable RemoveIPC completed."
echo -e "\n"


## 13. Download the openGauss package
mkdir -p $MY_SOFTWARE_DIRECTORY
cd $MY_SOFTWARE_DIRECTORY
wget $openGauss_Download_url
echo "13.openGauss software download completed."
echo -e "\n"

## 14. Write the XML configuration file
rm -fr $MY_XML
cat >> $MY_XML <<EOF
<?xml version="1.0" encoding="UTF-8"?>
<ROOT>
    <!-- openGauss cluster-wide settings -->
    <CLUSTER>
        <PARAM name="clusterName" value="dbCluster" />
        <PARAM name="nodeNames" value="$MY_HOSTNAME" />
        <PARAM name="backIp1s" value="$MY_HOSTIP"/>
        <PARAM name="gaussdbAppPath" value="/gaussdb/app" />
        <PARAM name="gaussdbLogPath" value="/gaussdb/log" />
        <PARAM name="gaussdbToolPath" value="/gaussdb/om" />
        <PARAM name="corePath" value="/gaussdb/corefile"/>
        <PARAM name="clusterType" value="single-inst"/>
    </CLUSTER>
    <!-- per-server node deployment settings -->
    <DEVICELIST>
        <!-- node deployment settings for node1 -->
        <DEVICE sn="1000001">
            <PARAM name="name" value="$MY_HOSTNAME"/>
            <PARAM name="azName" value="AZ1"/>
            <PARAM name="azPriority" value="1"/>
            <!-- if the server has only one usable NIC, set backIP1 and sshIP1 to the same IP -->
            <PARAM name="backIp1" value="$MY_HOSTIP"/>
            <PARAM name="sshIp1" value="$MY_HOSTIP"/>

            <!--dbnode-->
            <PARAM name="dataNum" value="1"/>
            <PARAM name="dataPortBase" value="26000"/>
            <PARAM name="dataNode1" value="/gaussdb/data/db1"/>
        </DEVICE>
    </DEVICELIST>
</ROOT>
EOF
cat $MY_XML
echo "14.Configure XML file completed."
echo -e "\n"


## 15. Unpack the installation package and adjust directory permissions
echo "Begin to Uncompress openGauss Package and Modify directory permissions:"
cd $MY_SOFTWARE_DIRECTORY
tar -zxvf *all.tar.gz
tar -zxvf *om.tar.gz
ls -l
chmod -R 777 $MY_SOFTWARE_DIRECTORY/../
echo "15.Uncompress openGauss Package completed."
echo -e "\n"

## 16. Run gs_preinstall
echo "Begin to execute openGauss preinstall:"
python $MY_SOFTWARE_DIRECTORY/script/gs_preinstall -U omm -G dbgrp -X $MY_XML
echo "16.openGauss preinstall completed."
echo -e "\n"


## 17. Check the pre-install environment
echo "Begin to Check OS environment:"
$MY_SOFTWARE_DIRECTORY/script/gs_checkos -i A -h $MY_HOSTNAME --detail

## 18. Run gs_install
echo "Begin to execute openGauss install:"
touch /home/omm/install_db
## $MY_XML is expanded here, while the file is written, because user omm does not have the variable
cat >> /home/omm/install_db <<EOF
source ~/.bashrc
gs_install -X  $MY_XML --gsinit-parameter="--encoding=UTF8"  --dn-guc="max_process_memory=2GB" --dn-guc="shared_buffers=128MB" --dn-guc="cstore_buffers=16MB"
EOF
chown -R omm:dbgrp /home/omm/install_db
su - omm -c "sh /home/omm/install_db"
echo "18.openGauss install completed."
echo -e "\n"

## Installation finished!
echo "openGauss Install completed."
echo "Congratulations!!!"

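[Figure: the author's virtual machine configuration]

Also, if you install in a virtual machine as I did, allocate at least 2 cores and 4G of memory (my VM configuration in the figure above is for reference only); otherwise you will see an error like this: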

[GAUSS-51400] : Failed to execute the command: source /home/omm/.bashrc;python3 '/gaussdb/om/script/local/InitInstance.py' -U omm  -P "--encoding=UTF8" -l /gaussdb/log/omm/om/gs_local.log.Error:
Initializing instance.
[GAUSS-51615] : Failed to initialize instance. Command:/gaussdb/app/bin/gs_initdb --locale=C -D /gaussdb/data/db1 --nodename=dn_6001 --encoding=UTF8 -C /gaussdb/app/bin. Error:
The files belonging to this database system will be owned by user "omm".
This user must also own the server

Common openGauss commands

Switch to the database user

su - omm

Check the database status

gs_om -t status --all

[omm@node1 ~]$ gs_om -t status --all
-----------------------------------------------------------------------

cluster_state             : Normal
redistributing            : No

-----------------------------------------------------------------------

node                      : 1
node_name                 : node1
instance_id               : 6001
node_ip                   : 172.21.235.7
data_path                 : /gaussdb/data/db1
type                      : Datanode
instance_state            : Normal
az_name                   : AZ1
instance_role             : Normal

-----------------------------------------------------------------------

Restart the service

gs_om -t restart

[omm@node1 ~]$  gs_om -t restart
Stopping cluster.
=========================================
Successfully stopped cluster.
=========================================
End stop cluster.
Starting cluster.
=========================================
[SUCCESS] node1
2021-09-27 15:53:05.285 61517861.1 [unknown] 140634486265600 [unknown] 0 dn_6001 01000  0 [BACKEND] WARNING:  could not create any HA TCP/IP sockets
2021-09-27 15:53:05.300 61517861.1 [unknown] 140634486265600 [unknown] 0 dn_6001 01000  0 [BACKEND] WARNING:  Failed to initialize the memory protect for g_instance.attr.attr_storage.cstore_buffers (16 Mbytes) or shared memory (1496 Mbytes) is larger.
=========================================
Successfully started.

Connect to the database

gsql -d postgres -p 26000 -r

[omm@node1 ~]$ gsql -d postgres -p 26000 -r
gsql ((openGauss 2.0.0 build 78689da9) compiled at 2021-03-31 21:04:06 commit 0 last mr  )
Non-SSL connection (SSL connection is recommended when requiring high-security)
Type "help" for help.

postgres=#

Create a user

postgres=# create user jacky with password '123456';
CREATE ROLE

Create a database

postgres=# CREATE DATABASE test OWNER jacky;

Grant privileges

postgres=# GRANT ALL PRIVILEGES ON DATABASE test TO jacky;

Quit the command line

postgres=# \q

Configuring Navicat to connect to openGauss

The configuration here is slightly fiddly: md5 authentication has to be enabled for the user before Navicat can manage the database. Enabling md5 authentication takes the following steps.

First, edit pg_hba.conf: vi /gaussdb/data/db1/pg_hba.conf

# TYPE  DATABASE        USER            ADDRESS                 METHOD

# "local" is for Unix domain socket connections only
local   all             all                                     trust
# IPv4 local connections:
host    all             all             127.0.0.1/32            trust
host    all             all             172.27.71.161/32        trust
host    all             all             172.27.64.1/32          md5
# IPv6 local connections:
host    all             all             ::1/128                 trust
# Allow replication connections from localhost, by a user with the
# replication privilege.
#local   replication     omm                                trust
#host    replication     omm        127.0.0.1/32            trust
#host    replication     omm        ::1/128                 trust

Add the following line, replacing 172.27.64.1 with the IP of your client:

host    all             all             172.27.64.1/32          md5

Then edit postgresql.conf: vi /gaussdb/data/db1/postgresql.conf

modify_initial_password = true  #Whether to change the initial password of the initial user
#password_policy = 1                    #Whether password complexity checks
#password_reuse_time = 60               #Whether the new password can be reused in password_reuse_time days
#password_reuse_max = 0                 #Whether the new password can be reused
#password_lock_time = 1                 #The account will be unlocked automatically after a specified period of time
#failed_login_attempts = 10             #Enter the wrong password reached failed_login_attempts times, the current account will be locked
password_encryption_type = 0            #Password storage type, 0 is md5 for PG, 1 is sha256 + md5, 2 is sha256 only
#password_min_length = 8                #The minimal password length(6-999)
#password_max_length = 32               #The maximal password length(6-999)
#password_min_uppercase = 0             #The minimal upper character number in password(0-999)
#password_min_lowercase = 0             #The minimal lower character number in password(0-999)
#password_min_digital = 0               #The minimal digital character number in password(0-999)
#password_min_special = 0               #The minimal special character number in password(0-999)
#password_effect_time = 90d             #The password effect time(0-999)
#password_notify_time = 7d              #The password notify time(0-999)

Find password_encryption_type and set it to 0.

Then restart the service and the change takes effect.
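
For context on what the md5 storage type means, here is a minimal sketch of the PostgreSQL-style md5 password form, which is the string "md5" followed by the hex MD5 digest of the password concatenated with the user name. The config comment above says type 0 "is md5 for PG", so this sketch assumes openGauss stores the same form; that is an assumption, not something verified against openGauss itself. The class and method names (PgMd5Sketch, md5Hex, storedForm) are made up for this illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustrative only: reproduces the PostgreSQL md5 password format with the JDK.
final class PgMd5Sketch {

    /** Returns the lowercase hex MD5 digest of the UTF-8 bytes of the input. */
    static String md5Hex(String input) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available in this JVM", e);
        }
    }

    /** PostgreSQL-style stored form: "md5" + md5(password + username). */
    static String storedForm(String username, String password) {
        return "md5" + md5Hex(password + username);
    }

    public static void main(String[] args) {
        // The user and password created earlier in this article.
        System.out.println(storedForm("jacky", "123456"));
    }
}
```

Because the user name is part of the hash input, the stored value depends on both the user name and the password. A password that was set before the storage type was switched is presumably still stored in the previous format, so it may be necessary to set the password again after the restart.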

Flink connector opengauss

We could of course hand-write a sink that pushes the CDC data straight into the target database. But isn't that a little inelegant? Could we write the data into openGauss through Flink SQL instead? The answer is yes. Next, let's implement a simple Flink connector for openGauss:

  1. Build a row converter (RowConverter)

  2. Build a dialect (Dialect)

  3. Register a dynamic table factory (DynamicTableFactory) and the related sink classes

These three steps are enough for a simple connector. Let's look at how each one is implemented:

Building the row converter (RowConverter)

package name.lijiaqi.converter;

import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

/**
 * @author lijiaqi
 */
public class OpenGaussRowConverter extends AbstractJdbcRowConverter {

    public OpenGaussRowConverter(RowType rowType) {
        super(rowType);
    }

    private static final long serialVersionUID = 1L;

    @Override
    public String converterName() {
        return "opengauss";
    }

}

Building the dialect (Dialect)

package name.lijiaqi.dialect;

import name.lijiaqi.converter.OpenGaussRowConverter;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;

import java.util.Optional;

/**
 *
 * @author lijiaqi
 */
public class OpenGaussDialect implements JdbcDialect {

    private static final long serialVersionUID = 1L;

    @Override
    public String dialectName() {
        return "opengauss";
    }

    // The dialect claims every JDBC URL with the opengauss scheme.
    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:opengauss:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new OpenGaussRowConverter(rowType);
    }

    // Return a real clause rather than null, so that a pushed-down limit
    // does not end up in the generated SQL as the text "null".
    @Override
    public String getLimitClause(long limit) {
        return "LIMIT " + limit;
    }

    @Override
    public void validate(TableSchema schema) throws ValidationException {
        JdbcDialect.super.validate(schema);
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("org.opengauss.Driver");
    }

    // Caution: in PostgreSQL-style SQL, single quotes delimit string literals,
    // while identifiers are normally double-quoted or left unquoted. Any
    // statement built through this method should be checked against openGauss.
    @Override
    public String quoteIdentifier(String identifier) {
        return "'" + identifier + "'";
    }

    // The remaining statement builders simply delegate to the JdbcDialect defaults.
    @Override
    public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        return JdbcDialect.super.getUpsertStatement(tableName, fieldNames, uniqueKeyFields);
    }

    @Override
    public String getRowExistsStatement(String tableName, String[] conditionFields) {
        return JdbcDialect.super.getRowExistsStatement(tableName, conditionFields);
    }

    @Override
    public String getInsertIntoStatement(String tableName, String[] fieldNames) {
        return JdbcDialect.super.getInsertIntoStatement(tableName, fieldNames);
    }

    @Override
    public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
        return JdbcDialect.super.getUpdateStatement(tableName, fieldNames, conditionFields);
    }

    @Override
    public String getDeleteStatement(String tableName, String[] conditionFields) {
        return JdbcDialect.super.getDeleteStatement(tableName, conditionFields);
    }

    @Override
    public String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
        return JdbcDialect.super.getSelectFromStatement(tableName, selectFields, conditionFields);
    }

}
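
To see why the choice made in quoteIdentifier matters, here is a minimal, JDK-only sketch of how an INSERT statement can be assembled from a table name, field names, and a quoting function. It is a simplified stand-in written for this illustration, not Flink's actual getInsertIntoStatement; the class name InsertStatementSketch and the `?` placeholders are assumptions.

```java
import java.util.Arrays;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

// Illustrative only: shows the effect of the identifier-quoting style on generated SQL.
final class InsertStatementSketch {

    /** Builds "INSERT INTO <table>(<cols>) VALUES (?, ...)" using the given quoting function. */
    static String insertInto(String table, String[] fields, UnaryOperator<String> quote) {
        String columns = Arrays.stream(fields)
                .map(quote)
                .collect(Collectors.joining(", "));
        String placeholders = Arrays.stream(fields)
                .map(f -> "?")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + quote.apply(table)
                + "(" + columns + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        String[] fields = {"id", "name", "description"};
        // Single quotes, as in the dialect above.
        System.out.println(insertInto("t1", fields, id -> "'" + id + "'"));
        // Double quotes, the usual PostgreSQL-style identifier quoting.
        System.out.println(insertInto("t1", fields, id -> "\"" + id + "\""));
    }
}
```

With single quotes the table and column names come out looking like string literals, which a PostgreSQL-compatible parser would be expected to reject in that position; with double quotes they are treated as identifiers (and become case-sensitive).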

Registering the dynamic table factory (DynamicTableFactory) and the related sink classes

First, create OpenGaussSinkFunction, which receives RowData input and writes it to the configured database. Its listing is not included in this article; the OpenGaussDynamicTableSink in the next listing constructs it from the JdbcOptions and the row DataType.

Building OpenGaussDynamicTableSink

package name.lijiaqi.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

/**
 * @author lijiaqi
 */
public class OpenGaussDynamicTableSink implements DynamicTableSink {

    private final JdbcOptions jdbcOptions;
    // Unused for now: the factory passes null because no format is configured.
    private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
    private final DataType dataType;

    public OpenGaussDynamicTableSink(JdbcOptions jdbcOptions, EncodingFormat<SerializationSchema<RowData>> encodingFormat, DataType dataType) {
        this.jdbcOptions = jdbcOptions;
        this.encodingFormat = encodingFormat;
        this.dataType = dataType;
    }

    // Accept whatever changelog mode the planner requests.
    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return requestedMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // Debug output
        System.out.println("SinkRuntimeProvider");
        System.out.println(dataType);

//        SerializationSchema<RowData> serializationSchema = encodingFormat.createRuntimeEncoder(context, dataType);
        OpenGaussSinkFunction sinkFunction = new OpenGaussSinkFunction(jdbcOptions, dataType);
        return SinkFunctionProvider.of(sinkFunction);
    }

    @Override
    public DynamicTableSink copy() {
        return new OpenGaussDynamicTableSink(jdbcOptions, encodingFormat, dataType);
    }

    @Override
    public String asSummaryString() {
        return "OpenGauss Table Sink";
    }

}
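
Because getChangelogMode returns the requested mode unchanged, the sink may receive every kind of change row that a CDC source produces. As a rough illustration of what a sink function then has to do with them, the JDK-only sketch below applies a stream of changes to an in-memory map keyed by primary key. The Kind enum is a local stand-in that mirrors the four values of Flink's RowKind; the class ChangelogApplySketch and its methods are hypothetical and are not the project's OpenGaussSinkFunction.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: how changelog rows map onto upserts and deletes.
final class ChangelogApplySketch {

    /** Local stand-in mirroring the values of Flink's RowKind. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** In-memory stand-in for the target table: primary key -> name column. */
    private final Map<Integer, String> table = new LinkedHashMap<>();

    /** Applies one change row to the stand-in table. */
    void apply(Kind kind, int id, String name) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                // Both carry the new row image, so both behave like an upsert.
                table.put(id, name);
                break;
            case DELETE:
                table.remove(id);
                break;
            case UPDATE_BEFORE:
                // Old row image; the UPDATE_AFTER that follows overwrites the row.
                break;
        }
    }

    /** Returns the current content of the stand-in table. */
    Map<Integer, String> snapshot() {
        return table;
    }

    public static void main(String[] args) {
        ChangelogApplySketch sink = new ChangelogApplySketch();
        sink.apply(Kind.INSERT, 1, "first");
        sink.apply(Kind.UPDATE_BEFORE, 1, "first");
        sink.apply(Kind.UPDATE_AFTER, 1, "renamed");
        sink.apply(Kind.INSERT, 2, "second");
        sink.apply(Kind.DELETE, 2, "second");
        System.out.println(sink.snapshot());
    }
}
```

In a JDBC-backed sink the same branching would translate into an upsert statement for INSERT and UPDATE_AFTER rows and a DELETE statement keyed on the primary key for DELETE rows.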

Building OpenGaussDynamicTableFactory

package name.lijiaqi.table;


import name.lijiaqi.dialect.OpenGaussDialect;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;

import java.util.HashSet;
import java.util.Set;

/**
 * @author lijiaqi
 */
public class OpenGaussDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    // The value used for 'connector' in the WITH clause of a table definition.
    public static final String IDENTIFIER = "opengauss";

    private static final String DRIVER_NAME = "org.opengauss.Driver";

    public static final ConfigOption<String> URL = ConfigOptions
            .key("url")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc database url.");

    public static final ConfigOption<String> DRIVER = ConfigOptions
            .key("driver")
            .stringType()
            .defaultValue(DRIVER_NAME)
            .withDescription("the jdbc driver.");



    public static final ConfigOption<String> TABLE_NAME = ConfigOptions
            .key("table-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc table name.");

    public static final ConfigOption<String> USERNAME = ConfigOptions
            .key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc user name.");

    public static final ConfigOption<String> PASSWORD = ConfigOptions
            .key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc password.");

//    public static final ConfigOption<String> FORMAT = ConfigOptions
//            .key("format")
//            .stringType()
//            .noDefaultValue()
//            .withDescription("the format.");

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(URL);
        requiredOptions.add(TABLE_NAME);
        requiredOptions.add(USERNAME);
        requiredOptions.add(PASSWORD);
//        requiredOptions.add(FORMAT);
        return requiredOptions;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        // Declare 'driver' so that helper.validate() accepts it when a table sets it.
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
        optionalOptions.add(DRIVER);
        return optionalOptions;
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {

        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

        final ReadableConfig config = helper.getOptions();

        helper.validate();

        JdbcOptions jdbcOptions = getJdbcOptions(config);

        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());

        return new OpenGaussDynamicTableSource(jdbcOptions, physicalSchema);

    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {

        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

//        final EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat(
//                SerializationFormatFactory.class,
//                FactoryUtil.FORMAT);

        final ReadableConfig config = helper.getOptions();

        // validate all options
        helper.validate();

        // get the validated options
        JdbcOptions jdbcOptions = getJdbcOptions(config);

        // derive the produced data type (excluding computed columns) from the catalog table
        final DataType dataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();

        // table sink
        return new OpenGaussDynamicTableSink(jdbcOptions, null, dataType);
    }

    private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
        final String url = readableConfig.get(URL);
        final JdbcOptions.Builder builder = JdbcOptions.builder()
                // honors the 'driver' option and falls back to DRIVER_NAME when it is not set
                .setDriverName(readableConfig.get(DRIVER))
                .setDBUrl(url)
                .setTableName(readableConfig.get(TABLE_NAME))
                .setDialect(new OpenGaussDialect());

        readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
        readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
        return builder.build();
    }

}

Next, register the dynamic table factory through SPI: create the file resources/META-INF/services/org.apache.flink.table.factories.Factory and put the single line name.lijiaqi.table.OpenGaussDynamicTableFactory in it.
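
The lookup that this registration enables can be pictured with a small JDK-only sketch: the services file lets java.util.ServiceLoader enumerate the registered factories, and the one whose identifier equals the 'connector' option is chosen. The Factory interface below is a local stand-in with a single method, and FactoryLookupSketch and find are hypothetical names; Flink's real discovery logic does more than this (for example, it reports ambiguous or missing factories).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Illustrative only: picking a factory by its identifier.
final class FactoryLookupSketch {

    /** Local stand-in for a factory interface that exposes an identifier. */
    interface Factory {
        String factoryIdentifier();
    }

    /** Returns the first candidate whose identifier equals the requested connector name. */
    static Optional<Factory> find(Iterable<Factory> candidates, String connector) {
        for (Factory candidate : candidates) {
            if (candidate.factoryIdentifier().equals(connector)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // In a real project the candidates would come from
        // java.util.ServiceLoader.load(Factory.class), which reads the
        // META-INF/services file named after the interface.
        Factory jdbc = () -> "jdbc";
        Factory openGauss = () -> "opengauss";
        List<Factory> candidates = Arrays.asList(jdbc, openGauss);

        System.out.println(find(candidates, "opengauss").isPresent());
        System.out.println(find(candidates, "not-registered").isPresent());
    }
}
```

This is why the string returned by factoryIdentifier() in the factory and the 'connector' value in the table definition have to match exactly.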

With that, our Flink connector is complete. Next, we put it to work in a real project.

CDC in practice

The overall architecture of the project is as follows: Flink CDC captures change data from MySQL, and Flink SQL then sinks that data into openGauss.

[Figure: overall architecture diagram]

Next, let's see how to implement CDC with Flink SQL. It takes only three SQL statements.

Create the source table

        // source table
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " description STRING\n" +
                        ") WITH (\n" +
                        " 'connector' = 'mysql-cdc',\n" +
                        " 'hostname' = 'localhost',\n" +
                        " 'port' = '3306',\n" +
                        " 'username' = 'flinkcdc',\n" +
                        " 'password' = '123456',\n" +
                        " 'database-name' = 'test',\n" +
                        " 'table-name' = 'test_cdc'\n" +
                        ")";

Create the output table, which writes to openGauss. Here the connector is set to opengauss:

        String url = "jdbc:opengauss://172.27.71.161:26000/postgres";
        String userName = "jacky";
        String password = "123456";
        String sinkTable = "t1";
        // output (sink) table
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " description STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED \n " +
                        ") WITH (\n" +
                        " 'connector' = 'opengauss',\n" +
                        " 'url' = '" + url + "',\n" +
                        " 'username' = '" + userName + "',\n" +
                        " 'password' = '" + password + "',\n" +
                        " 'table-name' = '" + sinkTable + "' \n" +
                        ")";
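
Hand-concatenating the option list is easy to get wrong (a missing comma or quote only shows up when the statement is parsed). As an optional alternative, the JDK-only sketch below renders the WITH clause from an ordered map. The class WithClauseSketch and its method are hypothetical helpers written for this illustration, and doubling single quotes inside values is assumed to be the escaping that the SQL parser expects.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative only: renders connector options as a WITH (...) clause.
final class WithClauseSketch {

    /** Renders the options in insertion order, one 'key' = 'value' pair per line. */
    static String withClause(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> " '" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'")
                .collect(Collectors.joining(",\n", "WITH (\n", "\n)"));
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in the order they were added.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "opengauss");
        options.put("url", "jdbc:opengauss://172.27.71.161:26000/postgres");
        options.put("username", "jacky");
        options.put("password", "123456");
        options.put("table-name", "t1");

        String sinkDDL = "CREATE TABLE test_cdc_sink (\n"
                + " id INT NOT NULL,\n"
                + " name STRING,\n"
                + " description STRING,\n"
                + " PRIMARY KEY (id) NOT ENFORCED\n"
                + ") " + withClause(options);
        System.out.println(sinkDDL);
    }
}
```

The generated text can be passed to tableEnv.executeSql in the same way as the hand-written sinkDDL string.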

Here we simply sink the data as it is:

        String transformSQL =
                "insert into test_cdc_sink select * from mysql_binlog";

Complete reference code

package name.lijiaqi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToOpenGaussMain {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);



        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);


        // source table
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " description STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED" +
                        ") WITH (\n" +
                        " 'connector' = 'mysql-cdc',\n" +
                        " 'hostname' = 'localhost',\n" +
                        " 'port' = '3306',\n" +
                        " 'username' = 'flinkcdc',\n" +
                        " 'password' = '123456',\n" +
                        " 'database-name' = 'test',\n" +
                        " 'table-name' = 'test_cdc'\n" +
                        ")";


        String url = "jdbc:opengauss://172.27.71.161:26000/postgres";
        String userName = "jacky";
        String password = "123456";
        String sinkTable = "t1";
        // output (sink) table
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " description STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED \n " +
                        ") WITH (\n" +
                        " 'connector' = 'opengauss',\n" +
                        " 'url' = '" + url + "',\n" +
                        " 'username' = '" + userName + "',\n" +
                        " 'password' = '" + password + "',\n" +
                        " 'table-name' = '" + sinkTable + "' \n" +
                        ")";

        // copy every change from the source table into the sink table
        String transformSQL =
                "insert into test_cdc_sink select * from mysql_binlog";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        TableResult result = tableEnv.executeSql(transformSQL);

        result.print();
        // Note: executeSql() on an INSERT statement already submits the job, so
        // depending on the Flink version this extra execute() call may be
        // unnecessary or may fail because no DataStream operators are defined.
        env.execute("sync-flink-cdc");
    }

}

Run result

[Figure: screenshot of the run result]

The project lives at the address below. Stars, forks, and pull requests are welcome:

https://github.com/dafei1288/flink-connector-opengauss


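Hello, I am +7, a hardcore original author in the big data field.

I have worked on backend architecture, database middleware, data platforms and architecture, and products.

I focus on big data and databases: real-time developments, technical growth, personal growth, and career advancement. Feel free to follow.

If this article helped you, please follow and share it so that it can help more people. Thank you very much!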

Added: 2021-09-30 11:59:57  Updated: 2021-09-30 12:00:02
 