IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> springboot整合TDengine实现数据订阅——多线程快速消费 -> 正文阅读

[Java知识库]springboot整合TDengine实现数据订阅——多线程快速消费

一、TDengine数据订阅服务

为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine 提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。

关于TDengine数据订阅服务的详细使用文档可参考官网:TDengine—数据订阅

这里不再过多叙述,本文主要内容:在物联网背景下,实时数据监控是很常见的业务功能,面对PB级的数据量我们如何高效地获取到实时数据,TDengine 作为一款优秀的时序数据库,集成了类似Kafka消息队列一样的服务。消息队列可以起到异步解耦消峰的作用,但是通常来说数据的发送速度是远高于数据消费速度的(因为有针对业务的消费逻辑),于是乎数据堆积发生的可能性非常大,那么提高消费速度自然就是重中之重了。

二、多线程批量消费

2.0 准备

创建数据库:tmqdb

创建超级表:

CREATE TABLE meters (ts TIMESTAMP, current FLOAT, voltage INT)
TAGS (groupid INT, location BINARY(16))

创建子表d0和d1:INSERT INTO d1 USING meters TAGS(1, ‘San Francisco’) values(now - 9s, 10.1, 119)
INSERT INTO d0 values(now - 8s, NULL, NULL)

创建主题:create topic topic_name as select * from meters

依赖:

    <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--  TDengine Java Connector       -->
        <dependency>
            <groupId>com.taosdata.jdbc</groupId>
            <artifactId>taos-jdbcdriver</artifactId>
            <version>3.0.0</version>
     </dependency>

2.1 超级表实体类

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Meters {

    //动态属性
    private Timestamp ts;
    private float current;
    private int voltage;
    //静态属性
    private int groupid;
    private String location;
}

2.2 模拟数据插入


/**
 * 模拟写数据
 *
 * @author zhmsky
 * @date 2022/9/12 17:18
 */
public class WriteData {

    private static int count = 0;

    public static void main(String[] args) {
        TaosCrudUtils taosCrudUtils = new TaosCrudUtils();
        //一次性插入两万数据
        while (count < 20000) {
            Random random = new Random();
            int i = random.nextInt(235);
            String sql = "INSERT INTO tmqdb.d1 VALUES(now, " + (float) i + ", " + i + ");";
            taosCrudUtils.insert(sql);
            count++;
        }
    }

}

2.3 配置文件

#  服务器主机
taos.hostName=localdomain.com:6030
# 消费组
taos.groupId=test
# 主题名
taos.topicName=topic_name

2.4 消费者多线程批量消费

package com.zhmsky.springboottdengine.数据订阅.消费者多线程消费;

import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;
import com.zhmsky.springboottdengine.数据订阅.pojo.Meters;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author zhmsky
 * @date 2022/9/12 19:58
 */
@Component
public class ConsumerHandler {

    private static String HOST_NAME;
    private static String GROUP_ID;
    private static String TOPICNAME;

    private TaosConsumer<Meters> consumer;
    private ExecutorService executors;

    //消息队列消息拉取是否开启
    private boolean active = true;

    public static Properties initConfig() {
        //获取配置文件
        InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("application.properties");
        Properties fileProperties = new Properties();
        try {
            //读取配置文件
            fileProperties.load(is);
            HOST_NAME = fileProperties.getProperty("taos.hostName");
            GROUP_ID = fileProperties.getProperty("taos.groupId");
            TOPICNAME = fileProperties.getProperty("taos.topicName");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        //消费者配置
        Properties properties = new Properties();
        //连接地址
        properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, HOST_NAME);
        //允许从消息中解析表名
        properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
        //开启自动提交
        properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
        properties.setProperty(TMQConstants.GROUP_ID, GROUP_ID);
        //值解析方法,需要实现com.taosdata.jdbc.tmq.Deserializer 接口或继承 com.taosdata.jdbc.tmq.ReferenceDeserializer 类
        properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
                "com.zhmsky.springboottdengine.数据订阅.MetersDeserializer");
        return properties;
    }

    /**
     * 项目启动时完成初始化配置
     */
    @PostConstruct
    public void initTaosConfig() {
        Properties properties = initConfig();
        try {
            //创建消费者实例
            consumer = new TaosConsumer<>(properties);
            //订阅主题
            consumer.subscribe(Collections.singletonList(TOPICNAME));
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }

    }

    /**
     * 多线程批量消费(执行这个方法即可循环拉取消息)
     *
     * @param workerNum
     */
    public void execute(int workerNum) {
        executors = new ThreadPoolExecutor(workerNum, 20, 10,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadPoolExecutor.CallerRunsPolicy());
        while (active) {
            try {
                ConsumerRecords<Meters> records = consumer.poll(Duration.ofMillis(100));
                if (!records.isEmpty()) {
                    //将消息交给线程池认领
                    executors.submit(new TaskWorker(records));
                }
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * 停止拉取消息
     */
    public void stopTaosPoll() {
        this.active = false;
    }
}

2.5 自定义线程类(处理真正的消费逻辑)

package com.zhmsky.springboottdengine.数据订阅.消费者多线程消费;

import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;
import com.zhmsky.springboottdengine.数据订阅.pojo.Meters;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author zhmsky
 * @date 2022/9/12 19:58
 */
@Component
public class ConsumerHandler {

    private static String HOST_NAME;
    private static String GROUP_ID;
    private static String TOPICNAME;

    private TaosConsumer<Meters> consumer;
    private ExecutorService executors;

    //消息队列消息拉取是否开启
    private boolean active = true;

    public static Properties initConfig() {
        //获取配置文件
        InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("application.properties");
        Properties fileProperties = new Properties();
        try {
            //读取配置文件
            fileProperties.load(is);
            HOST_NAME = fileProperties.getProperty("taos.hostName");
            GROUP_ID = fileProperties.getProperty("taos.groupId");
            TOPICNAME = fileProperties.getProperty("taos.topicName");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        //消费者配置
        Properties properties = new Properties();
        properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, HOST_NAME);
        properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
        properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
        properties.setProperty(TMQConstants.GROUP_ID, GROUP_ID);
        properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
                "com.zhmsky.springboottdengine.数据订阅.MetersDeserializer");
        return properties;
    }

    /**
     * 项目启动时完成初始化配置
     */
    @PostConstruct
    public void initTaosConfig() {
        Properties properties = initConfig();
        try {
            //创建消费者实例
            consumer = new TaosConsumer<>(properties);
            //订阅主题
            consumer.subscribe(Collections.singletonList(TOPICNAME));
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }

    }

    /**
     * 多线程批量消费(执行这个方法即可循环拉取消息)
     *
     * @param workerNum 核心线程数
     */
    public void execute(int workerNum) {
        executors = new ThreadPoolExecutor(workerNum, 20, 10,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadPoolExecutor.CallerRunsPolicy());
        while (active) {
            try {
                ConsumerRecords<Meters> records = consumer.poll(Duration.ofMillis(100));
                if (!records.isEmpty()) {
                    //将消息交给线程池认领
                    executors.submit(new TaskWorker(records));
                }
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * 停止拉取消息
     */
    public void stopTaosPoll() {
        this.active = false;
    }
}

2.6 值解析

/**
 * 值解析方法
 *
 * @author zhmsky
 * @date 2022/9/12 16:43
 */
public class MetersDeserializer extends ReferenceDeserializer<Meters> {

}

2.7 测试

@SpringBootApplication
@EnableScheduling
public class SpringbootTDengineApplication {
    @Autowired
    private ConsumerHandler consumers;

    public static void main(String[] args) {
        SpringApplication.run(SpringbootTDengineApplication.class, args);
    }

    //定时任务启动
    @Scheduled(cron = "* 3 21 * * ? ")
    public void test() {
        consumers.execute(10);
    }

}

在这里插入图片描述

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-09-15 01:51:23  更:2022-09-15 01:52:16 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 13:12:19-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码