IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> springboot如何集成kafka集群 -> 正文阅读

[大数据]springboot如何集成kafka集群

springboot中集成kafka,主要目的干啥呢,当然消息推送啦。不同系统之间,自身系统不同组件之间消息通信的一种方式,也可以是使用MQ。

为什么要使用咱们的消息系统呢:个人看来,目的主要就是为了解耦,异步通信,消峰处理。

消息系统三大优点

解耦:

怎么理解呢,比如我是A系统,我要现在要给B、C两个系统发送消息,如果不用消息系统,直接调用,就相当于A系统跟B、C系统强耦合到一起了,如果后面还有D、E......等系统怎么办呢,我总不能挨着挨着一个一个写吧,这样代码耦合太高了,而且我还得考虑别人收到没有,处理成功失败等等情况。那我使用消息系统不就解决这个问题了嘛,我直管向Kafka推送消息,我才不管你谁来消费呢,你想消费你就去消费Kafka消息。这不就解耦了嘛。

异步:

系统A调用其他系统接口的时候,需要一直等待其他系统处理完成它的业务逻辑后,返回处理结果,我才能继续处理我的业务逻辑,但是它的业务逻辑我其实不关心,我没必要等啊。

那用异步不就好了吗,我将消息发给Kafka,你其他系统去消费就行了,我监听你返回的处理结果就行了,我发送完后就可以继续做其他操作了,不用一直等着。

消峰:

比如电商在秒杀的时候,用户量暴增,但是它又只是一小段时间的爆发量。如果不处理,那服务器不得直接挂了。怎么办呢?那我们用Kafka啊,来了请求我就推送消息到Kafka,然后呢,消费设置一秒消费多少就好,然后等高峰过去,用户量回归正常,积累的消息也会慢慢的被消费完。这样咱不是又能愉快的玩耍了嘛。

Kafka术语:

Producer:生产者,消息的生产者,负责推送消息到Kafka队列

Consumer:消费者,负责消费Kafka队列中的消息

Consumer group:用来实现消息广播(发给多个Consumer)或单播(发给单个Consumer)的手段

Offset:kafka存储消息的偏移量,可以理解为下标用来控制消息的消费位置

Broker:Kafka服务节点,一个kafka服务器就是一个Broker,一个集群由多个Broker组成,一个? ? ? ? ? ? ? ? ? Broker下可以有多个Topic

Topic:消息的类别、标题,可以理解为是一个消息的队列

Partition:属于Topic的子集,消息分区,一个Topic可以有多个partition,一个partition在物理上对? ? ? ? ? ? ? ? ? ?应了一个文件夹;partition中的所有消息都会分配一个offset

Kafka消息模式:

点对点消息传递模式:

一个消息推送出去,只能被消费一次,任何一个消费者消费了该消息后,其他消费者都不能继续消费该消息

发布-订阅模式:

消息发布到队列,所有订阅了topic的的消费者都可以消费topic里的所有消息。

话不多说,直接开始:

1.pom文件中添加maven引用

<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.yml文件中增加kafka的配置

spring:
  kafka:
    # 指定kafka server的地址,集群配多个,中间,逗号隔开
    bootstrap-servers:
      - 127.0.0.1:9092
      - 127.0.0.1:9093
      - 127.0.0.1:9094
    # kafka生产者配置
    producer:
      # 写入失败时,重试次数。当leader失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
      # 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
      retries: 0
      # 每次批量发送消息的数量,produce积累到一定数据,一次发数据量
      # 当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。
      # 这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置(16K)
      batch-size: 16384
      # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
      # #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
      buffer-memory: 33554432
      #默认情况下消息是不压缩的,此参数可指定采用何种算法压缩消息,可取值:none,snappy,gzip,lz4。snappy压缩算法由Google研发,
      #这种算法在性能和压缩比取得比较好的平衡;相比之下,gzip消耗更多的CPU资源,但是压缩效果也是最好的。通过使用压缩,我们可以节省网络带宽和Kafka存储成本。
      #如果不开启压缩,可设置为none(默认值),比较大的消息可开启
      compressionType: none
      #procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
      #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,
      #    无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
      #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后
      #    立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
      #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,
      #    这相当于acks = -1的设置。
      #可以设置的值为:all, -1, 0, 1
      acks: all
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
#      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # 连接超时时间
      properties:
        request.timeout.ms: 30000

3.配置生产者Producer或者消费者Consumer

首先当然是生产者了,发送消息Producer:

package com.liu.kafka;

import com.liu.constants.KafkaConstants;
import com.liu.utils.DateUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.*;
import java.util.concurrent.ExecutionException;

import static com.alibaba.fastjson.JSON.toJSONString;

/**
  * kafka生产者类
  * @author kevin
  * @date 2021/7/28
  */
@Component
@Slf4j
@SuppressWarnings({"unused"})
public class ProducerUtils {

    private static final String PUSH_MSG_LOG = "准备发送消息为:{}";

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    private KafkaAdminClient kafkaAdminClient;

    /**
      * 如果没有topic,则创建一个
      * @author kevin
      * @param topicName :
      * @param partitionNum :
      * @param replicaNum :
      * @return org.apache.kafka.clients.admin.CreateTopicsResult
      * @date 2021/8/5 9:42
      */
    public Boolean createTopic(String topicName, int partitionNum, int replicaNum){
        KafkaFuture<Set<String>> topics = kafkaAdminClient.listTopics().names();
        try {
            if (topics.get().contains(topicName)) {
                return true;
            }
            NewTopic newTopic = new NewTopic(topicName, partitionNum, (short) replicaNum);
            kafkaAdminClient.createTopics(Collections.singleton(newTopic));
            return true;
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
      * 传入topic名称,json格式字符串的消息,生产者进行发送
      * @author kevin
      * @param topicName : topic名称
      * @param jsonStr : 消息json字符串
      * @return boolean : 推送是否成功
      * @date 2021/7/28 15:53
      */
    public boolean sendMessage(String topicName, String jsonStr) {
        createTopic(topicName, 5, 5);
        log.info(PUSH_MSG_LOG, jsonStr);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                jsonStr));

        return dealSendResult(future);
    }

    /**
     * 传入topic名称,json格式字符串数组的消息,生产者进行发送
     * @author kevin
     * @param topicName : topic名称
     * @param jsonStrs : 消息json字符串数组
     * @return boolean : 推送是否成功
     * @date 2021/7/28 15:53
     */
    public Boolean[] sendMessage(String topicName, String[] jsonStrs) {
        createTopic(topicName, 5, 5);
        int msgLength = jsonStrs.length;
        Boolean[] success = new Boolean[msgLength];
        for (int i = 0; i < msgLength; i++) {
            String jsonStr = jsonStrs[i];
            log.info(PUSH_MSG_LOG, jsonStr);
            //发送消息
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                    jsonStr));
            success[i] = dealSendResult(future);
        }
        return success;
    }

    /**
     * 传入topic名称,消息对象,生产者进行发送
     * @author kevin
     * @param topicName : topic名称
     * @param obj : 消息对象
     * @return boolean : 推送是否成功
     * @date 2021/7/28 15:53
     */
    public boolean sendMessage(String topicName, Object obj) {
        createTopic(topicName, 5, 5);
        String jsonStr = toJSONString(obj);
        log.info(PUSH_MSG_LOG, jsonStr);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                jsonStr));

        return dealSendResult(future);
    }

    /**
     * 传入topic名称,消息对象数组,生产者进行发送
     * @author kevin
     * @param topicName : topic名称
     * @param list : 消息对象数组
     * @return boolean : 推送是否成功
     * @date 2021/7/28 15:56
     */
    public Boolean[] sendMessage(String topicName, List<Object> list) {
        createTopic(topicName, 5, 5);
        Boolean[] success = new Boolean[list.size()];
        for (int i = 0; i < list.size(); i++) {
            Object obj = list.get(i);
            String jsonStr = toJSONString(obj);
            log.info(PUSH_MSG_LOG, jsonStr);
            //发送消息
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                    jsonStr));
            success[i] = dealSendResult(future);
        }
        return success;
    }

    /**
     * 传入topic名称,json格式字符串的消息,生产者进行发送
     * @author kevin
     * @param topicName : topic名称
     * @param key : 消息key
     * @param jsonStr : 消息json字符串
     * @return boolean : 推送是否成功
     * @date 2021/7/28 15:53
     */
    public boolean sendMessage(String topicName, String key, String jsonStr) {
        createTopic(topicName, 5, 5);
        log.info(PUSH_MSG_LOG, jsonStr);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                key, jsonStr));

        return dealSendResult(future);
    }

    /**
     * 传入topic名称,json格式字符串数组的消息,生产者进行发送
     * @author kevin
     * @param topicName : topic名称
     * @param key : 消息key
     * @param jsonStrs : 消息json字符串数组
     * @return boolean : 推送是否成功
     * @date 2021/7/28 15:53
     */
    public Boolean[] sendMessage(String topicName, String key, String[] jsonStrs) {
        createTopic(topicName, 5, 5);
        int msgLength = jsonStrs.length;
        Boolean[] success = new Boolean[msgLength];
        for (int i = 0; i < msgLength; i++) {
            String jsonStr = jsonStrs[i];
            log.info(PUSH_MSG_LOG, jsonStr);
            //发送消息
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                    key, jsonStr));
            success[i] = dealSendResult(future);
        }
        return success;
    }

    /**
     * 传入topic名称,消息对象,生产者进行发送
     * @author kevin
     * @param topicName : topic名称
     * @param key : 消息key
     * @param obj : 消息对象
     * @return boolean : 推送是否成功
     * @date 2021/7/28 15:53
     */
    public boolean sendMessage(String topicName, String key, Object obj) {
        createTopic(topicName, 5, 5);
        String jsonStr = toJSONString(obj);
        log.info(PUSH_MSG_LOG, jsonStr);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                key, jsonStr));

        return dealSendResult(future);
    }

    /**
     * 传入topic名称,消息对象数组,生产者进行发送
     * @author kevin
     * @param topicName : topic名称
     * @param key : 消息key
     * @param list : 消息对象数组
     * @return boolean : 推送是否成功
     * @date 2021/7/28 15:56
     */
    public Boolean[] sendMessage(String topicName, String key, List<Object> list) {
        createTopic(topicName, 5, 5);
        Boolean[] success = new Boolean[list.size()];
        for (int i = 0; i < list.size(); i++) {
            Object obj = list.get(i);
            String jsonStr = toJSONString(obj);
            log.info(PUSH_MSG_LOG, jsonStr);
            //发送消息
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                    key, jsonStr));
            success[i] = dealSendResult(future);
        }
        return success;
    }

    /**
     * 传入topic名称,json格式字符串的消息,生产者进行发送
     * @author kevin
     * @param topicName : topic名称
     * @param partition : 消息发送分区
     * @param key : 消息key
     * @param jsonStr : 消息json字符串
     * @return boolean : 推送是否成功
     * @date 2021/7/28 15:53
     */
    public boolean sendMessage(String topicName, int partition, String key, String jsonStr) {
        createTopic(topicName, 5, 5);
        log.info(PUSH_MSG_LOG, jsonStr);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                partition, key, jsonStr));

        return dealSendResult(future);
    }

    /**
     * 传入topic名称,json格式字符串数组的消息,生产者进行发送
     * @author kevin
     * @param topicName : topic名称
     * @param partition : 消息发送分区
     * @param key : 消息key
     * @param jsonStrs : 消息json字符串数组
     * @return boolean : 推送是否成功
     * @date 2021/7/28 15:53
     */
    public Boolean[] sendMessage(String topicName, int partition, String key, String[] jsonStrs) {
        createTopic(topicName, 5, 5);
        int msgLength = jsonStrs.length;
        Boolean[] success = new Boolean[msgLength];
        for (int i = 0; i < msgLength; i++) {
            String jsonStr = jsonStrs[i];
            log.info(PUSH_MSG_LOG, jsonStr);
            //发送消息
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                    partition, key, jsonStr));
            success[i] = dealSendResult(future);
        }
        return success;
    }

    /**
     * 传入topic名称,消息对象,生产者进行发送
     * @author kevin
     * @param topicName : topic名称
     * @param partition : 消息发送分区
     * @param key : 消息key
     * @param obj : 消息对象
     * @return boolean : 推送是否成功
     * @date 2021/7/28 15:53
     */
    public boolean sendMessage(String topicName, int partition, String key, Object obj) {
        createTopic(topicName, 5, 5);
        String jsonStr = toJSONString(obj);
        log.info(PUSH_MSG_LOG, jsonStr);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                partition, key, jsonStr));

        return dealSendResult(future);
    }

    /**
     * 传入topic名称,消息对象数组,生产者进行发送
     * @author kevin
     * @param topicName : topic名称
     * @param partition : 消息发送分区
     * @param key : 消息key
     * @param list : 消息对象数组
     * @return boolean : 推送是否成功
     * @date 2021/7/28 15:56
     */
    public Boolean[] sendMessage(String topicName, int partition, String key, List<Object> list) {
        createTopic(topicName, 5, 5);
        Boolean[] success = new Boolean[list.size()];
        for (int i = 0; i < list.size(); i++) {
            Object obj = list.get(i);
            String jsonStr = toJSONString(obj);
            log.info(PUSH_MSG_LOG, jsonStr);
            //发送消息
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(
                    topicName, partition, key, jsonStr));
            success[i] = dealSendResult(future);
        }
        return success;
    }

    /**
      * 处理消息推送结果
      * @author kevin
      * @param future :
      * @return boolean
      * @date 2021/7/28 15:56
      */
    private boolean dealSendResult(ListenableFuture<SendResult<String, Object>> future) {
        final boolean[] success = {false};
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                //发送失败的处理
                log.info(KafkaConstants.TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage());
                success[0] = false;
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //成功的处理
                log.info(KafkaConstants.TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
                success[0] = true;
            }
        });
        return success[0];
    }
}

此处生产者是异步发送消息,不用等待消息发送完成。

接下来就是消费者Consumer了:

package com.liu.util;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import javax.transaction.Transactional;
import java.util.List;
import java.util.Optional;

/**
  * kafka消费者类
  * @author kevin
  * @date 2021/7/28
  */
@Component
@Slf4j
public class ConsumerUtils {

    @Autowired
    private RedisUtils redisUtils;

    @Bean
    public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory consumerFactory){
        ConcurrentKafkaListenerContainerFactory<Integer,String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(5);
        factory.getContainerProperties().setPollTimeout(1000);
        factory.setBatchListener(true);//设置为批量消费,每个批次数量在Kafka配置参数中设置
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置手动提交ackMode
        return factory;
    }

    /**
      * 单条的消费kafka消息
      * @author kevin
      * @param record : 消息记录
      * @param ack : ack回调确认
      * @return void :
      * @date 2021/8/3 15:14
      */
    @KafkaListener(topics = KafkaConstants.TOPIC_TEST, topicPartitions = {
            @TopicPartition(topic = KafkaConstants.TOPIC_TEST, partitions = {"0" ,"2" ,"4"}),
    }, groupId = KafkaConstants.TOPIC_GROUP1)
    public void topicTest(ConsumerRecord<String, String> record, Acknowledgment ack) {

        Optional<String> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("topic_test 消费了: Topic:" + record.topic() + ",key:" + record.key() + ",Message:" + msg);
            ack.acknowledge();//手动提交offset
        }
    }

    /**
      * 批量的消费kafka消息,要配合containerFactory使用,配置的bean见batchFactory
      * @author kevin
      * @param records : 消息记录列表
      * @param ack : ack回调确认
      * @return void :
      * @date 2021/8/3 15:15
      */
    @Transactional(rollbackOn = Exception.class)
    @KafkaListener(topics = KafkaConstants.TOPIC_TEST, topicPartitions = {
            @TopicPartition(topic = KafkaConstants.TOPIC_TEST, partitions = {"1", "3"}),
    }, groupId = KafkaConstants.TOPIC_GROUP2, containerFactory="batchFactory")
    public void topicTest2(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        try {
            for (ConsumerRecord<String, String> record : records) {
                //取到消息后,先查询缓存中是否已经存在,存在表示不需要再次处理
                //如果消息不存在,业务处理完成后将消息存入redis;如果消息存在直接跳过;这样可防止重复消费
                boolean isExists = redisUtils.hasKey(record.topic() + record.partition() + record.key());
                if (!isExists) {
                    Optional<String> message = Optional.ofNullable(record.value());
                    if (message.isPresent()) {
                        Object msg = message.get();
                        log.info("topic_test1 消费了: Topic:" + record.topic() + ",key:" + record.key() + ",Message:" + msg);
                    }
                    redisUtils.set(record.topic() + record.partition() + record.key(), record.value());
                }
            }
            ack.acknowledge();//手动提交offset
        }catch (Exception e){
            log.error(e.getMessage());
            throw e;
        }
    }
}

消费者,写了两种模式,批量获取消息与单条获取消息,获取消息的时候,指定topic,partition。通过KafkaListener监听来消费消息。

4.使用

接下来当然就是使用了,直接定义一个接口,然后访问接口,在接口中调用Producer发送消息,就可在Consumer消费者中监听获取到消息。

@ApiOperation(value = "测试1", notes = "test2")
    @GetMapping(value = "/test2")
    public ResponseVo test2(String value, String key, Integer partition){
        if(StringUtils.isBlank(value)){
            return new ResponseVo.Builder().error().message("请从传入发送的消息!").build();
        }
        Message message = new Message.Builder().id(UuidUtil.getUuid32()).msg(value).sendTime(DateUtils.nowDate()).build();
        String str = JSONObject.toJSONString(message);
        if(StringUtils.isNotBlank(key)){
            if(partition != null){
                producerUtils.sendMessage(KafkaConstants.TOPIC_TEST, partition, key, str);
            }else{
                producerUtils.sendMessage(KafkaConstants.TOPIC_TEST, key, str);
            }
        }else {
            producerUtils.sendMessage(KafkaConstants.TOPIC_TEST, str);
        }
        return null;
    }

5.测试验证

打开swagger页面,找到接口,录入参数:

点击execute执行测试。结果如下图:

?

?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-07 12:09:17  更:2021-08-07 12:09:47 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 15:57:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码