IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 笔记系列之docker安装Kafka -> 正文阅读

[大数据]笔记系列之docker安装Kafka

0.目的

在自己的机器上搭建一个Kafka的环境。

1.下载docker镜像

由于Kafka需要使用到Zookeeper,这里就先下载Zookeeper和Kafka的两个镜像。

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

2.单机方式启动zookeeper

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

查看启动后的状态
zookeeper运行状态

3.启动kafka

docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=localhost --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka

check Kafka status
kafka status

4.创建topic

从第3步看,环境似乎是搭建好了,但是不太确定,因此进入容器创建一个topic测试一下。

$ docker exec -it kafka bin/bash
bash-5.1# pwd
/
bash-5.1# ls
bin    dev    etc    home   kafka  lib    lib64  media  mnt    opt    proc   root   run    sbin   srv    sys    tmp    usr    var
bash-5.1# cd opt/
bash-5.1# ls
kafka             kafka_2.13-2.7.0  overrides
bash-5.1# cd kafka
bash-5.1# ls
LICENSE    NOTICE     bin        config     libs       logs       site-docs
bash-5.1# cd bin/
bash-5.1# ls
connect-distributed.sh               kafka-console-producer.sh            kafka-leader-election.sh             kafka-run-class.sh                   trogdor.sh
connect-mirror-maker.sh              kafka-consumer-groups.sh             kafka-log-dirs.sh                    kafka-server-start.sh                windows
connect-standalone.sh                kafka-consumer-perf-test.sh          kafka-mirror-maker.sh                kafka-server-stop.sh                 zookeeper-security-migration.sh
kafka-acls.sh                        kafka-delegation-tokens.sh           kafka-preferred-replica-election.sh  kafka-streams-application-reset.sh   zookeeper-server-start.sh
kafka-broker-api-versions.sh         kafka-delete-records.sh              kafka-producer-perf-test.sh          kafka-topics.sh                      zookeeper-server-stop.sh
kafka-configs.sh                     kafka-dump-log.sh                    kafka-reassign-partitions.sh         kafka-verifiable-consumer.sh         zookeeper-shell.sh
kafka-console-consumer.sh            kafka-features.sh                    kafka-replica-verification.sh        kafka-verifiable-producer.sh
bash-5.1# kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic hello-world
Created topic hello-world.
bash-5.1# kafka-topics.sh --zookeeper zookeeper:2181 --list
hello-world
bash-5.1#

可以看到,我们创建的topic已经成功,试着发一条message看看。
producer-consumer
哎呦,可以了蛮!那搞个程序试试?

5. spring boot with Kafka

在IDEA中新建一个springboot项目,再额外加上kafka和web的依赖:

<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>2.7.3</version>
		</dependency>
</dependencies>

需要配置properties文件,这里先抄一份来,具体怎么用,先挖个坑:

spring.kafka.bootstrap-servers=127.0.0.1:9092

##初始化生产者配置
spring.kafka.producer.retries=0
##应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0,1,all/-1)
spring.kafka.producer.acks=1
##批量大小
spring.kafka.producer.batch-size=16384
##提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=true
# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=50

然后再新建一个生产者产生消息,这里我们使用web发送消息:

package com.example.helloworld.web;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

    @GetMapping("/hello")
    public String hello() {
        return "hello world";
    }

    private KafkaTemplate<String, Object> kafkaTemplate;

    public HelloController(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @GetMapping("/kafka/normal/{msg}")
    public void sendMessage(@PathVariable("msg") String msg) {
        kafkaTemplate.send("hello-world", msg);
    }
}

这里使用之前创建好的topic hello-world。

再创建一个消费者,来消费消息,这里就直接先把topic、partipartition及消息内容打印出来:

package com.example.helloworld.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"hello-world"})
    public void onoMsg(ConsumerRecord<?,?> record) {
        System.out.println("简单消费: " + record.topic() + "-" + record.partition() + "- " + record.value());
    }
}

然后启动程序,在地址栏输入具体请求

http://localhost/kafka/normal/mbp

这里我发送的信息时mbp,可以看到控制台有如下输出
在这里插入图片描述
具体代码来这里看

6.总结

爽也爽了,总结一下:

  • zookeeper和Kafka搭配使用
  • 配置Kafka容器的时候,注意使用的指令
  • 通过bash进入容器手动生产和消费消息
  • 使用Spring boot做了个简单的生产消费的例子

挖的待填的坑:

  • Kafka具体原理学习
  • Kafka脱离zookeeper的搭建
  • 集群搭建
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-30 12:07:06  更:2021-08-30 12:07:48 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 16:01:38-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码