IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kafka单机环境搭建及整合SpringBoot完成基本使用 -> 正文阅读

[大数据]Kafka单机环境搭建及整合SpringBoot完成基本使用

Kafka单机环境搭建及整合SpringBoot完成基本使用

Kafka单机环境搭建

下载kafka_2.11-1.1.0.tgz版本

下载地址 https://archive.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz

上传到服务器,我使用的是centos7 环境

使用ftp工具上传到/opt目录下
解压:tar -zxvf kafka_2.11-1.1.0.tgz

进入kafka目录

cd /opt/kafka_2.11-2.1.1

启动zookeeper(kafka自带zookeeper)和kafka

注意:启动这两个的前提是你的服务器已经装好了java环境

#安装open-jdk11
yum install java-11-openjdk-devel.x86_64

默认不修改任何配置 zookeeper地址为:你服务器ip:2181

kafka地址 你服务器ip:9092

前台启动zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
后台运行:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper-run.log 2>&1 &

前台启动kafka:
bin/kafka-server-start.sh config/server.properties
后台运行:
nohup bin/kafka-server-start.sh config/server.properties > kafka-run.log 2>&1 &

通过linux命令查看zookeeper和kafka是否启动成功

netstat -nltp
可以看到9092和2181两个端口正常运行就说明启动成功了

整合SpringBoot实现一个基本的demo

开发环境

jdk: jdk-11.0.10.9-hotspot

maven: 3.8.1

IDE: IntelliJ IDEA 2021.1.3

SpringBoot: 2.3.2.RELEASE

1.新建springboot项目

使用Spring Initializr创建过程不再赘述

项目目录:

在这里插入图片描述

2.配置pom.xml环境

 		 <!-- springboot -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--引入kafak和spring整合的jar-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.3.2.RELEASE</version>
        </dependency>

        <!-- lombok依赖 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- 阿里的JSON工具包 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.44</version>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

3.application.properties配置文件

spring.application.name=kafka-demo
server.port=8080

# kafka 配置
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=10.172.29.100:9092

# provider  
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# consumer  
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-group

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

4.生产者和消费者

生产者

/**
 * @author 赵赳
 * @since: 2021/8/10 17:13
 * @desc: 测试发送kafka消息
 */
@Slf4j
@RestController
public class KafkaProducer {

    KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * 发送kafka测试消息
     *
     * @return 响应结果
     */
    @GetMapping("test")
    public String testKafka() {
        String str = new String("hello kafka!");
        kafkaTemplate.send(MessageTopic.TEST_TOPIC, JSONObject.toJSONString(str));
        log.info("发送消息到kafka,消息体:{}", str);
        return "成功发送消息到kafka";
    }
}

消费者

/**
 * @author 赵赳
 * @since: 2021/8/10 17:19
 * @desc: kafka消费者
 */
@Slf4j
@Component
public class KafkaConsumer {

    /**
     * 接收TEST_TOPIC主题消息
     *
     * @param consumerRecord 消息体
     */
    @KafkaListener(topics = {MessageTopic.TEST_TOPIC})
    public void getMessages(ConsumerRecord<String, String> consumerRecord) {
        String value = consumerRecord.value();
        log.info("接收到kafka消息,消息为:{}", value);
    }

    /**
     * 接收TEST_USER_TOPIC主题消息
     *
     * @param consumerRecord 消息体
     */
    @KafkaListener(topics = {MessageTopic.TEST_USER_TOPIC})
    public void getUserMessages(ConsumerRecord<String, String> consumerRecord) {
        //判断是否为null
        Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
        log.info(">>>>>>>>>> record =" + kafkaMessage);
        if (kafkaMessage.isPresent()) {
            //得到Optional实例中的值
            Object message = kafkaMessage.get();
            // 执行业务代码
            log.info("消费kafka消息,消息为:{}", message);
        }
    }
}

MessageTopic

/**
 * @author 赵赳
 * @since: 2021/8/10 17:21
 * @desc:
 */
public class MessageTopic {

    /**
     * 测试消息主题
     */
    public static final String TEST_TOPIC = "test";

    /**
     * 测试用户消息主题
     */
    public static final String TEST_USER_TOPIC = "test-user";

}

5.启动项目测试

使用postman工具测试 http://localhost:8080/test

在这里插入图片描述

6.测试结果

在这里插入图片描述

加入一个实体类模拟普通业务场景

/**
 * @author 赵赳
 * @since: 2021/8/10 17:56
 * @desc:
 */
@Data
@Accessors(chain = true)
public class User {

    private String id;

    private String username;

    private String state;

}

消费者不变,加入一个kafkaHandler发送消息

/**
 * @author 赵赳
 * @since: 2021/8/10 18:00
 * @desc:
 */
@Slf4j
@Component
public class kafkaHandler {

    KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public kafkaHandler(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * 发送用户信息
     *
     * @param id 用户id
     */
    public void sendMessages(String id) {
        var user = new User().setId(id).setUsername("tom").setState("0");
        kafkaTemplate.send(MessageTopic.TEST_USER_TOPIC, JSONObject.toJSONString(user));
        log.info("发送消息到kafka,消息内容:{}", user);
    }

}

测试类

    kafkaHandler kafkaHandler;

    @Autowired
    public KafkaDemoApplicationTests(kafkaHandler kafkaHandler) {
        this.kafkaHandler = kafkaHandler;
    }

    @Test
    void sendMessages() {
        for (int i = 0; i < 10; i++) {
            kafkaHandler.sendMessages(String.valueOf(++i));
        }
    }

启功项目执行测试方法

测试结果
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-11 12:28:56  更:2021-08-11 12:29:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 21:16:17-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码