IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kafka中间件 -> 正文阅读

[大数据]Kafka中间件

application.yml的配置文件

kafka:
  consumerTopic: Java_Pic
  consumerConfig: kafka_consumer.properties
  consumerCommit: true
  producerTopic: Java_Pic
  producerConfig: kafka_producer.properties
  producerEnable: true

消费者配置文件

group.id=GP_STRUCT_K2GBCL_01
bootstrap.servers=10.20
zookeeper.connect=10
enable.auto.commit=false
auto.commit.interval.ms=1000
auto.offset.reset=earliest
max.poll.records=100
session.timeout.ms=100000
max.request.size=104857600

生产者配置文件

bootstrap.servers=
acks=1
max.request.size=104857600
buffer.memory=104857600
retries=3

KafkaConfig

@Component
@ConfigurationProperties(prefix = "kafka")
public class KafkaConfig {
    //消费TOPIC
    private String consumerTopic;
    //消费者相关配置文件路径
    private String consumerConfig;
    //消费者是否提交偏移量
    private boolean consumerCommit;
    //生产者生产TOPIC
    private String producerTopic;
    //生产者相关配置文件路径
    private String producerConfig;
    //生产者是否生产开关
    private boolean producerEnable;

    public String getConsumerTopic() {
        return consumerTopic;
    }

    public void setConsumerTopic(String consumerTopic) {
        this.consumerTopic = consumerTopic;
    }

    public String getConsumerConfig() {
        return consumerConfig;
    }

    public void setConsumerConfig(String consumerConfig) {
        this.consumerConfig = consumerConfig;
    }

    public boolean isConsumerCommit() {
        return consumerCommit;
    }

    public void setConsumerCommit(boolean consumerCommit) {
        this.consumerCommit = consumerCommit;
    }

    public String getProducerTopic() {
        return producerTopic;
    }

    public void setProducerTopic(String producerTopic) {
        this.producerTopic = producerTopic;
    }

    public String getProducerConfig() {
        return producerConfig;
    }

    public void setProducerConfig(String producerConfig) {
        this.producerConfig = producerConfig;
    }

    public boolean isProducerEnable() {
        return producerEnable;
    }

    public void setProducerEnable(boolean producerEnable) {
        this.producerEnable = producerEnable;
    }

    /**
    *@Description 将给定topic列表字符串拆解成列表
    *@Param [topics] topic列表字符串,中间逗号分隔
    *@Return java.util.List<java.lang.String> topic列表
    *@Author wsun.Frank
    *@Date 2020/8/12
    */
    public static List<String> getTopicLists(String topics) {
        String[] items = topics.split(",");
        List<String> result = new LinkedList<>();
        for (String item : items) {
            if (!StringUtils.isBlank(item)) {
                result.add(StringUtils.trim(item));
            }
        }
        return result;
    }
}

kafka的服务类实现

package cn.com.wind.fm.day7.service;

import cn.com.wind.fm.day7.config.KafkaConfig;
import cn.com.wind.fm.day7.util.PropertyFileUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;

@Service
public class KafkaServiceImpl implements KafkaService {
    private static Logger LOGGER = LoggerFactory.getLogger(KafkaServiceImpl.class);
    @Autowired
    private KafkaConfig config;

    private volatile boolean reqClose;
    private volatile boolean closed;

    private Consumer<String, String> messageConsumer;
    private Producer<String, String> messageProducer;

    /**
    *@Description Kafka集群的初始化  包含加载配置文件,消费者、生产者的初始化
    *@Param []
    *@Return void
    *@Author wsun.Frank
    *@Date 2020/8/12
    */
    @Override
    public void init() {
        LOGGER.info("初始化生产者..");
        if (messageProducer == null && config.isProducerEnable()) {
            messageProducer = new KafkaProducer<>(PropertyFileUtil.load(config.getProducerConfig()),
                    new StringSerializer(), new StringSerializer());
        }


        LOGGER.info("初始化消费者..");
        if (messageConsumer == null) {
            messageConsumer = new KafkaConsumer<>(PropertyFileUtil.load(config.getConsumerConfig()),
                    new StringDeserializer(), new StringDeserializer());
            messageConsumer.subscribe(config.getTopicLists(config.getConsumerTopic()));
        }
    }

    /**
    *@Description Kafka消息发送方式
    *@Param [key, value] 消息的key 消息的value
    *@Return void
    *@Author wsun.Frank
    *@Date 2020/8/12
    */
    @Override
    public void produce(String key, String value) {
        //首先判断如果value为空则跳过
        if(StringUtils.isBlank(value)) {
            return;
        }
        RecordMetadata recordMetadata = null;
        if (messageProducer != null) {
            String topic = config.getProducerTopic();

            //1. 发送后不理会发送结果
            messageProducer.send(new ProducerRecord<String, String>(topic, key, value));

            //2. 同步发送(阻塞)
            Future<RecordMetadata> future = messageProducer.send(new ProducerRecord<String, String>(topic, key, value));
            try {
                recordMetadata = future.get();
            } catch (Exception e) {
                LOGGER.error("生产发送错误:",e);
            }
            //可以从返回的原信息中获得很多信息
            long offset = recordMetadata.offset();
            int partition = recordMetadata.partition();
            LOGGER.info(partition + "_" + offset);
            
            //3. 异步回调官方案例 (不阻塞)
            // JavaProducer的send方法会返回一个JavaFuture对象供用户稍后获取发送结果。这就是回调机制
            // RecordMetadata 和 Exception 不可能同时为空,
            // 消息发送成功时,Exception为null,消息发送失败时,metadata为空
            messageProducer.send(
                    new ProducerRecord<String, String>(topic, key, value),
                    new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata, Exception ex) {
                            if (ex != null) {
                            	LOGGER.error("生产发送错误:",ex);
                            } else {
                                // 发送成功
                            	LOGGER.info(metadata.partition() + "_" + metadata.offset());
                            }
                        }
                    });
            LOGGER.info("send方法执行完毕");
        }
    }

    /**
    *@Description 消费者消费消息方法
    *@Param []
    *@Return void
    *@Author wsun.Frank
    *@Date 2020/8/12
    */
    @Override
    public void consume() {
        //计数器
        long retry = 0;
        while (!reqClose) {
            try {
                //此处的poll作用为拉取消息
                ConsumerRecords<String, String> records = messageConsumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    if (reqClose) {
                        LOGGER.info("跳出消費..");
                        break;
                    }
                    String key =  record.key();
                    String value = record.value();

                }
                if (config.isConsumerCommit()) {
                    messageConsumer.commitSync();
                }
            } catch (Exception ex) {
                if (retry % 60 == 0) {
                    // 防止Kafka失联时,错误过多,控制1分钟出一次日志
                    LOGGER.error("Kafka Operation Failed", ex);
                    retry++;
                }
            }
        }
        // 释放资源
        this.close();
    }

    /**
    *@Description 处理消息的业务方法 暂时为空
    *@Param [msg] kafka消息
    *@Return void
    *@Author wsun.Frank
    *@Date 2020/8/12
    */
    public void consumeRecord(String key,String msg) {
        LOGGER.info("接收到kafka消息,key:{},value:{}",key,msg);
    }

    
    /**
    *@Description 重置消费者偏移量
    *@Param []
    *@Return void
    *@Author wsun.Frank
    *@Date 2020/8/13
    */
    @Override
    public void resetOffset() {
        Consumer<String, String> consumer = new KafkaConsumer<>(PropertyFileUtil.load(config.getConsumerConfig()),
                new StringDeserializer(), new StringDeserializer());
        List<String> topics = config.getTopicLists(config.getConsumerTopic());
        for (String topic : topics) {
            consumer.subscribe(Arrays.asList(topic));
            ConsumerRecords<String, String> records = consumer.poll(2000);
            
            Set<TopicPartition> topicList = consumer.assignment();
            Map<TopicPartition, Long> endMap = consumer.endOffsets(topicList);
            Map<TopicPartition, Long> beginmap = consumer.beginningOffsets(topicList);
            
            long singleTpLagSize = 1300000;
            for (TopicPartition tp : topicList) {
                long endOffset = endMap.get(tp);
                long beginOffset = beginmap.get(tp);
                long aimOffset = endOffset - singleTpLagSize;
                if (aimOffset > 0 && aimOffset >= beginOffset) {
                    consumer.seek(tp, endOffset-singleTpLagSize);
                } else {
                    consumer.seek(tp, beginOffset);
                }
            }
            consumer.commitSync();
        }
    }

    public boolean isClosed() {
        return closed;
    }

    private void close() {
        try {
            if (messageConsumer != null) {
                messageConsumer.close();
            }
            if (messageProducer != null) {
                messageProducer.close();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        closed = true;
        LOGGER.info("Kafka資源釋放完畢!");
    }

    public void reqClose() {
        this.reqClose = true;
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-30 12:48:25  更:2021-07-30 12:49:13 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/28 9:52:05-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码