部署文件夹结构
flume ---- flume源代码,也是flume运行本体
flume-ftp ---- flume连接ftp工具类
flumeMysqlSink ----flume将 数据写入mysql工具类
mysqlSink ----flume主要运行配置文件
建表语句 ---- 创建4张表的建表语句
部署
- 将文件夹中的【flume】文件夹,放到服务器任何位置即可。
- 修改【flume】文件夹中的cong/flume-env文件,修改其中的java位置,将其修改为本地的java地址
- 运行命令:
./flume-ng agent -c /usr/local/bigdata/flume/conf -f /usr/local/bigdata/flume/conf/mysqlSink.conf -n agent1 -Dflume.root.logger=INFO,console
创建数据库
根据上传到ftp上面的文件,需要根据上传的文件字段,创建对应的数据库表
创建数据库注意事项:
- 第一个字段是【id】主键
- 倒数第二个字段是【data_sources】,【varchar】,表示当前数据来源于哪个文件中
- 倒数第一个字段是【create_datetime】,【datetime】,表示当前数据的插入时间
flume配置文件解析
整个系统只有一个配置文件【mysqlSink.conf】,全部配置都在这个里面,下面是这个文件的详细解析
agent1.sources = ftp2
agent1.sinks = k1
agent1.channels = ch2
agent1.sources.ftp2.type = org.keedio.flume.source.ftp.source.Source
agent1.sources.ftp2.client.source = ftp
# agent.sources.sftp1.client.source = sftp //如果使用sftp的需要替换
agent1.sources.ftp2.name.server = 192.168.134.100
agent1.sources.ftp2.port = 21
agent1.sources.ftp2.user = sdd
agent1.sources.ftp2.password = sdd
agent1.sources.ftp2.working.directory = /rs //读取的ftp目录
agent1.sources.ftp2.filter.pattern = .+\.rs //匹配文件,使用的是java的正则
#agent1.sources.ftp2.folder = C:/Users/sdd/Downloads //这个是ftp服务器设置的本地的文件夹,这个用不用都可以
agent1.sources.ftp2.file.name = ftp2-status-file.ser
agent1.sources.ftp2.run.discover.delay=5000
## root is launching flume binary.
# agent.sources.sftp1.knownHosts = /root/.ssh/known_hosts //ssh安全秘钥
## for testing porposes only, default is yes
# agent.sources.sftp1.strictHostKeyChecking = no
agent1.sources.ftp2.flushlines = true
agent1.sources.ftp2.search.recursive = true
agent1.sources.ftp2.processInUse = false
agent1.sources.ftp2.processInUseTimeout = 30
agent1.sources.ftp2.channels = ch2
agent1.channels.ch2.type = memory
agent1.channels.ch2.capacity = 100000000
agent1.channels.ch2.transactionCapacity = 100000000
//数据库相关
agent1.sinks.k1.type = com.tesnik.flume.sink.MySqlSink
agent1.sinks.k1.url = jdbc:mysql://192.168.134.100:3306/imvno?useUnicode=true&characterEncoding=utf8&useSSL=true
agent1.sinks.k1.user= root
agent1.sinks.k1.password= root123
agent1.sinks.k1.driver= com.mysql.cj.jdbc.Driver
agent1.sinks.k1.databaseName = imvno //数据库
agent1.sinks.k1.tableName = mvno_cdma_prepaid //数据库表
agent1.sinks.k1.partition = \\<\\-\\-\\-\\> //读取的文件的分隔符
agent1.sinks.k1.iscustom = false
agent1.sinks.k1.channel = ch2
完整代码下载:https://download.csdn.net/download/songdongdong6/21137656
|