一、修改配置
1、未设置前
2、通过配置删除service,host,event,metricset字段
processors:
- drop_fields:
fields: ["host","event","metricset","service"]
3、设置后
?参考文档:Define processors | Filebeat Reference [8.1] | Elastic
二、通过源码修改
涉及文件:
// libbeat/outputs/codec/json/event.go
package json
import (
//"time"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
)
// Event describes the event structure for events
// (in-)directly send to logstash
type event struct {
// 修改@timestamp字段
//Timestamp time.Time `struct:"@timestamp"`
Timestamp string `struct:"TIME"`
// 修改@metadata字段
//Meta meta `struct:"@metadata"`
Fields common.MapStr `struct:",inline"`
}
// Meta defines common event metadata to be stored in '@metadata'
type meta struct {
Beat string `struct:"beat"`
Type string `struct:"type"`
Version string `struct:"version"`
Fields map[string]interface{} `struct:",inline"`
}
func makeEvent(index, version string, in *beat.Event) event {
return event{
Timestamp: in.Timestamp.Local().Format("2006-01-02 15:04:05"),
/*
Meta: meta{
Beat: index,
Version: version,
Type: "_doc",
Fields: in.Meta,
},
*/
Fields: in.Fields,
}
}
// 以metricbeat举例
// metricbeat/mb/event.go
package mb
import (
"fmt"
"time"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
)
// EventModifier is a function that can modifies an Event. This is typically
// used to apply transformations to an Event as it is converted to a
// beat.Event. An example is AddMetricSetInfo.
type EventModifier func(module, metricset string, event *Event)
// Event contains the data generated by a MetricSet.
type Event struct {
RootFields common.MapStr // Fields that will be added to the root of the event.
ModuleFields common.MapStr // Fields that will be namespaced under [module].
MetricSetFields common.MapStr // Fields that will be namespaced under [module].[metricset].
Index string // Index name prefix. If set overwrites the default prefix.
ID string // ID of event. If set, overwrites the default ID.
Namespace string // Fully qualified namespace to use for MetricSetFields.
Timestamp time.Time // Timestamp when the event data was collected.
Error error // Error that occurred while collecting the event data.
Host string // Host from which the data was collected.
Service string // Service type
Took time.Duration // Amount of time it took to collect the event data.
Period time.Duration // Period that is set to retrieve the events
DisableTimeSeries bool // true if the event doesn't contain timeseries data
}
// BeatEvent returns a new beat.Event containing the data this Event. It does
// mutate the underlying data in the Event.
func (e *Event) BeatEvent(module, metricSet string, modifiers ...EventModifier) beat.Event {
if e.RootFields == nil {
e.RootFields = common.MapStr{}
}
for _, modify := range modifiers {
modify(module, metricSet, e)
}
b := beat.Event{
Timestamp: e.Timestamp,
Fields: e.RootFields,
TimeSeries: !e.DisableTimeSeries,
}
if len(e.ModuleFields) > 0 {
b.Fields.Put(module, e.ModuleFields)
e.ModuleFields = nil
}
// If service is not set, falls back to the module name
/*
if e.Service == "" {
e.Service = module
}
e.RootFields.Put("service.type", e.Service)
*/
if len(e.MetricSetFields) > 0 {
switch e.Namespace {
case ".":
// Add fields to root.
b.Fields.DeepUpdate(e.MetricSetFields)
case "":
b.Fields.Put(module+"."+metricSet, e.MetricSetFields)
default:
b.Fields.Put(e.Namespace, e.MetricSetFields)
}
e.MetricSetFields = nil
}
// Set index prefix to overwrite default
if e.Index != "" {
b.Meta = common.MapStr{"index": e.Index}
}
if e.ID != "" {
b.SetID(e.ID)
}
if e.Error != nil {
b.Fields["error"] = common.MapStr{
"message": e.Error.Error(),
}
}
return b
}
// AddMetricSetInfo is an EventModifier that adds information about the
// MetricSet that generated the event. It will always add the metricset and
// module names. And it will add the host, period (in milliseconds), and
// duration (round-trip time in nanoseconds) values if they are non-zero
// values.
//
// {
// "event": {
// "dataset": "apache.status",
// "duration": 115,
// "module": "apache"
// },
// "service": {
// "address": "127.0.0.1",
// },
// "metricset": {
// "name": "status",
// "period": 10000
// }
// }
//
func AddMetricSetInfo(module, metricset string, event *Event) {
if event.Namespace == "" {
event.Namespace = fmt.Sprintf("%s.%s", module, metricset)
}
e := common.MapStr{
"event": common.MapStr{
"dataset": event.Namespace,
"module": module,
},
// TODO: This should only be sent if migration layer is enabled
"metricset": common.MapStr{
"name": metricset,
},
}
if event.Host != "" {
e.Put("service.address", event.Host)
}
if event.Took > 0 {
e.Put("event.duration", event.Took/time.Nanosecond)
}
if event.Period > 0 {
e.Put("metricset.period", event.Period/time.Millisecond)
}
if event.RootFields == nil {
event.RootFields = e
} else {
event.RootFields.DeepUpdate(e)
}
}
// TransformMapStrToEvent transforms a common.MapStr produced by MetricSet
// (like any MetricSet that does not natively produce a mb.Event). It accounts
// for the special key names and routes the data stored under those keys to the
// correct location in the event.
func TransformMapStrToEvent(module string, m common.MapStr, err error) Event {
var (
event = Event{RootFields: common.MapStr{}, Error: err}
)
for k, v := range m {
switch k {
case TimestampKey:
switch ts := v.(type) {
case time.Time:
delete(m, TimestampKey)
event.Timestamp = ts
case common.Time:
delete(m, TimestampKey)
event.Timestamp = time.Time(ts)
}
case ModuleDataKey:
delete(m, ModuleDataKey)
event.ModuleFields, _ = tryToMapStr(v)
case RTTKey:
delete(m, RTTKey)
if took, ok := v.(time.Duration); ok {
event.Took = took
}
case NamespaceKey:
delete(m, NamespaceKey)
if ns, ok := v.(string); ok {
// The _namespace value does not include the module name and
// it is required in the mb.Event.Namespace value.
event.Namespace = module + "." + ns
}
}
}
event.MetricSetFields = m
return event
}
func tryToMapStr(v interface{}) (common.MapStr, bool) {
switch m := v.(type) {
case common.MapStr:
return m, true
case map[string]interface{}:
return common.MapStr(m), true
default:
return nil, false
}
}
|