Elastic从源端到分析端的数据过滤方案
发布时间:2022-03-16 14:43:09

一、 引言

本文章将介绍elasticsearch从采集数据到分析数据,在不同阶段进行字段的过滤的详尽方法。

二、背景

由于收集的日志信息经过解析后字段非常多,很多都不是目标字段,客户需要我们将字段进行过滤操作。

从收集的流程来看一般的日志数据处理架构是由beats进行数据处理,由kafka进行数据缓存,流入logstash进行数据的解析,推送至elasticsearch进行数据的分析存储,最后由kibana进行分析展示,因此可以从源端、管道端、分析端,这三个阶段考虑完成这个目标需求。

技术架构:

Elastic从源端到分析端的数据过滤方案(图1)

三、 解决方案

1、源端

Filebeat是elastic stack中一个日志文件托运工具,在服务器上安装客户端后,Filebeat会监控日志目录或者指定的日志文件,追踪读取这些文件(追踪文件的变化,不停的读),并且转发这些信息到ElasticSearch或者Logstash中存放。

当开启Filebeat程序的时候,它会启动一个或多个探测器(prospectors)去检测指定的日志目录或文件,对于探测器找出的每一个日志文件,Filebeat启动收割进程(harvester),每一个收割进程读取一个日志文件的新内容,并发送这些新的日志数据到处理程序(spooler),处理程序会集合这些事件,最后filebeat会发送集合的数据到指定的地点。Filebeat其中包含了processors模块可以进行数据的初步处理,例如过滤数据等。

Elastic从源端到分析端的数据过滤方案(图2)

可以在filebeat.yml文件中找到这个模块进行自定义的字段操作

Elastic从源端到分析端的数据过滤方案(图3)

采用drop_fields方法清除非目标字段,当满足条件时,将非目标字段进行弃用。例如想去掉非目标字段http.response.code,使用has_fields方法进行判断,存在目标字段时将其弃用。

processors:

  - drop_fields:

      when:

        has_fields: ['http.response.code']

      fields: [http.response.code”]

      ignore_missing: false   #如果指定字段不存在时,不会返回错误

 

2、管道端

Logstash 是elastic stack开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到存储库中。数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。

Logstash是基于pipeline方式进行数据处理的,pipeline可以理解为数据处理流程的抽象。在一条pipeline数据经过上游数据源汇总到消息队列中,然后由多个工作线程进行数据的转换处理,最后输出到下游组件。一个logstash中可以包含多个pipeline。

Logstash管道有两个必需的元素,输入和输出,以及一个可选元素过滤器:

Input:数据输入组件,用于对接各种数据源,接入数据,支持解码器,允许对数据进行编码解码操作;必选组件;

output:数据输出组件,用于对接下游组件,发送处理后的数据,支持解码器,允许对数据进行编码解码操作;必选组件;

filter:数据过滤组件,负责对输入数据进行加工处理;

Elastic从源端到分析端的数据过滤方案(图4)

针对过滤数据字段的需求,我们可以采用drop filter插件进行数据的处理,使用filebeat从源端采集数据后进入kafka集群进行队列缓存,数据流入logstash进行过滤,使用drop remove_field可以 将非目标字段进行移除。

  filter {
      drop {
        remove_field => [ "http.response.code" ]
      }
    }


2、分析端

Elastic 增加了Ingest node功能,Ingest node可以自定义pipeline,用户可在pipeline中自定processor,对数据进行过滤转换提取等操作,功能很强大,为elastic提供了一种文档预处理和增强转换的轻量级方案。

数据流入elastisearch,我们将采用Elasticsearch ingest pipeline移除已经存在的字段lasticsearch ingest pipeline用于预处理数据,pipeline是一系列处理管道,一系列的processors,先来看下pipeline的处理过程:

Elastic从源端到分析端的数据过滤方案(图5)

常用pipeline如下:

1、Trim:去除空格,如果是字符串类型的数组,数组中所有字符串都会被替换空格处理;

2、Split:切分字符串,使用指定切分符,切分字符串为数组结构,只作用与字符串类型;

3、Rename:重命名字段;

4、Foreach:对一组数据进行相同的预处理,可以使用Foreach;

5、Lowercase/Uppercase:对字段进行大小写转换;

6、Remove:移除字段;

7、Set:设置字段值;


本次需求是去除非目标字段,因此选用remove方法进行操作:

#定义remove pipeline

PUT _ingest/pipeline/remove_pipeline

{

  "description": "remove processor",

  "processors": [

    {

      "remove": {

        "field": "http.response.code"

      }

    }

  ]

}

 

#测试

POST _ingest/pipeline/remove_pipeline/_simulate

{

  "docs": [

    {

      "_source": {

        " http.response.code ": [

          "200",

          "400"

        ]

      }

    }

  ]

}

 

#返回,可以看到http.response.code字段已经被移除

{

  "docs" : [

    {

      "doc" : {

        "_index" : "_index",

        "_type" : "_doc",

        "_id" : "_id",

        "_source" : { }

        }

      }

    }

  ]

}

四、  综合对比


filebeat

logstash

elasticsearch

内存

功能

传输

从多种输入端采集并实时解析和转换数据并输出到多种输出端

分析、处理、存储数据

轻重

轻量级二进制文件

相对较重

较重

过滤能力

有过滤能力但是较弱

强大的过滤能力

无过滤能力,但是可以将已经导入的数据进行处理

进程

十分稳定

一台服务器只允许一个logstash进程,挂掉之后需要手动拉起

十分稳定

集群

单节点

单节点

分布式


Filebeat:

设计目的写Logstash和ES,轻量型日志采集工具,与logstash协作,更稳健少宕机(数十MB级别)

Logstash:

使用简单较好上手,插件丰富,扩展功能更强,注重数据预处理(GB级别)

Elasticsearch:

上手难度大于前两个,注重于数据导入后的数据处理(GB级别)

咨询电话
400-068-1180