|
windows 简单搭建 单机 kafka 环境
1. 下载 zookeeper 3.7.0 / kafka_2.12-2.8.0
2. windows 环境下 启动 zookeeper
zkServer.cmd
3. windows 环境下 启动 kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties
4. kafka 启动后,创建一个 topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
5. topic 创建后,启动 producer
kafka-console-producer.bat --broker-list localhost:9092 --topic test
6. topic创建后,启动 consumer
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
7. producer 生成消息,consumer 消费消息

简单的 kafka windows 单机环境 就搭建好了。
其中,有很多配置 都是默认的,如果需要,则修改 配置文件就可以了。
java 简单操作:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.0</version>
</dependency>
生产者:
package wxj.test.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * Minimal Kafka producer demo: sends a single string message to the
 * {@code test} topic on the broker at {@code 127.0.0.1:9092} and prints
 * {@code success} or {@code error} once the send has completed.
 */
public class TestProducer {
    /**
     * Set to {@code true} by the send callback once the send has completed
     * (successfully or not). Volatile because the callback is invoked from
     * the producer's background I/O thread.
     */
    static volatile boolean send = false;

    /**
     * Builds a producer, sends one record and waits for it to complete.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Build the producer.
        // 1.1 Configuration: broker address plus key/value serializers.
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 1.2 Create the instance. try-with-resources guarantees the producer
        // is closed even if send() throws synchronously.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            // 2. Wrap the message to send.
            ProducerRecord<String, String> record = new ProducerRecord<>("test", "new message");

            // 3. Send asynchronously; the callback reports the outcome.
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        System.out.println("error");
                    } else {
                        System.out.println("success");
                    }
                    send = true;
                }
            });

            // Block until the record above has completed instead of
            // busy-spinning on the flag, which would burn a CPU core.
            producer.flush();
        }
    }
}
消费者:
package wxj.test.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
/**
 * Minimal Kafka consumer demo: joins consumer group {@code "0"}, subscribes
 * to the {@code test} topic on the broker at {@code 127.0.0.1:9092} and
 * prints every record it receives, forever.
 */
public class TestConsumer {
    /**
     * Builds a consumer and polls the {@code test} topic in an endless loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Build the consumer.
        // 1.1 Configuration: broker address, group id, key/value deserializers.
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "0");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 1.2 Create the instance. try-with-resources closes the consumer if
        // subscribe() or poll() throws, so it is not leaked on failure.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            // 1.3 Subscribe to the topic.
            consumer.subscribe(Arrays.asList("test"));

            // 2. Poll forever, waiting up to one second per poll for new records.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("has record : " + record);
                }
            }
        }
    }
}
查看kafka消息堆积情况
kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group 0
kafka集群配置:
单机就是一个 broker,配置文件不用改
集群,主要修改配置文件:
broker.id=0
log.dirs=/tmp/kafka0-logs
listeners=PLAINTEXT://:9092
zookeeper地址要配成一样的,比如本地的话就都设置为:
zookeeper.connect=localhost:2181
|