来源1
1 docker elk 搭建
1.1 elk拉取
docker pull sebp/elk
1.2 加大max_map_count
vi /etc/sysctl.conf
--增加
vm.max_map_count=655360
--保存
--退出执行
sysctl -p
1.3 启动
docker run --name elk -d -e LOGSTASH_START=0 -e ES_HEAP_SIZE="1g" -p 5601:5601 -p 9200:9200 -p 5044:5044 sebp/elk
1.4 查看kibana页面
127.0.0.1:5601
2 logstash 配置
docker exec -it 【容器id】 bash
cd /opt/logstash
mkdir conf.d
//创建配置文件
vim ./conf.d/logstash.conf
2.1 logstash file input 配置
快速上手
input{
file{
path => "/opt/logs/web-info-2021-11-25.0.log"
}
}
模版
input{
file{
#path属性接受的参数是一个数组,其含义是标明需要读取的文件位置
path => ["pathA","pathB"]
#表示每隔多久去path路径下查看是否有新的文件产生。默认是15秒检查一次。
discover_interval => 15
#排除哪些文件,也就是不去读取哪些文件
exclude => ["fileName1","fileName2"]
#被监听的文件多久没更新后断开连接不再监听,默认是一个小时。
close_older => 3600
#在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认一天。
ignore_older => 86400
#logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒。
stat_interval => 1
#sincedb记录数据上一次的读取位置的一个index
sincedb_path => "$HOME/.sincedb"
#logstash 从什么位置开始读取文件数据,默认是结束位置,也可以设置为:beginning 从头开始
start_position => "beginning"
#注意:这里需要提醒大家的是,如果你需要每次都从头开始读取文件的话,光设置start_position => beginning是没有用的,你可以选择将sincedb_path 定义为 /dev/null
}
}
2.1 logstash jdbc input 配置
input {
jdbc {
jdbc_driver_library => "/opt/logstash/conf.d/db2jcc4-4.23.42.jar"
jdbc_driver_class => "com.ibm.db2.jcc.DB2Driver"
jdbc_connection_string => "jdbc:db2://10.20.40.40:50000/P962_BW:currentSchema=DURP_CONFIG;"
jdbc_user => "db2inst1"
jdbc_password => "db2inst1"
statement => "select user_log_id, user_log_time, user_log_user_id, user_log_user_name, user_log_module, user_log_opt_result from RECEIVE_USER_LOG where user_log_time > :sql_last_value order by USER_LOG_TIME"
schedule => "* * * * *"
type => "jdbc"
use_column_value => true
tracking_column => user_log_time
tracking_column_type => "timestamp"
record_last_run => true
last_run_metadata_path => "/opt/logstash/conf.d/track_time.txt"
clean_run => false
jdbc_paging_enabled => true
jdbc_page_size => 1000
}
}
filter {
}
output {
stdout {
codec => json_lines
}
elasticsearch {
hosts => ["http://localhost:9200"]
index => "logstash-userlog-%{+YYYY.MM.dd}"
}
}
jdbc_driver_library:db2的jar地址
2.2 保存退出,启动logstash
./bin/logstash -f ./conf.d/logstash.conf
多input output 例子
3 查看kibana Discover
首先需要进入index manager创建索引
4 springboot集成es
4.1 pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
4.2 application.yml
spring:
elasticsearch:
rest:
uris: 10.211.55.4:9200
4.3 api
// Elasticsearch high-level REST client, injected by Spring; the methods below use it to execute search requests.
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
 * Searches the {@code logstash*} indices and returns one page of matching documents.
 *
 * <p>All supplied criteria (keyword, module type, date range) are combined: a document must
 * satisfy every criterion that was provided. A criterion that is {@code null} or empty is
 * treated as "not supplied" and skipped.
 *
 * @param keyword   free-text keyword; empty/null matches everything, a keyword containing
 *                  {@code "id:"} is looked up as an exact {@code user_log_id}, anything else is
 *                  matched against the user name, module and operation-result fields
 * @param from      lower bound for {@code @timestamp}; only applied when {@code to} is also set
 * @param to        upper bound for {@code @timestamp}; only applied when {@code from} is also set
 * @param modelType module name to match against {@code user_log_module}; empty/null to skip
 * @param currPage  1-based page number
 * @param pageSize  number of documents per page, must be at least 1
 * @return the page of documents (as source maps) together with total hit and page counts
 * @throws IOException              if the request to Elasticsearch fails
 * @throws IllegalArgumentException if {@code currPage} or {@code pageSize} is less than 1
 */
@Override
public JSonListByPageVo getAllByPage(String keyword, Object from, Object to, String modelType, int currPage, int pageSize) throws IOException {
    // Fail fast: a page below 1 yields a negative offset, and a zero page size would divide by zero below.
    if (currPage < 1) {
        throw new IllegalArgumentException("currPage must be >= 1 but was " + currPage);
    }
    if (pageSize < 1) {
        throw new IllegalArgumentException("pageSize must be >= 1 but was " + pageSize);
    }

    // Keyword criterion: always produces the base query.
    final String keywordText = keyword == null ? "" : keyword;
    QueryBuilder query;
    if (keywordText.isEmpty()) {
        query = QueryBuilders.matchAllQuery();
    } else if (keywordText.contains("id:")) {
        query = QueryBuilders.termQuery("user_log_id", keywordText.replace("id:", ""));
    } else {
        // NOTE(review): the surrounding '*' look like an attempt at wildcard matching; multi_match
        // analyzes its text rather than expanding wildcards - TODO confirm this is intended.
        query = QueryBuilders.multiMatchQuery("*" + keywordText + "*", "user_log_user_name", "user_log_module", "user_log_opt_result");
    }

    // Each further criterion wraps the query built so far, so criteria are AND-ed together
    // instead of overwriting one another.
    if (modelType != null && !modelType.isEmpty()) {
        query = QueryBuilders.boolQuery()
                .must(query)
                .must(QueryBuilders.matchQuery("user_log_module", modelType));
    }
    if (from != null && to != null && !from.toString().isEmpty() && !to.toString().isEmpty()) {
        // A date range only includes/excludes documents, so it goes in filter context.
        query = QueryBuilders.boolQuery()
                .must(query)
                .filter(QueryBuilders.rangeQuery("@timestamp").from(from).to(to));
    }

    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.query(query);
    // Paging: currPage is 1-based, the offset is 0-based.
    sourceBuilder.from((currPage - 1) * pageSize);
    sourceBuilder.size(pageSize);

    SearchRequest searchRequest = new SearchRequest("logstash*");
    searchRequest.source(sourceBuilder);
    SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

    // Package the hits and the paging totals for the caller.
    long totalHits = response.getHits().getTotalHits().value;
    JSonListByPageVo page = new JSonListByPageVo();
    page.setDataList(Arrays.stream(response.getHits().getHits()).map(SearchHit::getSourceAsMap).collect(Collectors.toList()));
    page.setTotalNum(totalHits);
    // Ceiling division: a partially filled last page still counts as a page.
    page.setTotalPage((totalHits + (pageSize - 1)) / pageSize);
    return page;
}
/**
 * Lists the distinct module names found in the {@code logstash*} indices, obtained from a
 * terms aggregation on {@code user_log_module.keyword}.
 *
 * @return the bucket keys of the aggregation as strings; an empty list when the response
 *         carries no aggregation result
 * @throws IOException if the request to Elasticsearch fails
 */
@Override
public List<String> getModel() throws IOException {
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    // Only the aggregation buckets are read below, so do not fetch any document hits.
    sourceBuilder.size(0);
    // NOTE(review): no bucket size is set on the aggregation, so its default bucket limit
    // applies - confirm that it covers every module that should be listed.
    sourceBuilder.aggregation(AggregationBuilders.terms("group_model").field("user_log_module.keyword"));

    SearchRequest searchRequest = new SearchRequest("logstash*");
    searchRequest.source(sourceBuilder);
    SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

    // Guard against a response without aggregations instead of failing with an NPE.
    Aggregations aggregations = response.getAggregations();
    if (aggregations == null) {
        return java.util.Collections.emptyList();
    }
    Terms moduleTerms = aggregations.get("group_model");
    if (moduleTerms == null) {
        return java.util.Collections.emptyList();
    }
    return moduleTerms.getBuckets().stream()
            .map(MultiBucketsAggregation.Bucket::getKeyAsString)
            .collect(Collectors.toList());
}
|