说下背景,使用debezium同步数据到kafka,然后使用logstash同步kafka数据到es,想要实现pgsql删除数据后,把es中的数据删除掉 通过观察发现,数据库新增的数据,kafka中的after是有数据的,对于删除数据,kafka中的after是没有数据的,因此可以通过判断这个值来做处理
input {
kafka {
bootstrap_servers => ["localhost:9092"]
group_id => "voucher_voucher1"
topics => ["citus8.user1.voucher"]
client_id => "voucher1"
auto_offset_reset => "earlist"
consumer_threads => 1
decorate_events => "true"
codec => "json"
}
}
filter {
ruby {
code => "if event.get('after').nil?; event.set('tags','null-value');end"
}
#主要是获取主键
if "null-value" in [tags] {
mutate {
add_field => {
"@after" => "%{before}"
}
}
}
else{
mutate {
add_field => {
"@after" => "%{after}"
}
}
}
json {
source=>"@after"
}
ruby {
code => "
require 'json'
some_json_field_value = JSON.parse(event.get('voucher_source').to_s)
event.set('voucher_source',some_json_field_value)
"
}
mutate {
remove_field => ["kafka","source","before","after","@after","@timestamp","@version","ts_ms"]
}
}
output {
stdout { codec => json_lines }
if "null-value" in [tags] {
elasticsearch {
hosts => ["localhost:9200"]
action=>"delete" #删除
index => "e_document"
document_id => "%{voucher_id}"
}
}
else{
elasticsearch {
hosts => ["localhost:9200"]
index => "e_document"
document_id => "%{voucher_id}"
}
}
}
https://elasticsearch.cn/question/3681
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-action
|