ES & Filebeat 使用 Pipeline 处理日志中的 @timestamp

  • A+
所属分类:linux技术
摘要

Filebeat 收集的日志发送到 ElasticSearch 后,会默认添加一个 @timestamp 字段作为时间戳用于检索,而日志中的信息会全部添加到 message 字段中,但是这个时间是 Filebeat 采集日志的时间,不是日志生成的实际时间,所以为了便于检索日志,需要将 @timestamp 替换为 message 字段中的时间。


使用 Pipeline 处理日志中的 @timestamp

Filebeat 收集的日志发送到 ElasticSearch 后,会默认添加一个 @timestamp 字段作为时间戳用于检索,而日志中的信息会全部添加到 message 字段中,但是这个时间是 Filebeat 采集日志的时间,不是日志生成的实际时间,所以为了便于检索日志,需要将 @timestamp 替换为 message 字段中的时间。

这里使用的是 elasticseatch 提供的 pipeline 来进行替换。首先日志格式如下:

20-09-22 07:01:25.109 INFO - {"traceId":"65e97e88a61d7cd4558b8f3a203458fd"} 20-09-22 06:51:12.117 INFO - {"traceId":"4e0542c994919065f71536872ccb9677"} 

在 Kibana 中的 Devtools 界面中编写如下 pipeline 并执行:

PUT _ingest/pipeline/test-news-server-online      # test-news-server-online 为流水线的名称 {   "description": "test-news-server-online",            # 对 pipeline 进行描述   "processors": [     {       "grok": {                                                              # 使用 grok 对日志内容进行提取         "field": "message",					       # 选择要提取信息的字段         "patterns": [           "%{TIMESTAMP_ISO8601:logatime}"          # 使用 TIMESTAMP_ISO8601 的标准匹配时间,将匹配的值赋值给新增的字段 logatime         ],         "ignore_failure": true		                       # 如果日志中有不存在时间戳的行,可以添加这个配置来忽略匹配错误产生的 error 信息       },       "date": {					                       # 使用 data 时间戳插件来格式化时间输出,替代默认的 @timestamp         "field": "logatime",				               # 指定使用新增的 logatime 字段         "timezone": "Asia/Shanghai", 	                       # 指定输出时间的时区,不指定的话可能会比正确的时间晚 8 个小时         "formats": [           "yy-MM-dd HH:mm:ss.SSS"	              # 指定时间输出的格式         ],         "ignore_failure": true		                      # 如果遇到错误则忽略       }     }   ] } 

pipeline 编写完成后,在 Devtools 中可以使用如下命令进行查询:

GET _ingest/pipeline/test-news-server-online 

在 filebeat 中引用这个 pipeline:

filebeat.idle_timeout: 2s filebeat.inputs: - backoff: 1s   backoff_factor: 2   close_inactive: 1h   enabled: true   encoding: plain   harvester_buffer_size: 262144   max_backoff: 10s   max_bytes: 10485760   paths:   - /opt/trace.log   scan_frequency: 10s   tail_lines: true   type: log   fields:     type: test-news-server filebeat.name: filebeat-shiper filebeat.spool_zie: 50000 output.elasticsearch:   bulk_max_size: 8192   hosts:   - 10.11.16.211:30187   - 10.11.16.212:30187   - 10.11.16.213:30187   - 10.11.16.214:30187   - 10.11.16.215:30187   index: test-news-timestamp   workers: 4   pipeline: "test-news-server-online"				# 在此处指定 pipeline 的名称 processors: - drop_fields:     fields:     - agent.ephemeral_id     - agent.hostname     - agent.id     - agent.type     - agent.version     - ecs.version     - input.type     - log.offset     - version - decode_json_fields:     fields:     - message     max_depth: 1     overwrite_keys: true setup.ilm.enabled: false setup.template.name: test-news-timestamp-reverse setup.template.pattern: test-news-timestamp-reverse-* 

运行 filebeat,在 kibana 中查看日志信息,可以看到收集的日志信息中新增了 logatime 字段,@timestamp 字段的时间也与 logatime 字段保持了一致。

如果在 filebeat 运行的日志中发现了如下报错信息,有可能是日志中存在不含有时间戳的行(一般是由于日志被截断导致的,可以参考处理多行日志的文档):

ERROR   pipeline/output.go:121  Failed to publish events: temporary bulk send failure 

如果不希望将 logatime 字段在日志中展示的话,可以将 pipeline 修改为如下内容:

PUT _ingest/pipeline/test-news-server-online {   "description": "test-news-server-online",   "processors": [     {       "grok": {         "field": "message",         "patterns": [           "%{TIMESTAMP_ISO8601:logatime}"         ],         "ignore_failure": true       },       "date": {         "field": "logatime",         "timezone": "Asia/Shanghai",          "formats": [           "yy-MM-dd HH:mm:ss"         ],         "ignore_failure": true       },       "remove": {         "field": "logatime"       }     }   ] } 

如果希望将 logatime 的值同时赋值给其他的新增字段,例如 realtime ,pipeline 修改如下:

PUT _ingest/pipeline/test-news-server-online {   "description": "test-news-server-online",   "processors": [     {       "grok": {         "field": "message",         "patterns": [           "%{TIMESTAMP_ISO8601:logatime}"         ],         "ignore_failure": true       },       "date": {         "field": "logatime",         "timezone": "Asia/Shanghai",          "target_field": "realtime"         "formats": [           "yy-MM-dd HH:mm:ss"         ],         "ignore_failure": true       },       "remove": {         "field": "logatime"       }     }   ] } 

target_field 字段用于将一个值赋值给指定的字段,默认是给 @timestamp ,如果未提供该选项,则会默认更新 @timestamp 字段