Prometheus 启动参数:--enable-feature=remote-write-receiver(Prometheus 2.33 及之后的版本请改用 --web.enable-remote-write-receiver)
准备
-
remote.proto -
types.proto -
gogo.proto -
protoc-3.11.2-PLATFORM.zip -
./protoc/bin/protoc --proto_path=./imports --java_out=./java_output/ imports/types.proto -
./protoc/bin/protoc --proto_path=./imports --java_out=./java_output/ imports/remote.proto -
./protoc/bin/protoc --proto_path=./imports --java_out=./java_output/ imports/gogoproto/gogo.proto
迁移到springboot,为了方便,不做修改直接迁移到 src/main/java 目录下
调用api/v1/write
- 添加依赖 pom.xml
<!-- prometheus remote write 依赖 -->
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.11.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java-util -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.11.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.xerial.snappy/snappy-java -->
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.8.4</version>
</dependency>
- metrics.config.TasksConfig
package ...
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
 * Configuration holder bound from properties under the {@code tasks} prefix
 * and registered as a Spring component. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@Component
@ConfigurationProperties(prefix = "tasks")
@Slf4j
public class TasksConfig {
// Configured task items.
// NOTE(review): this field is package-private while Item's fields are private — confirm whether that is intentional.
List<Item> items;
/** One configured task: a metric name together with its labels. */
@Data
public static class Item {
// Metric name; PrometheusService sends it as the "__name__" label.
private String metric;
// Label name -> label value pairs sent along with the metric.
private Map<String, String> labels;
// ... other Apollo configuration
}
}
- metrics.bean.bo.MetricPoint
package ...;
import ...metrics.config.TasksConfig;
import lombok.Data;
import java.util.Map;
/**
 * A single sample to report: a value, an optional timestamp, optional extra tags,
 * and the task configuration item that supplies the metric name and labels.
 */
@Data
public class MetricPoint {
// private String metric; // metric name
private Map<String, String> tags; // extra data tags, sent as labels in addition to conf.getLabels()
private long time = 0; // timestamp in epoch milliseconds (not seconds); when not > 0, PrometheusService uses System.currentTimeMillis()
private double value;
private TasksConfig.Item conf; // task configuration: conf.getMetric() is the metric name, conf.getLabels() the labels
}
- PrometheusService
package ...;
import ...metrics.bean.bo.MetricPoint;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.xerial.snappy.Snappy;
import prometheus.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
 * Pushes metric samples to a Prometheus server through its remote-write endpoint
 * ({@code api/v1/write}). Each call builds a protobuf {@code WriteRequest}, compresses it
 * with snappy and sends it with an HTTP POST. Failures are logged, never thrown to the caller
 * (except for malformed label maps, which still surface as runtime exceptions while the
 * request is being built).
 */
@Service
@Slf4j
public class PrometheusService {
    /** Path of the remote-write receiver, relative to the Prometheus base URL. */
    private static final String WRITE_PATH = "api/v1/write";
    /** Reserved label that carries the metric name. */
    private static final String METRIC_NAME_LABEL = "__name__";

    /** Base URL of the Prometheus server; a trailing slash is optional. */
    @Value("${prometheus.remoteWriteUrl:http://192.168.50.101:9090/}")
    private String remoteWriteUrl;

    private final CloseableHttpClient httpClient = HttpClients.createSystem();

    /**
     * Sends a single sample.
     *
     * @param metricPoint the sample to send; a {@code null} point is skipped with a warning
     */
    public void remoteWrite(MetricPoint metricPoint) {
        remoteWrite(Lists.newArrayList(metricPoint));
    }

    /**
     * Sends a batch of samples in one remote-write request.
     *
     * @param metricPointList samples to send; {@code null} or empty lists are ignored
     */
    public void remoteWrite(List<MetricPoint> metricPointList) {
        if (CollectionUtils.isEmpty(metricPointList)) {
            return;
        }
        List<Types.TimeSeries> timeSeriesList = createTimeSeries(metricPointList);
        if (timeSeriesList.isEmpty()) {
            // Every point was unusable; an empty write request would be pointless.
            return;
        }
        prometheus.Remote.WriteRequest writeRequest =
                prometheus.Remote.WriteRequest.newBuilder().addAllTimeseries(timeSeriesList).build();
        try {
            byte[] compressed = Snappy.compress(writeRequest.toByteArray());
            HttpPost httpPost = new HttpPost(writeEndpoint());
            // The remote-write protocol carries a snappy-compressed protobuf body, not form data.
            httpPost.setHeader("Content-Type", "application/x-protobuf");
            httpPost.setHeader("Content-Encoding", "snappy");
            httpPost.setHeader("X-Prometheus-Remote-Write-Version", "0.1.0");
            httpPost.setEntity(new ByteArrayEntity(compressed));
            // Always close the response so the underlying connection is released.
            try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
                if (response.getStatusLine().getStatusCode() > 299) {
                    log.error("prometheus remoteWrite response error, data: [{}], response: [{}]", metricPointList, response);
                }
            }
        } catch (Exception e) {
            log.error("prometheus remoteWrite error, data: [{}]", metricPointList, e);
        }
    }

    /** Joins the configured base URL and the write path with exactly one slash between them. */
    private String writeEndpoint() {
        return remoteWriteUrl.endsWith("/")
                ? remoteWriteUrl + WRITE_PATH
                : remoteWriteUrl + "/" + WRITE_PATH;
    }

    /**
     * Converts metric points to protobuf time series, one series with one sample per point.
     * Labels are merged into a sorted map so that each label name appears once and labels are
     * emitted in lexicographical order; on a name clash the point's tags win over the
     * configured labels, and the metric name always comes from the configuration.
     *
     * @param metricPointList non-null list of points; points without a metric name are skipped
     * @return the series built from the usable points, possibly empty
     */
    private List<Types.TimeSeries> createTimeSeries(List<MetricPoint> metricPointList) {
        List<Types.TimeSeries> timeSeriesList = new ArrayList<>(metricPointList.size());
        for (MetricPoint point : metricPointList) {
            if (point == null || point.getConf() == null || point.getConf().getMetric() == null) {
                log.warn("prometheus remoteWrite skipped point without metric name: [{}]", point);
                continue;
            }
            Map<String, String> labels = new java.util.TreeMap<>();
            if (point.getConf().getLabels() != null) {
                labels.putAll(point.getConf().getLabels());
            }
            if (point.getTags() != null) {
                labels.putAll(point.getTags());
            }
            labels.put(METRIC_NAME_LABEL, point.getConf().getMetric());

            Types.TimeSeries.Builder timeSeriesBuilder = Types.TimeSeries.newBuilder();
            for (Map.Entry<String, String> entry : labels.entrySet()) {
                timeSeriesBuilder.addLabels(
                        Types.Label.newBuilder().setName(entry.getKey()).setValue(entry.getValue()).build());
            }
            // Sample timestamps are epoch milliseconds; fall back to "now" when none was set.
            long timestamp = point.getTime() > 0 ? point.getTime() : System.currentTimeMillis();
            timeSeriesBuilder.addSamples(
                    Types.Sample.newBuilder().setValue(point.getValue()).setTimestamp(timestamp).build());
            timeSeriesList.add(timeSeriesBuilder.build());
        }
        return timeSeriesList;
    }
}
- 测试
package ...;
import lombok.extern.slf4j.Slf4j;
import ...metrics.bean.bo.MetricPoint;
import ...metrics.config.TasksConfig;
import ...metrics.service.PrometheusService;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * Integration check that pushes two samples of the same metric through
 * {@link PrometheusService#remoteWrite(java.util.List)}: one back-dated by a minute,
 * one without an explicit timestamp.
 */
@ActiveProfiles("dev")
@SpringBootTest
@ContextConfiguration(classes = BusinessMetricsApplication.class)
@Slf4j
public class PrometheusTest {
    @Autowired
    private PrometheusService prometheusService;

    @Test
    public void reportTest() throws IOException {
        Map<String, String> firstLabels = new HashMap<>();
        firstLabels.put("a", "b1");
        firstLabels.put("c", "d1");
        MetricPoint backdatedPoint = pointOf("metric_testAB", firstLabels, 56);
        // Only the first point carries an explicit timestamp (one minute in the past).
        backdatedPoint.setTime(System.currentTimeMillis() - 60*1000);

        Map<String, String> secondLabels = new HashMap<>();
        secondLabels.put("aa", "bb1");
        secondLabels.put("cc", "dd1");
        MetricPoint untimedPoint = pointOf("metric_testAB", secondLabels, 150);

        prometheusService.remoteWrite(Lists.newArrayList(backdatedPoint, untimedPoint));
    }

    /** Builds a point whose configuration item holds the given metric name and labels. */
    private static MetricPoint pointOf(String metric, Map<String, String> labels, double value) {
        TasksConfig.Item item = new TasksConfig.Item();
        item.setLabels(labels);
        item.setMetric(metric);
        MetricPoint point = new MetricPoint();
        point.setValue(value);
        point.setConf(item);
        return point;
    }
}
- 结果
备注
- 第一次提交时间戳为 timestamp=1,第二次 timestamp=4,第三次 timestamp=3,则 prometheus 会报错 out of order samples
- 删除metric重新提交 timestamp=3 同样会报错 out of order samples
参考
|