Spring整合Kafka
- 采用Kafaka技术进行系统消息的发送,提高系统的性能
1、阻塞队列的使用
- Kafaka也好还是其他的消息中间件也好都是框架,如果不使用这些框架使用BlockingQueue也可以解决消息系统的开发
- 理解阻塞队列对理解Kafaka很有帮助
- BlockingQueue
-
解决线程通信的问题 -
阻塞的方法有put、take -
-
生产的数据放在队列当中,当队列满的时候生产者线程就被阻塞住(什么都不做,不会消耗资源),等待消费者线程进行消费 -
消费线程从队列当中拿数据进行消费,当数据非消费完了的时候,消费者线程被阻塞住,等待生产者线程进行生产 -
阻塞队列的作用就是用来在两个线程之间,避免cpu资源的消耗,能够提高系统的性能。 - 生产者消费者模式
- 实现类
- ArrayBlockingQueue
- LinkedBlockingQueue
- PrioriBlockingQueue、SynchronousQueue、DelayQueue等
- 生产者消费者例子
public class BlockingQueueTests {
public static void main(String[] args) {
BlockingQueue queue = new ArrayBlockingQueue(10);
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
Thread.sleep(20);
queue.put(i);
System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Thread.sleep(new Random().nextInt(1000));
queue.take();
System.out.println(Thread.currentThread().getName() + "消费了:" + queue.size());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
2、Kafka入门
-
Kafka简介
- Kafka是一个分布式的流媒体平台
- 主要应用在消息系统、日志收集、用户行为跟踪、流式处理
-
Kafka特点(性能最好的消息队列)
- 高吞吐量(对硬盘读取效率的高低,取决于读写方式的不同,对硬盘顺序读写性能很高,甚至高于对于内存的随机读取,Kafka利用对硬盘的顺序读写和硬盘存放数据多的特点既保证效率又保证了海量数据的处理)
- 消息持久化(将消息永久存放在硬盘上,处理海量数据的前提)
- 高可靠性(可以做集群部署,一台服务器挂了还有其他的服务器顶上,这样来保证他的可靠性)
- 高扩展性(集群中服务器不够用了,想要添加一个服务器很方便)
-
Kafka术语
- Broker(Kafka的服务器)、Zookeeper(能够用来管理其他的集群,Kafka的集群使用这个来进行管理)
- Topic(文件夹用来存放消息的位置)、Partition(分区),Offset(消息的索引)
-
-
下载Kafka解压之后进行一些配置
-
启动zookeeper(切换到kafka目录下)
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
kafka-topics.bat --list --bootstrap-server localhost:9092
kafka-console-producer.bat --broker-list localhost:9092 --topic test
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
- zookeeper关闭命令(切记使用命令行停止、否则容易出事)
zookeeper-server-stop.bat
kafka-server-stop.bat
3、Spring整合Kafaka
-
导入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
-
配置kafka
# Kafka配置
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
#配置是否自动提交消费者的偏移量
spring.kafka.consumer.enable-auto-commit=true
#配置自动提交的频率
spring.kafka.consumer.auto-commit-interval=3000
-
访问kfka
-
测试一下(记得启动kafak和zookeeper)
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testKafka() {
kafkaProducer.sendMessage("test", "你好");
kafkaProducer.sendMessage("test", "在吗");
try {
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Component
class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic, String content) {
kafkaTemplate.send(topic, content);
}
}
@Component
class KafkaConsumer {
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record) {
System.out.println(record.value());
}
}
|