消息队列的两种模式
点对点模式
一对一,消费者主动拉取数据,消息收到后消息删除, 并且消息只能被一个消费者消费
发布订阅模式
一对多,消费者消费数据后不会清除消息,生产者将消息发布到topic中,可以被所有订阅了的消费者消费,消费者可以通过两种方式获取消息,一种是主动拉取,一种topic推送消息,kafka是基于主动拉取模式的
概述
kafka是一个分布式的基于发布订阅模式的消息队列,主要应用于大数据领域。
使用消息队列的好处
解耦 :允许独立的修改活扩展两边的代码,只要确保他们遵守同样的接口约束可恢复性 :系统的一部分组件失效时,不会影响整个系统,即一个处理消息的服务挂掉后,加入队列中的消息仍然可以在服务恢复后处理缓冲 :有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的不一致- 异步通信:不需要立即处理消息,可以在需要的时候再处理
安装
kafka依赖于zookeeper,所以也必须安装zookeeper 下载地址:https://kafka.apache.org/downloads 把此包解压到服务器即可
config目录下的server.properties
# broker的全局唯一标识,集群中不能重复,
broker.id=0
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://10.177.4.38:9092
# 监听ip和端口案例 listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.10.1:9092
# 设置删除功能可使用,,新版本没有发现这个设置
delete.topic.enable=true
# 服务器用于从网络接收请求并向网络发送响应的线程数
num.network.threads=3
# 服务器用于处理请求的线程数,其中可能包括磁盘I/O
num.io.threads=8
# socket服务器使用的发送缓冲区
socket.send.buffer.bytes=102400
# socket服务器使用的接收缓冲区(SO_RCVBUF)
socket.receive.buffer.bytes=102400
# 套接字服务器将接受的请求的最大大小(针对OOM的保护)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# 用逗号分隔的目录列表,在其中存储日志文件
log.dirs=/tmp/kafka-logs
# 每个主题的默认日志分区数。更多分区允许更大分区
num.partitions=1
# 每个数据目录用于启动时日志恢复和关闭时刷新的线程数。
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# 组元数据内部主题“__consumer_offset”和“__transaction_state”的复制因子
# 对于开发测试以外的任何情况,建议使用大于1的值,以确保可用性,例如3。
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Retention Policy #############################
# 由于过期而可以被删除的日志文件的最小过期时间
log.retention.hours=168
# 日志段文件的最大大小。当达到这个大小时,将创建一个新的日志段。
log.segment.bytes=1073741824
# 检查日志段以确定是否可以根据其删除的时间间隔
log.retention.check.interval.ms=300000
# 集群连接案例 "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# 连接zookeeper的ip地址
zookeeper.connect=localhost:2181
# 连接zookeeper超时时间
zookeeper.connection.timeout.ms=6000
# 延迟初始消费者重新平衡的时间(以毫秒为单位)
group.initial.rebalance.delay.ms=0
bin目录下的常用命令
- 启动和关闭命令
zookeeper-server-start.sh zookeeper-server-stop.sh 后台启动命令 - 生产者和消费者控制台打印信息命令
kafka-console-consumer.sh kafka-console-producer.sh - 关于topic的操作,增删改查
kafka-topics.sh
|