
Flink 1.15.0 on YARN: Installation and CDC Configuration

Download and install
# Download the release
wget https://archive.apache.org/dist/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz
# Or use the Tsinghua mirror
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz --no-check-certificate
# Unpack
tar -zxvf flink-1.15.0-bin-scala_2.12.tgz
# Create a symlink
ln -s flink-1.15.0 flink
Configure masters and workers
# Add environment variables: vim /etc/profile, then source /etc/profile
export FLINK_HOME=/home/hadoop/bigdata_software/flink
export PATH=$FLINK_HOME/bin:$PATH
# Configure masters: vim conf/masters
stream-01:8081
# Configure workers: vim conf/workers
stream-01
stream-02
stream-03
Configure the Flink configuration file: vim conf/flink-conf.yaml
jobmanager.rpc.address: stream-01
# Enable high-availability mode
high-availability: zookeeper
# JobManager metadata is stored on DFS; ZooKeeper only keeps a pointer to that DFS path
high-availability.storageDir: hdfs://cluster/flinkha/
# ZooKeeper quorum (hostnames and ports must match your actual ZooKeeper setup)
high-availability.zookeeper.quorum: stream-01:2181,stream-02:2181,stream-03:2181
# (Optional) set the ZooKeeper root path
#high-availability.zookeeper.path.root: /test_dir/test_standalone2_root
# Comment out the following localhost bindings so the processes bind to their real hosts
# jobmanager.bind-host: localhost
# taskmanager.bind-host: localhost
#taskmanager.host: localhost
#rest.address: localhost
#rest.bind-address: localhost

# Number of application attempts for YARN high availability; note that YARN caps this
# at yarn.resourcemanager.am.max-attempts (set to 4 below), so the smaller value wins
yarn.application-attempts: 10

Modify yarn-site.xml

<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>
    The maximum number of application master execution attempts.
  </description>
</property>
Configure ZooKeeper: vim zoo.cfg
server.1=stream-01:2888:3888
server.2=stream-02:2888:3888
server.3=stream-03:2888:3888
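Each ZooKeeper node also needs a myid file whose value matches its server.N entry in zoo.cfg; a quick sketch (the dataDir path below is an assumption, use the dataDir from your own zoo.cfg):
# On stream-01 (use 2 on stream-02 and 3 on stream-03); the dataDir path is an example
echo 1 > /data/zookeeper/myid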
Configure Hadoop integration

1. Set HADOOP_CLASSPATH on all three machines (a shorter equivalent is shown after this list):

  export HADOOP_CLASSPATH=$HADOOP_HOME/lib:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/tools/*:$HADOOP_HOME/share/hadoop/httpfs/*:$HADOOP_HOME/share/hadoop/kms/*:$HADOOP_HOME/share/hadoop/common/lib/*

2. Add the shaded Hadoop jar:

  wget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
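If the hadoop binary is already on the PATH, a shorter equivalent for step 1 is to let Hadoop compute its own classpath (assuming a standard Hadoop installation):
# Equivalent shortcut; add to /etc/profile on each node
export HADOOP_CLASSPATH=$(hadoop classpath)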
Copy the installation to the worker nodes
scp -r /home/hadoop/bigdata_software/flink-1.15.0/  stream-02:/home/hadoop/bigdata_software/
scp -r /home/hadoop/bigdata_software/flink-1.15.0/  stream-03:/home/hadoop/bigdata_software/
Start the cluster, then visit stream-01:8081 to check that the Flink web UI is up
./bin/start-cluster.sh
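As a quick sanity check (assuming the JDK's jps tool is available), the Flink processes should be visible on each node:
# On the master you should see StandaloneSessionClusterEntrypoint; on each worker, TaskManagerRunner
jps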
Job test
./bin/flink run -yjm 502m -ytm 1024m -ys 2  ./examples/batch/WordCount.jar
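The -yjm/-ytm/-ys flags are legacy CLI shortcuts; a sketch of the same per-job YARN submission with Flink 1.15's generic options (memory values here are examples):
# Per-job deployment to YARN via the generic -t/-D options
./bin/flink run -t yarn-per-job \
  -Djobmanager.memory.process.size=1024m \
  -Dtaskmanager.memory.process.size=1728m \
  -Dtaskmanager.numberOfTaskSlots=2 \
  ./examples/batch/WordCount.jar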
Add the required CDC connector jars

Here we add the MySQL, Oracle, Elasticsearch and JDBC connectors; place the jars under Flink's lib/ directory on every node so the SQL client can pick them up (a sync sketch follows the downloads).

wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.2.1/flink-sql-connector-oracle-cdc-2.2.1.jar  
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.15.0/flink-sql-connector-elasticsearch7-1.15.0.jar  
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.15.0/flink-connector-jdbc-1.15.0.jar
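A sketch for syncing the jars to the workers, assuming they were downloaded into $FLINK_HOME/lib:
# Copy the connector jars to the other nodes; restart the cluster afterwards
scp $FLINK_HOME/lib/flink-sql-connector-*.jar $FLINK_HOME/lib/flink-connector-jdbc-*.jar stream-02:$FLINK_HOME/lib/
scp $FLINK_HOME/lib/flink-sql-connector-*.jar $FLINK_HOME/lib/flink-connector-jdbc-*.jar stream-03:$FLINK_HOME/lib/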
Test CDC: MySQL to MySQL

MySQL must have binlog enabled, and the connector user needs the SELECT, SHOW DATABASES, REPLICATION SLAVE and REPLICATION CLIENT privileges (see the official docs; a minimal setup sketch follows the error output below). Otherwise you will see errors like:

	java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation
	Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured
        	at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.currentBinlogOffset(DebeziumUtils.java:122) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]  
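A minimal setup sketch, assuming MySQL 5.7+ and that the connector logs in as the bigdata user from the DDL below; adjust names and hosts to your environment:
# In my.cnf under [mysqld] set: server-id=1, log-bin=mysql-bin, binlog_format=ROW; restart MySQL,
# then grant the required privileges to the CDC user:
mysql -uroot -p -e "GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'bigdata'@'%'; FLUSH PRIVILEGES;"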

Start the SQL client

./bin/sql-client.sh

Run the test

-- In MySQL, create the source tables products and price, and the sink table product_price
drop table if exists products;
create table products (
  id int(8) primary key,
  pro_name varchar(20)
);
create table price (
  id int(8) primary key,
  pro_id int,
  price int
);

create table product_price (
  pro_id int(8) primary key,
  pro_name varchar(20),
  price int(8)
);
 
-- Create the Flink tables: two mysql-cdc sources and one jdbc sink
-- (Flink SQL requires primary keys to be declared NOT ENFORCED)
CREATE TABLE flink_products (
    id INT,
    pro_name STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.1.1',
    'port' = '3306',
    'username' = 'bigdata',
    'password' = 'bigdata',
    'database-name' = 'bigdata',
    'table-name' = 'products'
  );

CREATE TABLE flink_price (
    id INT,
    pro_id INT,
    price INT,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.1.1',
    'port' = '3306',
    'username' = 'bigdata',
    'password' = 'bigdata',
    'database-name' = 'bigdata',
    'table-name' = 'price'
  );

CREATE TABLE flink_product_price (
    pro_id INT,
    pro_name STRING,
    price INT,
    PRIMARY KEY (pro_id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.1.1:3306/bigdata',
    'username' = 'bigdata',
    'password' = 'bigdata',
    'table-name' = 'product_price'
  );

-- Submit the job
insert into flink_product_price
select a.id       as pro_id,
       a.pro_name as pro_name,
       b.price    as price
from flink_products a
left join flink_price b on a.id = b.pro_id;


-- Update the source tables

insert into products  select 1,'a'  ; 
insert into price select 1 , 1, 10 ;  
insert into products  select 2,'b'  ; 
insert into price select 2 , 2, 10 ; 
insert into products  select 3,'c'  ; 
insert into price select 3, 3, 11 ; 
insert into products  select 4,'d'  ; 
insert into price select 4, 4, 15 ; 
insert into products  select 5,'c'  ; 
insert into products  select 6,'d'  ;
insert into price select 5, 5, 20 ;

-- Check the result data
select * from product_price;
Test CDC: Oracle to MySQL

Likewise, Oracle must have archive logging enabled and the relevant privileges granted. The steps below use a NON-CDB database as the example; see the official docs for details.
1. Log in to the Oracle database as sysdba:

	ORACLE_SID=SID
	export ORACLE_SID
	sqlplus /nolog
	CONNECT sys/password AS SYSDBA

2. Check the database archiving status:

	SQL> ARCHIVE LOG LIST
	Database log mode              No Archive Mode
	Automatic archival             Disabled
	Archive destination            USE_DB_RECOVERY_FILE_DEST
	Oldest online log sequence     16
	Current log sequence           18
	## If the command returns "illegal ARCHIVE LOG option", run "host clear" and retry
	## If "Database log mode" reports "Archive Mode", archiving is already enabled

3. Configure the archive destination and allocate a reasonable amount of storage:

	alter system set db_recovery_file_dest_size = 10G;
	alter system set db_recovery_file_dest = '/home/oracle/recovery_area' scope=spfile; 

4. Enable archive logging.

# Shut down the database
shutdown immediate;
# Start the database in MOUNT state
startup mount;

ORACLE instance started.

Total System Global Area 1610612736 bytes
Fixed Size                  2924928 bytes
Variable Size             520097408 bytes
Database Buffers         1073741824 bytes
Redo Buffers               13848576 bytes
ORA-01078: failure in processing system parameters
ORA-16032: parameter LOG_ARCHIVE_DEST_1 destination string cannot be translated
ORA-01264: Unable to create archived log file name
ORA-19800: Unable to initialize Oracle Managed Destination
Linux-x86_64 Error: 13: Permission denied
# The ORA- errors above appeared because the recovery area was created as root and does
# not belong to oracle; fix the ownership and run startup mount again:
# chown -R oracle:dba /home/oracle/recovery_area
# Enable archive mode
alter database archivelog;
# Confirm that the archive status now reports Archive Mode
archive log list
# Open the database
alter database open;

5. Enable supplemental logging for the table(s) and the database (this must be set for every captured table):

-- Enable supplemental logging for a specific table:
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable supplemental logging for database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

6. Create a tablespace (an existing tablespace can also be used):

CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/xe/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

7. Create the user and grant privileges:

  CREATE USER flinkuser IDENTIFIED BY flink#90 DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs;
  GRANT CREATE SESSION TO flinkuser;
  GRANT SET CONTAINER TO flinkuser;
  GRANT SELECT ON V_$DATABASE to flinkuser;
  GRANT FLASHBACK ANY TABLE TO flinkuser;
  GRANT SELECT ANY TABLE TO flinkuser;
  GRANT SELECT_CATALOG_ROLE TO flinkuser;
  GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
  GRANT SELECT ANY TRANSACTION TO flinkuser;
  GRANT LOGMINING TO flinkuser;
  GRANT CREATE TABLE TO flinkuser;
  GRANT LOCK ANY TABLE TO flinkuser;
  GRANT ALTER ANY TABLE TO flinkuser;
  GRANT CREATE SEQUENCE TO flinkuser; 
  GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser; 
  GRANT SELECT ON V_$LOG TO flinkuser;
  GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
  GRANT SELECT ON V_$LOGFILE TO flinkuser;
  GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
Run the CDC test
-- Create the Flink source table over the Oracle table DOCTOR
CREATE TABLE flink_doctor (
    ID INT NOT NULL,
    NAME STRING,
    DEPT_ID INT,
    PRIMARY KEY (ID) NOT ENFORCED
  ) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = '172.16.221.78',
    'port' = '1522',
    'username' = 'flinkuser',
    'password' = 'flink#90',
    'database-name' = 'XE',
    'schema-name' = 'flinkuser',
    'table-name' = 'DOCTOR');


-- Create the JDBC sink table pointing at the MySQL table doctor
CREATE TABLE flink_doctor_mysql (
    ID INT NOT NULL,
    name STRING,
    dept_id INT,
    PRIMARY KEY (ID) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://172.16.221.78:3306/bigdata',
    'username' = 'bigdata',
    'password' = 'bigdata',
    'table-name' = 'doctor'
);

-- Submit the job
insert into flink_doctor_mysql select ID, NAME, DEPT_ID from flink_doctor;
Test CDC: Oracle to Elasticsearch

Create the index in Kibana:

PUT /flink-cdc-oracle
{
  "mappings": {
    "properties": {
      "id":{"type": "integer"},
      "name":{"type": "text"},
      "dept_id":{"type": "integer"} 
    }
  }
}   
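The same index can also be created from the shell with curl (assuming the host and credentials used by the sink table below):
# PUT the index mapping directly against the Elasticsearch REST API
curl -u elastic:123 -X PUT "http://221.215.21.6:9200/flink-cdc-oracle" \
  -H 'Content-Type: application/json' \
  -d '{"mappings":{"properties":{"id":{"type":"integer"},"name":{"type":"text"},"dept_id":{"type":"integer"}}}}'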

Create the Flink Elasticsearch sink table

CREATE TABLE flink_doctor_es (
    id INT NOT NULL,
    name STRING,
    dept_id INT,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://221.215.21.6:9200',
    'index' = 'flink-cdc-oracle',
    'username' = 'elastic',
    'password' = '123'
);

Submit the job

insert into flink_doctor_es select ID, NAME, DEPT_ID from flink_doctor;

Update the source data and check the result in Elasticsearch:

GET /flink-cdc-oracle/_search
{
  "query": {
    "match_all": {}
  }
}