Elastic 中国社区官方博客

Elastic 中国社区官方博客

作者:David Pilato

在 Elasticsearch 中丰富你的 Elasticsearch 文档-LMLPHP

对于 Elasticsearch®,我们知道联接应该在 “索引时” 而不是查询时完成。 本博文是一系列三篇博文的开始,因为我们可以在 Elastic® 生态系统中采取多种方法。 我们将介绍如何在 Elasticsearch 中做到这一点。 下一篇博文将介绍如何使用集中式组件 Logstash 来实现这一点,上一篇博文将展示如何使用 Elastic Agent/Beats 在边缘实现这一点。

举一个简单的例子,假设我们是一个电子商务网站,在 kibana_sample_data_logs 中收集日志:

{
  "agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
  "bytes": 1831,
  "clientip": "30.156.16.164",
  "extension": "",
  "geo": {
    "srcdest": "US:IN",
    "src": "US",
    "dest": "IN",
    "coordinates": {
      "lat": 55.53741389,
      "lon": -132.3975144
    }
  },
  "host": "elastic-elastic-elastic.org",
  "index": "kibana_sample_data_logs",
  "ip": "30.156.16.163",
  "machine": {
    "ram": 9663676416,
    "os": "win xp"
  },
  "memory": 73240,
  "message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"",
  "phpmemory": 73240,
  "referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra",
  "request": "/wp-login.php",
  "response": 404,
  "tags": [
    "success",
    "info"
  ],
  "timestamp": "2023-03-18T12:43:49.756Z",
  "url": "https://elastic-elastic-elastic.org/wp-login.php",
  "utc_time": "2023-03-18T12:43:49.756Z",
  "event": {
    "dataset": "sample_web_logs"
  }
}

请注意,你可以通过单击 “Sample web blogs” 框中的 “Add data”按钮,使用 Kibana® 示例数据集轻松导入此数据集:

我们还有一个 VIP 索引,其中包含有关我们客户的信息:

{ 
  "ip" : "30.156.16.164", 
  "vip": true, 
  "name": "David P" 
}

要导入此示例数据集,我们只需运行:

DELETE /vip
PUT /vip
{
  "mappings": {
    "properties": {
      "ip": { "type": "keyword" },
      "name": { "type": "text" },
      "vip": { "type": "boolean" }
    }
  }
}
POST /vip/_bulk
{ "index" : { } }
{ "ip" : "30.156.16.164", "vip": true, "name": "David P" }
{ "index" : { } }
{ "ip" : "164.85.94.243", "vip": true, "name": "Philipp K" }
{ "index" : { } }
{ "ip" : "50.184.59.162", "vip": true, "name": "Adrienne V" }
{ "index" : { } }
{ "ip" : "236.212.255.77", "vip": true, "name": "Carly R" }
{ "index" : { } }
{ "ip" : "16.241.165.21", "vip": true, "name": "Naoise R" }
{ "index" : { } }
{ "ip" : "246.106.125.113", "vip": true, "name": "Iulia F" }
{ "index" : { } }
{ "ip" : "81.194.200.150", "vip": true, "name": "Jelena Z" }
{ "index" : { } }
{ "ip" : "111.237.144.54", "vip": true, "name": "Matt R" }

要执行 “joins at index time”,我们需要丰富我们的数据集以获得如下所示的最终日志:

{
  "agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
  "bytes": 1831,
  "clientip": "30.156.16.164",
  "extension": "",
  "geo": {
    "srcdest": "US:IN",
    "src": "US",
    "dest": "IN",
    "coordinates": {
      "lat": 55.53741389,
      "lon": -132.3975144
    }
  },
  "host": "elastic-elastic-elastic.org",
  "index": "kibana_sample_data_logs",
  "ip": "30.156.16.163",
  "machine": {
    "ram": 9663676416,
    "os": "win xp"
  },
  "memory": 73240,
  "message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"",
  "phpmemory": 73240,
  "referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra",
  "request": "/wp-login.php",
  "response": 404,
  "tags": [
    "success",
    "info"
  ],
  "timestamp": "2023-03-18T12:43:49.756Z",
  "url": "https://elastic-elastic-elastic.org/wp-login.php",
  "utc_time": "2023-03-18T12:43:49.756Z",
  "event": {
    "dataset": "sample_web_logs"
  },
  "vip": true, 
  "name": "David P" 
}

你可以使用摄取管道中的 Elasticsearch Enrich Processor 开箱即用地执行此操作。 让我们看看如何做到这一点。

在 Elasticsearch 中丰富 Elasticsearch 数据

摄取管道 - ingest pipeline

让我们首先使用摄取管道

我们可以从一个空的开始,我们将用它来模拟我们想要的行为。 我们不需要原始数据集的完整字段集,因此我们对其进行了简化:

POST /_ingest/pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "clientip": "30.156.16.164"
      }
    }
  ],
  "pipeline": {
    "processors": []
  }
}

我们现在需要向我们的管道添加一个 enrich processor。 但为此,我们需要首先创建一个丰富的策略 (enrich policy):

PUT /_enrich/policy/vip-policy
{
  "match": {
    "indices": "vip",
    "match_field": "ip",
    "enrich_fields": ["name", "vip"]
  }
}

创建丰富策略后,我们可以使用执行丰富策略 API 来执行它:

PUT /_enrich/policy/vip-policy/_execute

我们现在可以模拟它:

POST /_ingest/pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "clientip": "30.156.16.164"
      }
    }
  ],
  "pipeline": {
    "processors": [{
      "enrich": {
        "policy_name": "vip-policy",
        "field": "clientip",
        "target_field": "enriched"
      }
    }]
  }
}

这给出如下的响应:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "enriched": {
            "name": "David P",
            "vip": true,
            "ip": "30.156.16.164"
          },
          "clientip": "30.156.16.164"
        },
        "_ingest": {
          "timestamp": "2023-04-06T17:14:29.127569953Z"
        }
      }
    }
  ]
}

我们只需清理一下数据即可获得我们期望的结构:

POST /_ingest/pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "clientip": "30.156.16.164"
      }
    }
  ],
  "pipeline": {
    "processors": [{
      "enrich": {
        "policy_name": "vip-policy",
        "field": "clientip",
        "target_field": "enriched"
      }
    },{
      "rename": {
        "field": "enriched.name",
        "target_field": "name"
      }
    },{
      "rename": {
        "field": "enriched.vip",
        "target_field": "vip"
      }
    },{
      "remove": {
        "field": "enriched"
      }
    }
    ]
  }
}

现在给出了预期的结果:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "name": "David P",
          "vip": true,
          "clientip": "30.156.16.164"
        },
        "_ingest": {
          "timestamp": "2023-04-06T17:16:08.175186282Z"
        }
      }
    }
  ]
}

我们现在可以存储最终的管道:

PUT /_ingest/pipeline/vip
{
  "processors": [{
    "enrich": {
      "policy_name": "vip-policy",
      "field": "clientip",
      "target_field": "enriched"
    }
  },{
    "rename": {
      "field": "enriched.name",
      "target_field": "name",
      "ignore_failure": true
    }
  },{
    "rename": {
      "field": "enriched.vip",
      "target_field": "vip",
      "ignore_failure": true
    }
  },{
    "remove": {
      "field": "enriched",
      "ignore_failure": true
    }
  }
  ]
}

请注意,我们通过添加一些 ignore_failure 指令对其进行了一些更改,因为我们可能在 vip 索引中找不到任何相关数据。

我们可以使用与源索引相同的映射来创建目标索引:

# Get the source mapping
GET /kibana_sample_data_logs/_mapping

# Create the destination index
PUT /kibana_sample_data_logs_new
{
  // Paste the source mappings structure
  "mappings": {
    "properties": {
      // And add the properties we are adding
      "name": {
        "type": "keyword"
      },
      "vip": {
        "type": "boolean"
      }
    }
  }
}

并调用重建索引 API:

POST _reindex
{
  "source": {
    "index": "kibana_sample_data_logs"
  },
  "dest": {
    "index": "kibana_sample_data_logs_new",
    "pipeline": "vip"
  }
}

让我们检查一下工作是否已完成:

GET /kibana_sample_data_logs_new/_search?filter_path=aggregations.by_name.buckets
{
  "size": 0,
  "aggs": {
    "by_name": {
      "terms": {
        "field": "name"
      }
    }
  }
}

上述命令给出如下类似的响应:

{
  "aggregations": {
    "by_name": {
      "buckets": [
        {
          "key": "David P",
          "doc_count": 100
        },
        {
          "key": "Philipp K",
          "doc_count": 29
        },
        {
          "key": "Adrienne V",
          "doc_count": 26
        },
        {
          "key": "Carly R",
          "doc_count": 26
        },
        {
          "key": "Iulia F",
          "doc_count": 25
        },
        {
          "key": "Naoise R",
          "doc_count": 25
        },
        {
          "key": "Jelena Z",
          "doc_count": 24
        },
        {
          "key": "Matt R",
          "doc_count": 24
        }
      ]
    }
  }
}

运行时字段丰富

丰富数据的另一种方法是在搜索时而不是索引时执行此操作。 这与本文的第一句话相悖,但有时,你需要进行一些权衡。 在这里,我们想用搜索速度来交换灵活性。

运行时字段功能 (runtime field feature) 允许丰富搜索响应对象,但不能用于查询或聚合数据。 此功能的一个简单示例:

GET kibana_sample_data_logs/_search?filter_path=hits.hits.fields
{
  "size": 1,
  "query": {
    "match": {
      "clientip": "30.156.16.164"
    }
  }, 
  "runtime_mappings": {
    "enriched": {
        "type": "lookup", 
        "target_index": "vip", 
        "input_field": "clientip", 
        "target_field": "ip", 
        "fetch_fields": ["name", "vip"] 
    }
  },
  "fields": [
    "clientip",
    "enriched"
  ],
  "_source": false
}

上述命令给出如下的响应:

{
  "hits": {
    "hits": [
      {
        "fields": {
          "enriched": [
            {
              "name": [
                "David P"
              ],
              "vip": [
                true
              ]
            }
          ],
          "clientip": [
            "30.156.16.164"
          ]
        }
      }
    ]
  }
}

请注意,这也可以添加为映射的一部分:

PUT kibana_sample_data_logs/_mappings
{
  "runtime": {
    "enriched": {
      "type": "lookup", 
      "target_index": "vip", 
      "input_field": "clientip", 
      "target_field": "ip", 
      "fetch_fields": ["name", "vip"] 
    }
  }
}

GET kibana_sample_data_logs/_search
{
  "size": 1,
  "query": {
    "match": {
      "clientip": "30.156.16.164"
    }
  }, 
  "fields": [
    "clientip",
    "enriched"
  ]
}

但是,如果你希望能够搜索或聚合这些字段,则需要在搜索时实际发出 (emit) 一些内容。

请注意,我们不能使用此方法在另一个索引中进行查找。 因此,因为且仅仅因为列表的长度很小,我们可以使用脚本来动态进行 “丰富”:

PUT kibana_sample_data_logs/_mappings
{
  "runtime": {
    "name": {
      "type": "keyword",
      "script": {
        "source": 
        """
        def name=params.name;
        for (int i=0; i< params.lookup.length; i++) {
          if (params.lookup[i].ip == doc['clientip'].value) {
            emit(params.lookup[i].name);
            break;
          }
        }
        """,
        "lang": "painless",
        "params": {
          "name": "David P",
          "lookup": [
            { "ip" : "30.156.16.164", "vip": true, "name": "David P" },
            { "ip" : "164.85.94.243", "vip": true, "name": "Philipp K" },
            { "ip" : "50.184.59.162", "vip": true, "name": "Adrienne V" },
            { "ip" : "236.212.255.77", "vip": true, "name": "Carly R" },
            { "ip" : "16.241.165.21", "vip": true, "name": "Naoise R" },
            { "ip" : "246.106.125.113", "vip": true, "name": "Iulia F" },
            { "ip" : "81.194.200.150", "vip": true, "name": "Jelena Z" },
            { "ip" : "111.237.144.54", "vip": true, "name": "Matt R" }
          ]
        }
      }
    },
    "vip": {
      "type": "boolean",
      "script": {
        "source": 
        """
        def name=params.name;
        for (int i=0; i< params.lookup.length; i++) {
          if (params.lookup[i].ip == doc['clientip'].value) {
            emit(params.lookup[i].vip);
            break;
          }
        }
        """,
        "lang": "painless",
        "params": {
          "name": "David P",
          "lookup": [
            { "ip" : "30.156.16.164", "vip": true, "name": "David P" },
            { "ip" : "164.85.94.243", "vip": true, "name": "Philipp K" },
            { "ip" : "50.184.59.162", "vip": true, "name": "Adrienne V" },
            { "ip" : "236.212.255.77", "vip": true, "name": "Carly R" },
            { "ip" : "16.241.165.21", "vip": true, "name": "Naoise R" },
            { "ip" : "246.106.125.113", "vip": true, "name": "Iulia F" },
            { "ip" : "81.194.200.150", "vip": true, "name": "Jelena Z" },
            { "ip" : "111.237.144.54", "vip": true, "name": "Matt R" }
          ]
        }
      }
    }
  }
}

我们可以再次聚合这些运行时字段:

GET /kibana_sample_data_logs/_search?filter_path=aggregations.by_name.buckets
{
  "size": 0,
  "aggs": {
    "by_name": {
      "terms": {
        "field": "name"
      }
    }
  }
}

这给出了与我们之前看到的相同的结果,但当然有点慢:

{
  "aggregations": {
    "by_name": {
      "buckets": [
        {
          "key": "David P",
          "doc_count": 100
        },
        {
          "key": "Philipp K",
          "doc_count": 29
        },
        {
          "key": "Adrienne V",
          "doc_count": 26
        },
        {
          "key": "Carly R",
          "doc_count": 26
        },
        {
          "key": "Iulia F",
          "doc_count": 25
        },
        {
          "key": "Naoise R",
          "doc_count": 25
        },
        {
          "key": "Jelena Z",
          "doc_count": 24
        },
        {
          "key": "Matt R",
          "doc_count": 24
        }
      ]
    }
  }
}

同样,此方法不适用于大索引,因此如我们在第一部分中看到的那样重新索引数据将是首选方法。

10-28 11:44