部署方式:
原生程序直接安装运行
sh zookeeper-start.sh ../config/zookeeper.properties
#server.properties开启listeners=PLAINTEXT://:9092配置 否则spring-kafka会报错
sh kafka-server-start.sh ../config/server.properties
docker容器部署
docker 启动zk+kafka
docker pull zookeeper
docker pull wurstmeister/kafka
docker run --name my-zk -p 2081:2081 zookeeper
docker run -e KAFKA_ZOOKEEPER_CONNECT=my-zk:2181 -e KAFKA_LISTENERS=PLAINTEXT://:9092 --link my-zk:my-zk -it wurstmeister/kafka
使用过程中可以感觉到docker部署对笔记本压力更小,原生程序跑起来风扇一会儿就呼呼转。。。
java使用
spring-kafka
maven依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.7</version>
</dependency>
配置
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092 # 指定kafka 代理地址,可以多个
producer: # 生产者
retries: 1 # 设置大于0的值,则客户端会将发送失败的记录重新发送
# 每次批量发送消息的数量
batch-size: 16384
buffer-memory: 33554432
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: test-consumer-group
auto-offset-reset: earliest
enable-auto-commit: true
auto-commit-interval: 100
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
生产者
public class KafkaSender {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
* 发送消息到kafka
*@param topic 主题
*@param message 内容体
*/
public void sendMsg(String topic , String message){
kafkaTemplate.send(topic ,message);
}
}
消费者
@Configuration
public class KafkaConfig {
@KafkaListener(id="m", topics={"test"})
public void listen(String msg){
log.info("消费: {}" , msg);
}
}
|