0.目的
在自己的机器上搭建一个Kafka的环境。
1.下载docker镜像
由于Kafka需要使用到Zookeeper,这里就先下载Zookeeper和Kafka的两个镜像。
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
2.单机方式启动zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
查看启动后的状态
3.启动kafka
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=localhost --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
check Kafka status
4.创建topic
从第3步看,环境似乎是搭建好了,但是不太确定,因此进入容器创建一个topic测试一下。
$ docker exec -it kafka bin/bash
bash-5.1
/
bash-5.1
bin dev etc home kafka lib lib64 media mnt opt proc root run sbin srv sys tmp usr var
bash-5.1
bash-5.1
kafka kafka_2.13-2.7.0 overrides
bash-5.1
bash-5.1
LICENSE NOTICE bin config libs logs site-docs
bash-5.1
bash-5.1
connect-distributed.sh kafka-console-producer.sh kafka-leader-election.sh kafka-run-class.sh trogdor.sh
connect-mirror-maker.sh kafka-consumer-groups.sh kafka-log-dirs.sh kafka-server-start.sh windows
connect-standalone.sh kafka-consumer-perf-test.sh kafka-mirror-maker.sh kafka-server-stop.sh zookeeper-security-migration.sh
kafka-acls.sh kafka-delegation-tokens.sh kafka-preferred-replica-election.sh kafka-streams-application-reset.sh zookeeper-server-start.sh
kafka-broker-api-versions.sh kafka-delete-records.sh kafka-producer-perf-test.sh kafka-topics.sh zookeeper-server-stop.sh
kafka-configs.sh kafka-dump-log.sh kafka-reassign-partitions.sh kafka-verifiable-consumer.sh zookeeper-shell.sh
kafka-console-consumer.sh kafka-features.sh kafka-replica-verification.sh kafka-verifiable-producer.sh
bash-5.1
Created topic hello-world.
bash-5.1
hello-world
bash-5.1
可以看到,我们创建的topic已经成功,试着发一条message看看。 哎呦,可以了蛮!那搞个程序试试?
5. spring boot with Kafka
在IDEA中新建一个springboot项目,再额外加上kafka和web的依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.3</version>
</dependency>
</dependencies>
需要配置properties文件,这里先抄一份来,具体怎么用,先挖个坑:
spring.kafka.bootstrap-servers=127.0.0.1:9092
##初始化生产者配置
spring.kafka.producer.retries=0
##应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0,1,all/-1)
spring.kafka.producer.acks=1
##批量大小
spring.kafka.producer.batch-size=16384
##提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=true
# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=50
然后再新建一个生产者产生消息,这里我们使用web发送消息:
package com.example.helloworld.web;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@GetMapping("/hello")
public String hello() {
return "hello world";
}
private KafkaTemplate<String, Object> kafkaTemplate;
public HelloController(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@GetMapping("/kafka/normal/{msg}")
public void sendMessage(@PathVariable("msg") String msg) {
kafkaTemplate.send("hello-world", msg);
}
}
这里使用之前创建好的topic hello-world。
再创建一个消费者,来消费消息,这里就直接先把topic、partipartition及消息内容打印出来:
package com.example.helloworld.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"hello-world"})
public void onoMsg(ConsumerRecord<?,?> record) {
System.out.println("简单消费: " + record.topic() + "-" + record.partition() + "- " + record.value());
}
}
然后启动程序,在地址栏输入具体请求
http://localhost/kafka/normal/mbp
这里我发送的信息时mbp,可以看到控制台有如下输出 具体代码来这里看
6.总结
爽也爽了,总结一下:
- zookeeper和Kafka搭配使用
- 配置Kafka容器的时候,注意使用的指令
- 通过bash进入容器手动生产和消费消息
- 使用Spring boot做了个简单的生产消费的例子
挖的待填的坑:
- Kafka具体原理学习
- Kafka脱离zookeeper的搭建
- 集群搭建
|