首先我们需要在本地电脑下载kafka软件
详细过程链接:
Win10下kafka简单安装及使用.
本地测试之后没有问题,我们给它结合到项目
记得zookeeper-server和kafka-server要开起来,不然访问不了
随便建一个springboot项目
项目结构:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.demo</groupId>
<artifactId>spring-boot--shardingsphere-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot--shardingsphere-demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
<scope>runtime</scope>
</dependency>
<!--Mybatis-Plus-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.1</version>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.6.5</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.67</version>
</dependency>
<!-- apache commons start -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties
server.port=8091
## kafka ##
spring.kafka.bootstrap-servers=127.0.0.1:9092
#指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=test
spring.kafka.consumer.enable-auto-commit=true
#指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.topic=localhost_test
生产者:
package com.etc.kafkaTest;
import com.etc.common.api.ApiResult;
import com.etc.entity.Book;
import com.etc.service.BookService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@RestController
@RequestMapping("/kafka")
public class KafkaController {
@Autowired
private KafkaTemplate kafkaTemplate;
@Value("${spring.kafka.topic}")
private String topic;
@GetMapping("/send")
public String send() {
String msg="hello world";
kafkaTemplate.send(topic, msg);
System.out.println("生产者发送消息给topic1:"+msg);
return "生产者发送消息给topic1:"+msg;
}
}
消费者:
package com.etc.kafkaTest;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = "${spring.kafka.topic}")
public void listen1(ConsumerRecord<?, String> record) {
System.out.println(record);
String value = record.value();
System.out.println("消费者获取消息"+value);
}
}
测试地址:
http://localhost:8091/kafka/send
|