一、TDengine数据订阅服务
为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine 提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
关于TDengine数据订阅服务的详细使用文档可参考官网:TDengine—数据订阅
这里不再过多叙述,本文主要内容:在物联网背景下,实时数据监控是很常见的业务功能,面对PB级的数据量我们如何高效地获取到实时数据,TDengine 作为一款优秀的时序数据库,集成了类似Kafka消息队列一样的服务。消息队列可以起到异步解耦消峰的作用,但是通常来说数据的发送速度是远高于数据消费速度的(因为有针对业务的消费逻辑),于是乎数据堆积发生的可能性非常大,那么提高消费速度自然就是重中之重了。
二、多线程批量消费
2.0 准备
创建数据库:tmqdb
创建超级表:
CREATE TABLE meters (ts TIMESTAMP, current FLOAT, voltage INT) TAGS (groupid INT, location BINARY(16))
创建子表d0和d1:INSERT INTO d1 USING meters TAGS(1, ‘San Francisco’) values(now - 9s, 10.1, 119) INSERT INTO d0 values(now - 8s, NULL, NULL)
创建主题:create topic topic_name as select * from meters
依赖:
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.0.0</version>
</dependency>
2.1 超级表实体类
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Meters {
private Timestamp ts;
private float current;
private int voltage;
private int groupid;
private String location;
}
2.2 模拟数据插入
public class WriteData {
private static int count = 0;
public static void main(String[] args) {
TaosCrudUtils taosCrudUtils = new TaosCrudUtils();
while (count < 20000) {
Random random = new Random();
int i = random.nextInt(235);
String sql = "INSERT INTO tmqdb.d1 VALUES(now, " + (float) i + ", " + i + ");";
taosCrudUtils.insert(sql);
count++;
}
}
}
2.3 配置文件
# 服务器主机
taos.hostName=localdomain.com:6030
# 消费组
taos.groupId=test
# 主题名
taos.topicName=topic_name
2.4 消费者多线程批量消费
package com.zhmsky.springboottdengine.数据订阅.消费者多线程消费;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;
import com.zhmsky.springboottdengine.数据订阅.pojo.Meters;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Component
public class ConsumerHandler {
private static String HOST_NAME;
private static String GROUP_ID;
private static String TOPICNAME;
private TaosConsumer<Meters> consumer;
private ExecutorService executors;
private boolean active = true;
public static Properties initConfig() {
InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("application.properties");
Properties fileProperties = new Properties();
try {
fileProperties.load(is);
HOST_NAME = fileProperties.getProperty("taos.hostName");
GROUP_ID = fileProperties.getProperty("taos.groupId");
TOPICNAME = fileProperties.getProperty("taos.topicName");
} catch (IOException e) {
throw new RuntimeException(e);
}
Properties properties = new Properties();
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, HOST_NAME);
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
properties.setProperty(TMQConstants.GROUP_ID, GROUP_ID);
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
"com.zhmsky.springboottdengine.数据订阅.MetersDeserializer");
return properties;
}
@PostConstruct
public void initTaosConfig() {
Properties properties = initConfig();
try {
consumer = new TaosConsumer<>(properties);
consumer.subscribe(Collections.singletonList(TOPICNAME));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
public void execute(int workerNum) {
executors = new ThreadPoolExecutor(workerNum, 20, 10,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadPoolExecutor.CallerRunsPolicy());
while (active) {
try {
ConsumerRecords<Meters> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
executors.submit(new TaskWorker(records));
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
public void stopTaosPoll() {
this.active = false;
}
}
2.5 自定义线程类(处理真正的消费逻辑)
package com.zhmsky.springboottdengine.数据订阅.消费者多线程消费;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;
import com.zhmsky.springboottdengine.数据订阅.pojo.Meters;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Component
public class ConsumerHandler {
private static String HOST_NAME;
private static String GROUP_ID;
private static String TOPICNAME;
private TaosConsumer<Meters> consumer;
private ExecutorService executors;
private boolean active = true;
public static Properties initConfig() {
InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("application.properties");
Properties fileProperties = new Properties();
try {
fileProperties.load(is);
HOST_NAME = fileProperties.getProperty("taos.hostName");
GROUP_ID = fileProperties.getProperty("taos.groupId");
TOPICNAME = fileProperties.getProperty("taos.topicName");
} catch (IOException e) {
throw new RuntimeException(e);
}
Properties properties = new Properties();
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, HOST_NAME);
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
properties.setProperty(TMQConstants.GROUP_ID, GROUP_ID);
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
"com.zhmsky.springboottdengine.数据订阅.MetersDeserializer");
return properties;
}
@PostConstruct
public void initTaosConfig() {
Properties properties = initConfig();
try {
consumer = new TaosConsumer<>(properties);
consumer.subscribe(Collections.singletonList(TOPICNAME));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
public void execute(int workerNum) {
executors = new ThreadPoolExecutor(workerNum, 20, 10,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadPoolExecutor.CallerRunsPolicy());
while (active) {
try {
ConsumerRecords<Meters> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
executors.submit(new TaskWorker(records));
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
public void stopTaosPoll() {
this.active = false;
}
}
2.6 值解析
public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}
2.7 测试
@SpringBootApplication
@EnableScheduling
public class SpringbootTDengineApplication {
@Autowired
private ConsumerHandler consumers;
public static void main(String[] args) {
SpringApplication.run(SpringbootTDengineApplication.class, args);
}
@Scheduled(cron = "* 3 21 * * ? ")
public void test() {
consumers.execute(10);
}
}
|