IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flume高可用集群搭建和连接kafka -> 正文阅读

[大数据]flume高可用集群搭建和连接kafka

1、环境介绍
操作系统:centos 7.9
jdk版本:8u291
flume版本:1.9.0
flume下载地址:
http://flume.apache.org/download.html
在这里插入图片描述
2、flume集群架构和数据流向
在这里插入图片描述
agent中的数据流向
在这里插入图片描述

3、资源规划

10.99.27.121 flumec01.wtown.com 4核心 8G内存 100G硬盘
10.99.27.122 flumec02.wtown.com 4核心 8G内存 100G硬盘

10.99.27.131 flumea01.wtown.com 4核心 8G内存 100G硬盘
10.99.27.132 flumea02.wtown.com 4核心 8G内存 100G硬盘
10.99.27.133 flumea03.wtown.com 4核心 8G内存 100G硬盘
10.99.27.134 flumea04.wtown.com 4核心 8G内存 100G硬盘

4、关闭防火墙和selinux(所有节点)
在这里插入图片描述
5、配置主机名并设置hosts文件(所有节点)
在这里插入图片描述

10.99.27.121	flumec01.wtown.com
10.99.27.122	flumec02.wtown.com
10.99.27.131	flumea01.wtown.com
10.99.27.132	flumea02.wtown.com
10.99.27.133	flumea03.wtown.com
10.99.27.134	flumea04.wtown.com
10.99.27.101	kafka01.wtown.com
10.99.27.102	kafka02.wtown.com
10.99.27.103	kafka03.wtown.com

6、安装jdk(所有节点)
https://blog.csdn.net/zyj81092211/article/details/118055068

7、kafka集群搭建
https://blog.csdn.net/zyj81092211/article/details/119326105

8、建立数据目录/data(所有节点)

mkdir /data

9、上传软件到/data目录,解压并重命名为flume(所有节点)

tar -xvf apache-flume-1.9.0-bin.tar.gz
mv apache-flume-1.9.0-bin flume

在这里插入图片描述
10、建立软连接(所有节点)

ln -s /data/flume/ /usr/local/flume

11、配置Collector节点(flumec01.wtown.com,flumec02.wtown.com节点)

vi /data/flume/conf/collector-to-kafka.conf

添加如下:

# flume to kafka, avro collector
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 58001

# channel-1000000-10000
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.keep-alive = 60
 
# sink k1 to kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = kafka-test
a1.sinks.k1.kafka.bootstrap.servers = kafka01.wtown.com:9092,kafka02.wtown.com:9092,kafka03.wtown.com:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 100
a1.sinks.k1.kafka.producer.compression.type = snappy

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

12、配置agent节点

flumea01.wtown.com,flumea02.wtown.com节点:

vi /data/flume/conf/agent-to-collector.conf

添加如下:

# agent to collector ha
a1.sources = r1 
a1.sinks = k1 k2
a1.channels = c1
a1.sinkgroups = g1
  
# set source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 57001
 
# set sink1
a1.sinks.k1.type = avro 
a1.sinks.k1.hostname = flumec01.wtown.com
a1.sinks.k1.port = 58001
  
# set sink2  
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = flumec02.wtown.com
a1.sinks.k2.port = 58001
  
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1500000
a1.channels.c1.transactionCapacity = 10000
  
#set sink group
a1.sinkgroups.g1.sinks = k1 k2
#set failover
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000
  
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k2.channel = c1
a1.sinks.k1.channel = c1

flumea03.wtown.com,flumea04.wtown.com节点:

vi /data/flume/conf/agent-to-collector.conf

添加如下:

# agent to collector ha
a1.sources = r1 
a1.sinks = k1 k2
a1.channels = c1
a1.sinkgroups = g1
  
# set source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 57001
 
# set sink1
a1.sinks.k1.type = avro 
a1.sinks.k1.hostname = flumec01.wtown.com
a1.sinks.k1.port = 58001
  
# set sink2  
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = flumec02.wtown.com
a1.sinks.k2.port = 58001
  
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1500000
a1.channels.c1.transactionCapacity = 10000
  
#set sink group
a1.sinkgroups.g1.sinks = k1 k2
#set failover
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 1
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
  
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k2.channel = c1
a1.sinks.k1.channel = c1

13、flume配置JDK(所有节点)

cp /data/flume/conf/flume-env.sh.template /data/flume/conf/flume-env.sh

添加如下:

export JAVA_HOME=/usr/local/java

14、配置flume环境变量(所有节点)

vi /etc/profile

添加如下:

# Flume environment
export FLUME_HOME=/data/flume
export PATH=$PATH:$FLUME_HOME/bin

重新加载环境变量:

source /etc/profile

15、启动flume collector(flumec01.wtown.com,flumec02.wtown.com节点)

nohup flume-ng agent --conf $FLUME_HOME/conf/ --conf-file $FLUME_HOME/conf/collector-to-kafka.conf --name a1 -Dflume.root.logger=INFO,console >> /dev/null 2>&1 &

16、启动flume agent(flumea01、flumea02、flumea03、flumea04.wtown.com节点)

nohup flume-ng agent --conf $FLUME_HOME/conf/ --conf-file $FLUME_HOME/conf/agent-to-collector.conf --name a1 -Dflume.root.logger=INFO,console >> /dev/null 2>&1 &

#####################以下为要采集数据的数据源客户端配置#####################
17、数据源agent配置示例
在这里插入图片描述

source-to-agent.conf内如如下:

#tail to flume agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2 k3 k4
a1.sinkgroups = g1

#set source
a1.sources.r1.type = TAILDIR
#记录文件读取位置的文件
a1.sources.r1.positionFile = /usr/local/flume/taildir_position.json
#文件组,这里可以配置监控多个文件多个文件
a1.sources.r1.filegroups = f1
#要监控的文件
a1.sources.r1.filegroups.f1 = /usr/local/flume/test.log

#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#set sink1 to agent1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = flumea01.wtown.com
a1.sinks.k1.port = 57001

#set sink2 to agent2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = flumea02.wtown.com
a1.sinks.k2.port = 57001

#set sink3 to agent3
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = flumea03.wtown.com
a1.sinks.k3.port = 57001

#set sink4 to agent4
a1.sinks.k4.type = avro
a1.sinks.k4.hostname = flumea04.wtown.com
a1.sinks.k4.port = 57001

#set sink group with LB
a1.sinkgroups.g1.sinks = k1 k2 k3 k4
a1.sinkgroups.g1.processor.type = load_balance

a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector.maxTimeOut = 10000
#负载均衡模式random和round_robin
a1.sinkgroups.g1.processor.selector = random

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1

注意:多source配置如下
#set source

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /usr/local/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /usr/local/flume/1.log
a1.sources.r1.filegroups.f2 = /usr/local/flume/2.txt

18、测试

在source agent 端建立上面配置文件中的log文件在这里插入图片描述
重复输入:

echo 11111 >> test.log

在kafka集群上开启消费者进程:

kafka-console-consumer.sh --bootstrap-server kafka01.wtown.com:9092,kafka02.wtown.com:9092,kafka03.wtown.com:9092 --topic kafka-test

数据已经采集到kafka
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-11 12:28:56  更:2021-08-11 12:30:38 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 21:10:02-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码