• Elasticsearch: Pipeline


    序言

    Elasticsearch有采集管道直说.其实我们在Kibana中就可以看到它已经提供了2个.所有的文档(Document)都是先通过管道在入库的cuiyaonan2000@163.com

    默认提供的管道如下所示:

    管道的定义如下所示

     

     Ingest Node

    Ingest Node表示:预处理节点,是 ES 用于功能上命名的一种节点类型,可以通过 elasticsearch.xml 进行如下配置来标识出集群中的某个节点是否是 Ingest Node.

    node.ingest: ture

    上述将 node.ingest 设置成 true,则表明当前节点是 Ingest Node,具有预处理能力, Elasticsearch 默认所有节点都是 Ingest Node,即集群中所有的节点都具有预处理能力.

    预处理

    用过 Logstash 对日志进行处理的用户都知道,一般情况下我们并不会直接将原始的日志进行加载到 Elasticsearch 集群,而是对原始日志信息进行(深)加工后保存到 Elasticsearch 集群中。比如 Logstash 支持多种解析器比如 json,kv,date 等,比较经典的是 grok.这里我们不会对 Logstash 的解析器进行详细说明,只是为了描述一个问题,有些时候我们需要 Logstash 来对加载到 Elasticsearch 中的数据进行处理,这个处理,从概念上而言,我们也能称之为"预处理。而这里我们所说的预处理也其实就是类似的概念。

    Elasticsearch 5.x 版本开始,官方在内部集成了部分 Logstash 的功能,这就是 Ingest,而具有 Ingest 能力的节点称之为 Ingest Node.

    如果要脱离 Logstash 来对在 Elasticsearch 写入文档之前对文档进行加工处理,比如为文档某个字段设置默认值,重命名某个字段,设置通过脚本来完成更加复杂的加工逻辑,我们则必须要了解两个基本概念: Pipeline 和 Processors.

    Pipeline与Processor

    这里就以Pipeline和java中的Stream进行类比,两者从功能和概念上很类似,我们经常会对Stream中的数据进行处理,比如map操作,peek操作,reduce操作,count操作等,这些操作从行为上说,就是对数据的加工,而Pipeline也是如此,Pipeline也会对通过该Pipeline的数据(一般来说是文档)进行加工,比如上面说到的,修改文档的某个字段值,修改文档某个字段的类型等等.而Elasticsearch对该加工行为进行抽象包装,并称之为Processors.

    Elasticsearch命名了多种类型的Processors来规范对文档的操作,比如set,append,date,join,json,kv等等.这些不同类型的Processors,我们会在后文进行说明

    如此这般,Pipeline相当于java中的stream.而process相当于是一些类似于map,flatmap的算子cuiyaonan2000@163.com

    定义一个 Pipeline 是件很简单的事情,官方给出了参考(pipeline中包含了processors):

    1. PUT _ingest/pipeline/my-pipeline-id
    2. {
    3. "description" : "describe pipeline",
    4. "processors" : [
    5. {
    6. "set" : {
    7. "field": "foo",
    8. "value": "bar"
    9. }
    10. }
    11. ]
    12. }

    上面的例子,表明通过指定的 URL 请求"_ingest/pipeline“定义了一个 ID 为”my-pipeline-id"的 pipeline,其中请求体中的存在两个必须要的元素:

    • description 描述该 pipeline 是做什么的
    • processors 定义了一系列的 processors,这里只是简单的定义了一个赋值操作,即将字段名为"foo“的字段值都设置为”bar"


    如果需要了解更多关于 Pipeline 定义的信息,可以参考: Ingest APIs

    Simulate Pipeline API

    该API即用于我们测试自己的Pipeline的工具,可以帮助我们去测试自己的管道.

    1. #如下测试pipeline的时候不指定pipeline,而是在测试内容中声明一个
    2. POST _ingest/pipeline/_simulate
    3. {
    4. "pipeline" : {
    5. // pipeline definition here
    6. //因为没有在路径中设置pipeline的id,所以这里我们要声明一个
    7. },
    8. "docs" : [
    9. { "_source": {/** first document **/} },
    10. { "_source": {/** second document **/} },
    11. // ...
    12. ]
    13. }
    14. #这里直接在路径中指明一个pipeline来测试
    15. POST _ingest/pipeline/my-pipeline-id/_simulate
    16. {
    17. "docs" : [
    18. { "_source": {/** first document **/} },
    19. { "_source": {/** second document **/} },
    20. // ...
    21. ]
    22. }

    simulate的瞬态字段

    执行如下的测试访问

    1. POST _ingest/pipeline/_simulate
    2. {
    3. "pipeline" :
    4. {
    5. "description": "_description",
    6. "processors": [
    7. {
    8. "set" : {
    9. "field" : "field2",
    10. "value" : "_value"
    11. }
    12. }
    13. ]
    14. },
    15. "docs": [
    16. {
    17. "_index": "index",
    18. "_type": "type",
    19. "_id": "id",
    20. "_source": {
    21. "foo": "bar"
    22. }
    23. },
    24. {
    25. "_index": "index",
    26. "_type": "type",
    27. "_id": "id",
    28. "_source": {
    29. "foo": "rab"
    30. }
    31. }
    32. ]
    33. }

    返回的内容

    1. {
    2. "docs": [
    3. {
    4. "doc": {
    5. "_id": "id",
    6. "_index": "index",
    7. "_type": "type",
    8. "_source": {
    9. "field2": "_value",
    10. "foo": "bar"
    11. },
    12. "_ingest": {
    13. "timestamp": "2017-05-04T22:30:03.187Z"
    14. }
    15. }
    16. },
    17. {
    18. "doc": {
    19. "_id": "id",
    20. "_index": "index",
    21. "_type": "type",
    22. "_source": {
    23. "field2": "_value",
    24. "foo": "rab"
    25. },
    26. "_ingest": {
    27. "timestamp": "2017-05-04T22:30:03.188Z"
    28. }
    29. }
    30. }
    31. ]
    32. }

    从具体的响应结果中看到,在文档通过 pipeline 时(或理解为被 pipeline 中的 processors 加工后),新的文档与原有的文档产生了差异,这些差异体现为:

    • 文档都新增了 field2 字段,这点我们可以通过对比响应前后的定义在"docs"中文档的_source 内容中看出
    • 额外增加了一些临时(官方称之为瞬态)的字段,比如 timestamp,他们都在_ingest 节点下,(这些)字段都是临时与源文档存在一起,在被 pipeline 中的 processors 加工后后返回给对应的批量操作或索引操作之后。这些信息就不会携带返回。----额外增加的字段都在_ingest节点下cuiyaonan2000@163.com

    Processor中访问文档元数据

    每个文档都会有一些元数据字段信息(metadata filed),比如_id_index,_type 等,我们在 processors 中也可以直接访问这些信息的,比如下面的例子:

    1. {
    2. "set": {
    3. "field": "_id"
    4. "value": "1"
    5. }
    6. }

    Processor访问瞬态字段

    只是引用方式的区别,瞬态字段需要使用{{}}来访问,没有文档元数据的字段访问方便cuiyaonan2000@163.com

    1. {
    2. "set": {
    3. "field": "received"
    4. "value": "{{_ingest.timestamp}}"
    5. }
    6. }

    Processors 类型

    系统提供的默认的类型如下所示.

    • Append Processor 追加处理器
    • Convert Processor 转换处理器
    • Date Processor 日期转换器
    • Date Index Name Processor 日期索引名称处理器
    • Fail Processor 失败处理器
    • Foreach Processor 循环处理器
    • Grok Processor Grok 处理器
    • Gsub Processor
    • Join Processor
    • JSON Processor
    • KV Processor
    • Lowercase Processor
    • Remove Processor
    • Rename Processor
    • Script Processor
    • Split Processor
    • Sort Processor
    • Trim Processor
    • Uppercase Processor
    • Dot Expander Processor

    实例参考文档Elasticsearch Pipeline 详解_自知自省的博客-CSDN博客_es的pipeline

  • 相关阅读:
    重温OKHTTP源码
    2023天津科技大学计算机考研信息汇总
    TypeScript基础入门
    矩阵的模和内积
    SQL Server教程 - T-SQL-事务(TRANSACTION)
    java-php-net-python-在线音乐播放网站.计算机毕业设计程序
    SpringBoot拉取高德天气预报数据
    新基建助力智能化道路交通领域的转型发展
    CJO 系列丨创造价值:CJO 如何解决核心业务挑战
    常用的sql函数(语法)
  • 原文地址:https://blog.csdn.net/cuiyaonan2000/article/details/126286543