| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> Kafka -> 正文阅读 |
|
[大数据]Kafka |
1.安装kafka(CentOS安装Kafka2.7.0)安装前确保JAVA环境是正常的 1.1 更新系统
1.2 安装Kafka 从kafka官网上下载kafka的二进制安装包,下载地址是:https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz,下载下来之后进行解压。
解压文件成功之后,接着创建一个文件的软连接,这样可以方便进行kafka的升级。
?接下来,进入到kafka目录下,启动kafka,启动kafka之前得先启动zookeeper。
?启动成功之后重开一个xshell窗口进行启动Kafka
注意:启动时可能会因为centos的内存问题造成启动失败,可以修改kafka配置文件配置内存大小
找到:export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" 修改成:?export KAFKA_HEAP_OPTS="-Xmx512M -Xms265M" ?2.安装成功之后就可以开始测试了先通过kafka创建一个topic。进入到kafka目录下,接着通过下面命令创建一个kafka topic。
接下来通过生产者进行发送消息。
?接着新开一个xshell窗口创建一个消费者来进行接收消息。
?到处我们的Kafka就安装成功了。 3.Springboot集成Kafka先在pom.xml中添加kafka依赖 <!-- kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 然后在application.yml中添加kafka配置 spring: kafka: # IP和端口 bootstrap-servers: 106.55.150.62:9092 producer: # 发生错误后,消息重发的次数。 retries: 0 #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。 batch-size: 16384 # 设置生产者内存缓冲区的大小。 buffer-memory: 33554432 # 键的序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 acks: 1 consumer: # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D auto-commit-interval: 1S # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录) # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录 auto-offset-reset: earliest # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 enable-auto-commit: false # 键的反序列化方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化方式 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer listener: # 在侦听器容器中运行的线程数。 concurrency: 5 #listner负责ack,每调用一次,就立即commit ack-mode: manual_immediate missing-topics-fatal: false 创建消息生产者
?创建消息消费者
新建一个接口测试发送消息
测试接口,可以看到消息发送成功,并且成功消费 ?到此我们的kafka入门就已完成。 我也是刚学习Kafka,记录一下学习经过。文章是通过别人的博客学习加上自己的总结。 借鉴博客:https://www.cnblogs.com/Hackerman/p/12595246.html,https://blog.csdn.net/zhou_fan_xi/article/details/105241273 |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年3日历 | -2025/3/4 3:20:26- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |