IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 那些中间件 -> 正文阅读

[大数据]那些中间件

windows 简单搭建 单机 kafka 环境

1. 下载 zookeeper? 3.7.0 /kafka_2.12-2.8.0

2. windows 环境下 启动zookeeper?

zkServer.cmd

3. windows 环境下 启动?kafka

.\bin\windows\kafka-server-start.bat .\config\server.properties

4. kafka 启动后,创建一个 topic

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

5. topic 创建后,启动 producer

kafka-console-producer.bat --broker-list localhost:9092 --topic test

6. topic创建后,启动 consumer

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test

7. producer 生成消息,consume消费消息

简单的 kafka windows 单机环境 就搭建好了。

其中,有很多配置 都是默认的,如果需要,则修改 配置文件就可以了。

java? 简单操作:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.7.0</version>
        </dependency>

生产者:

package wxj.test.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TestProducer {

    static volatile boolean send = false;

    public static void main(String[] args) {
        /**
         * 1. 构造一个 生产者 对象
         */

        /**
         * 1.1 属性配置
         */
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        /**
         * 1.2 构造实例
         */
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

        /**
         * 2. 包装 需要发送的消息
         */
        ProducerRecord<String,String> record = new ProducerRecord<>("test","new message");


        /**
         * 3. 生产者发送消息
         */
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (null != e){
                    System.out.println("error");
                }else {
                    System.out.println("success");
                }
                send = true;
            }
        });
        while (!send){
        }
        producer.close();

    }
}

2. 消费者

package wxj.test.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class TestConsumer {


    public static void main(String[] args) {
        /**
         * 1. 构造一个 消费者对象
         */

        /**
         * 1.1 属性配置
         */
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"0");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        /**
         * 1.2 构造实例
         */
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        /**
         * 1.3 设置订阅的 topic
         */
        consumer.subscribe(Arrays.asList("test"));

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
            while (iterator.hasNext()){
                ConsumerRecord<String, String> record = iterator.next();
                System.out.println("has record : " + record);
            }
        }

    }
}

查看kafka消息堆积情况

kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group 0

kafka集群配置:

单机就是一个 broker,配置文件不用改

集群,主要修改配置文件:

broker.id=0

log.dirs=/tmp/kafka0-logs

listeners=PLAINTEXT://:9092

zookeeper地址要配成一样的,比如本地的话就都设置为:

zookeeper.connect=localhost:2181

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-30 12:07:06  更:2021-08-30 12:09:23 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 15:49:35-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码