下载安装
wget https://archive.apache.org/dist/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz --no-check-certificate
tar -zxvf flink-1.15.0-bin-scala_2.12.tgz
ln -s flink-1.15.0 flink
配置masters和workers
export FLINK_HOME=/home/hadoop/bigdata_software/flink
export PATH=$FLINK_HOME/bin:$PATH
stream-01:8081
stream-01
stream-02
stream-03
配置flink配置文件 vim conf/flink-conf.yaml
jobmanager.rpc.address: stream-01
high-availability: zookeeper
high-availability.storageDir: hdfs://cluster/flinkha/
high-availability.zookeeper.quorum: stream-01:2181,stream-02:2181,stream-03:2181
yarn.application-attempts: 10
修改yarn-site.xml
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>
配置zk:vim zoo.cfg
server.1=stream-01:2888:3888
server.2=stream-02:2888:3888
server.3=stream-03:2888:3888
配置Hadoop集成
1. 在三台机器上分别配置 HADOOP_CLASSPATH
export HADOOP_CLASSPATH=$HADOOP_HOME/lib:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/tools/*:$HADOOP_HOME/share/hadoop/httpfs/*:$HADOOP_HOME/share/hadoop/kms/*:$HADOOP_HOME/share/hadoop/common/lib/*
2. 添加 jar 包:
wget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
复制到 slave 节点
scp -r /home/hadoop/bigdata_software/flink-1.15.0/ stream-02:/home/hadoop/bigdata_software/
scp -r /home/hadoop/bigdata_software/flink-1.15.0/ stream-03:/home/hadoop/bigdata_software/
启动并访问 stream-01:8081查看flink web ui是否正常启动
./bin/start-cluster.sh
job 测试
./bin/flink run -yjm 502m -ytm 1024m -ys 2 ./examples/batch/WordCount.jar
添加需要的cdcjar包
??这里添加了 mysql ,oracle ,elasticsearch, jdbc
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.2.1/flink-sql-connector-oracle-cdc-2.2.1.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.15.0/flink-sql-connector-elasticsearch7-1.15.0.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.15.0/flink-connector-jdbc-1.15.0.jar
测试cdc mysql to mysql
????mysql 需开启binlog日志, 并赋予用户 SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT 权限,配置方式详见官方文档。 否则会出现以下错误:
java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured
at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.currentBinlogOffset(DebeziumUtils.java:122) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
??启动sql-client
./bin/sql-client.sh
??进行测试
drop table products ;
-- MySQL source table for the CDC demo.
-- Fixes: the original statement was missing its ';' terminator (the next
-- CREATE TABLE began immediately), and int(8) display width is deprecated
-- since MySQL 8.0.17 — it never affected the stored value range anyway.
CREATE TABLE products (
    id INT PRIMARY KEY,
    pro_name VARCHAR(20)
);
-- MySQL source table holding per-product prices (pro_id references products.id,
-- not enforced by an FK in the original — kept as-is).
-- Fixes: deprecated int(8) display width; leading-comma style made consistent
-- with the sibling CREATE TABLE statements in this file.
CREATE TABLE price (
    id INT PRIMARY KEY,
    pro_id INT,
    price INT
);
-- MySQL sink table receiving the joined result written by the Flink JDBC sink.
-- Fix: int(8) display width is deprecated since MySQL 8.0.17.
CREATE TABLE product_price (
    pro_id INT PRIMARY KEY,
    pro_name VARCHAR(20),
    price INT
);
-- Flink SQL mysql-cdc source table mirroring MySQL `products`.
-- Fix: Flink SQL rejects a bare inline PRIMARY KEY — Flink does not own the
-- data and cannot enforce the constraint, so it must be declared NOT ENFORCED.
CREATE TABLE flink_products (
    id INT,
    pro_name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.1.1',
    'port' = '3306',
    'username' = 'bigdata',
    'password' = 'bigdata',
    'database-name' = 'bigdata',
    'table-name' = 'products'
);
-- Flink SQL mysql-cdc source table mirroring MySQL `price`.
-- Fix: primary key must be declared NOT ENFORCED in Flink SQL (Flink cannot
-- enforce constraints on externally-owned data).
CREATE TABLE flink_price (
    id INT,
    pro_id INT,
    price INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.1.1',
    'port' = '3306',
    'username' = 'bigdata',
    'password' = 'bigdata',
    'database-name' = 'bigdata',
    'table-name' = 'price'
);
-- Flink SQL JDBC sink table writing into MySQL `product_price`.
-- Fix: primary key must be NOT ENFORCED; for the JDBC connector it also
-- switches the sink into upsert mode (keyed on pro_id) instead of append-only,
-- which is required for a changelog (CDC) input.
CREATE TABLE flink_product_price (
    pro_id INT,
    pro_name STRING,
    price INT,
    PRIMARY KEY (pro_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.1.1:3306/bigdata',
    'username' = 'bigdata',
    'password' = 'bigdata',
    'table-name' = 'product_price'
);
-- Continuous job: join the two CDC streams and upsert into the JDBC sink.
-- LEFT JOIN keeps products that have no price row yet (price comes out NULL).
-- Fix: the statement was missing its ';' terminator — in sql-client the next
-- INSERT on the following line would have been swallowed into this statement.
INSERT INTO flink_product_price
SELECT
    p.id AS pro_id,
    p.pro_name AS pro_name,
    pr.price AS price
FROM flink_products AS p
LEFT JOIN flink_price AS pr
    ON p.id = pr.pro_id;
-- Test data: run against MySQL directly, then watch product_price fill in.
-- Fixes: explicit column lists (INSERT without one breaks silently on schema
-- change), idiomatic VALUES instead of literal-SELECT, final SELECT given an
-- explicit column list and its missing ';' terminator.
INSERT INTO products (id, pro_name) VALUES (1, 'a');
INSERT INTO price (id, pro_id, price) VALUES (1, 1, 10);
INSERT INTO products (id, pro_name) VALUES (2, 'b');
INSERT INTO price (id, pro_id, price) VALUES (2, 2, 10);
INSERT INTO products (id, pro_name) VALUES (3, 'c');
INSERT INTO price (id, pro_id, price) VALUES (3, 3, 11);
INSERT INTO products (id, pro_name) VALUES (4, 'd');
INSERT INTO price (id, pro_id, price) VALUES (4, 4, 15);
INSERT INTO products (id, pro_name) VALUES (5, 'c');
INSERT INTO products (id, pro_name) VALUES (6, 'd');
INSERT INTO price (id, pro_id, price) VALUES (5, 5, 20);
-- Verify the join result in the sink table.
SELECT pro_id, pro_name, price FROM product_price;
测试 cdc oracle to mysql
同样, Oracle 需开启归档日志,并赋予相关权限。配置方式如下(这里以 NON-CDB 为例,详见官方文档)。 1. 以 sysdba 身份登录 Oracle 数据库
ORACLE_SID=SID
export ORACLE_SID
sqlplus /nolog
CONNECT sys/password AS SYSDBA
??2.查看数据库的归档状态
SQL> ARCHIVE LOG LIST
Database log mode No Archive Mode
Automatic archival Disabled
Archive destination USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence 16
Current log sequence 18
??3.配置归档路径和合理的存储空间
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/home/oracle/recovery_area' scope=spfile;
??4.开启归档日志。
shutdown immediate;
startup mount;
ORACLE instance started.
Total System Global Area 1610612736 bytes
Fixed Size 2924928 bytes
Variable Size 520097408 bytes
Database Buffers 1073741824 bytes
Redo Buffers 13848576 bytes
ORA-01078: failure in processing system parameters
ORA-16032: parameter LOG_ARCHIVE_DEST_1 destination string cannot be translated
ORA-01264: Unable to create archived log file name
ORA-19800: Unable to initialize Oracle Managed Destination
Linux-x86_64 Error: 13: Permission denied
alter database archivelog;
archive log list
alter database open;
??5.为表和数据库启用补充日志记录(每个表都需设置):
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
??6.创建表空间,可直接使用已存在表空间
CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/xe/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
??7.创建用户并 赋予权限
-- Create the LogMiner user for the Oracle CDC connector.
-- Fixes: CREATE USER was missing its ';' terminator, which made the first
-- GRANT part of the same (invalid) statement; the logminer_tbs tablespace
-- created above was never assigned to the user — the connector setup guide
-- assigns it as the default tablespace with unlimited quota.
CREATE USER flinkuser IDENTIFIED BY flink
    DEFAULT TABLESPACE logminer_tbs
    QUOTA UNLIMITED ON logminer_tbs;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
进行测试cdc
-- Flink SQL oracle-cdc source table mirroring FLINKUSER.DOCTOR.
-- Fix: primary key must be declared NOT ENFORCED in Flink SQL.
-- NOTE(review): the password here ('flink#90') differs from the one used when
-- the flinkuser account was created earlier ('flink') — confirm which is real.
CREATE TABLE flink_doctor (
    ID INT NOT NULL,
    NAME STRING,
    DEPT_ID INT,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = '172.16.221.78',
    'port' = '1522',
    'username' = 'flinkuser',
    'password' = 'flink#90',
    'database-name' = 'XE',
    'schema-name' = 'flinkuser',
    'table-name' = 'DOCTOR'
);
-- Flink SQL JDBC sink table writing the Oracle changes into MySQL `doctor`.
-- Fix: primary key must be NOT ENFORCED; for the JDBC connector this also
-- enables upsert (keyed) writes, required for a CDC changelog input.
CREATE TABLE flink_doctor_mysql (
    ID INT NOT NULL,
    name STRING,
    dept_id INT,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://172.16.221.78:3306/bigdata',
    'username' = 'bigdata',
    'password' = 'bigdata',
    'table-name' = 'doctor'
);
insert into flink_doctor_mysql select ID , NAME , DEPT_ID from flink_doctor ;
测试 cdc oracle to es
??在kibana中创建索引
PUT /flink-cdc-oracle
{
  "mappings": {
    "properties": {
      "id": { "type": "integer" },
      "name": { "type": "text" },
      "dept_id": { "type": "integer" }
    }
  }
}
??创建flink-es中间表
-- Flink SQL Elasticsearch sink table targeting the flink-cdc-oracle index.
-- Fix: primary key must be NOT ENFORCED; for the elasticsearch-7 connector
-- the key also becomes the ES document id, so updates/deletes from the CDC
-- stream hit the right document instead of appending new ones.
CREATE TABLE flink_doctor_es (
    id INT NOT NULL,
    name STRING,
    dept_id INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://221.215.21.6:9200',
    'index' = 'flink-cdc-oracle',
    'username' = 'elastic',
    'password' = '123'
);
??提交任务
insert into flink_doctor_es select ID , NAME , DEPT_ID from flink_doctor ;
??更新数据并查看es数据
GET /flink-cdc-oracle/_search
{
  "query": {
    "match_all": {}
  }
}
|