Kafka Connect: topic.key.ignore does not work as expected

Problem description

As I understand the Kafka Connect documentation, this configuration should ignore the keys for the metricbeat and filebeat topics, but not for alarms. However, Kafka Connect does not ignore any keys.

This is the full JSON config that I am pushing to Kafka Connect over REST:

{
 "auto.create.indices.at.start": false,
 "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
 "connection.url": "http://elasticsearch:9200",
 "connection.timeout.ms": 5000,
 "read.timeout.ms": 5000,
 "tasks.max": "5",
 "topics": "filebeat,metricbeat,alarms",
 "behavior.on.null.values": "delete",
 "behavior.on.malformed.documents": "warn",
 "flush.timeout.ms":60000,
 "max.retries":42,
 "retry.backoff.ms": 100,
 "max.in.flight.requests": 5,
 "max.buffered.records":20000,
 "batch.size":4096,
 "drop.invalid.message": true,
 "schema.ignore": true,
 "topic.key.ignore": "metricbeat,filebeat",
 "key.ignore": false,
 "name": "elasticsearch-ecs-connector",
 "type.name": "_doc",
 "value.converter": "org.apache.kafka.connect.json.JsonConverter",
 "value.converter.schemas.enable": "false",
 "transforms":"routeTS",
 "transforms.routeTS.type":"org.apache.kafka.connect.transforms.TimestampRouter",
 "transforms.routeTS.topic.format":"${topic}-${timestamp}",
 "transforms.routeTS.timestamp.format":"YYYY.MM.dd",
 "errors.tolerance": "all" ,
 "errors.log.enable": false ,
 "errors.log.include.messages": false,
 "errors.deadletterqueue.topic.name":"logstream-dlq",
 "errors.deadletterqueue.context.headers.enable":true ,
 "errors.deadletterqueue.topic.replication.factor": 1
}
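For reference, a config like the one above can be submitted through the Kafka Connect REST API's `PUT /connectors/{name}/config` endpoint. The following is a minimal Python sketch using only the standard library. The worker address `http://localhost:8083` and the helper name `build_config_request` are assumptions for illustration (neither appears in the original post), and the config is trimmed to the settings relevant to key handling.

```python
import json
import urllib.request

# Assumed address of the Kafka Connect worker's REST interface (not from the original post).
CONNECT_URL = "http://localhost:8083"


def build_config_request(name, config, base_url=CONNECT_URL):
    """Build (but do not send) a PUT /connectors/{name}/config request."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# Trimmed copy of the connector config: only the key-handling settings and the basics.
config = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "filebeat,metricbeat,alarms",
    "key.ignore": "false",
    "topic.key.ignore": "metricbeat,filebeat",
    "schema.ignore": "true",
    "type.name": "_doc",
}

request = build_config_request("elasticsearch-ecs-connector", config)

# To actually send it (requires a running Connect worker at CONNECT_URL):
#   with urllib.request.urlopen(request) as response:
#       print(response.status, response.read().decode("utf-8"))
```

Building the request separately from sending it makes it easy to inspect the exact URL and body before anything reaches the worker.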

This is the logging during connector startup:

[2020-05-01 21:07:49,960] INFO ElasticsearchSinkConnectorConfig values:
    auto.create.indices.at.start = false
    batch.size = 4096
    behavior.on.malformed.documents = warn
    behavior.on.null.values = delete
    compact.map.entries = true
    connection.compression = false
    connection.password = null
    connection.timeout.ms = 5000
    connection.url = [http://elasticsearch:9200]
    connection.username = null
    drop.invalid.message = true
    elastic.https.ssl.cipher.suites = null
    elastic.https.ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    elastic.https.ssl.endpoint.identification.algorithm = https
    elastic.https.ssl.key.password = null
    elastic.https.ssl.keymanager.algorithm = SunX509
    elastic.https.ssl.keystore.location = null
    elastic.https.ssl.keystore.password = null
    elastic.https.ssl.keystore.type = JKS
    elastic.https.ssl.protocol = TLS
    elastic.https.ssl.provider = null
    elastic.https.ssl.secure.random.implementation = null
    elastic.https.ssl.trustmanager.algorithm = PKIX
    elastic.https.ssl.truststore.location = null
    elastic.https.ssl.truststore.password = null
    elastic.https.ssl.truststore.type = JKS
    elastic.security.protocol = PLAINTEXT
    flush.timeout.ms = 60000
    key.ignore = false
    linger.ms = 1
    max.buffered.records = 20000
    max.in.flight.requests = 5
    max.retries = 42
    read.timeout.ms = 5000
    retry.backoff.ms = 100
    schema.ignore = true
    topic.index.map = []
    topic.key.ignore = [metricbeat, filebeat]
    topic.schema.ignore = []
    type.name = _doc
    write.method = insert

I am using Confluent Platform 5.5.0.

Recommended answer

Let's recap here, because there have been several edits to your question and problem statement :)

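  1. You want to stream multiple topics to Elasticsearch with a single connector
  2. You want to use the message key as the Elasticsearch document ID for some topics, and for the others use the Kafka message coordinates (topic + partition + offset) instead
  3. You are trying to do this with the key.ignore and topic.key.ignore settings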

Here's my test data in three topics, test01, test02, and test03:

ksql> PRINT test01 from beginning;
Key format: KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2020/05/12 11:08:32.441 Z, key: X, value: {"COL1": 1, "COL2": "FOO"}
rowtime: 2020/05/12 11:08:32.594 Z, key: Y, value: {"COL1": 2, "COL2": "BAR"}

ksql> PRINT test02 from beginning;
Key format: KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2020/05/12 11:08:50.865 Z, key: X, value: {"COL1": 1, "COL2": "FOO"}
rowtime: 2020/05/12 11:08:50.936 Z, key: Y, value: {"COL1": 2, "COL2": "BAR"}

ksql> PRINT test03 from beginning;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2020/05/12 11:16:15.166 Z, key: <null>, value: {"COL1": 1, "COL2": "FOO"}
rowtime: 2020/05/12 11:16:46.404 Z, key: <null>, value: {"COL1": 2, "COL2": "BAR"}

With this data I create a connector (I'm using ksqlDB, but it's the same as if you use the REST API directly):

CREATE SINK CONNECTOR SINK_ELASTIC_TEST WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url'  = 'http://elasticsearch:9200',
  'key.converter'   = 'org.apache.kafka.connect.storage.StringConverter',
  'type.name'       = '_doc',
  'topics'          = 'test02,test01,test03',
  'key.ignore'      = 'false',
  'topic.key.ignore'= 'test02,test03',
  'schema.ignore'   = 'false'
);

The resulting indices are created and populated in Elasticsearch. Here are the index and document ID of each document:

➜ curl -s http://localhost:9200/test01/_search \
    -H 'content-type: application/json' \
    -d '{ "size": 5 }' |jq -c '.hits.hits[] | [._index, ._id]'
["test01","Y"]
["test01","X"]

➜ curl -s http://localhost:9200/test02/_search \
    -H 'content-type: application/json' \
    -d '{ "size": 5 }' |jq -c '.hits.hits[] | [._index, ._id]'
["test02","test02+0+0"]
["test02","test02+0+1"]

➜ curl -s http://localhost:9200/test03/_search \
    -H 'content-type: application/json' \
    -d '{ "size": 5 }' |jq -c '.hits.hits[] | [._index, ._id]'
["test03","test03+0+0"]
["test03","test03+0+1"]
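The jq filter used above pulls the `_index` and `_id` fields out of each search hit. For readers who prefer to do the same check in a script, here is a rough Python equivalent. The function name `index_and_ids` is hypothetical, and the sample dictionary is hand-written to mirror the test02 output above; it contains only the fields the function reads, not a full Elasticsearch response.

```python
def index_and_ids(search_response):
    """Return [index, id] pairs from a parsed Elasticsearch _search response body."""
    return [[hit["_index"], hit["_id"]] for hit in search_response["hits"]["hits"]]


# Hand-written sample shaped like a _search response (only the fields used here).
sample = {
    "hits": {
        "hits": [
            {"_index": "test02", "_id": "test02+0+0"},
            {"_index": "test02", "_id": "test02+0+1"},
        ]
    }
}

pairs = index_and_ids(sample)
print(pairs)
```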

So key.ignore=false, which is also the default, is in effect for test01, which means that the message key is used as the document ID.

Topics test02 and test03 are listed in topic.key.ignore, which means that the message key is ignored (in effect, key.ignore=true for those topics), and thus the document ID is the topic/partition/offset of the message.
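The rule observed in this test can be summarised in a few lines of Python. This is only a model of the behavior shown above, not the connector's actual code. The function name `document_id` and its parameters are made up for illustration, and it does not cover what happens to a null key on a topic whose key is not ignored.

```python
def document_id(topic, partition, offset, key, key_ignore=False, topic_key_ignore=()):
    """Model of the observed rule for choosing the Elasticsearch document ID.

    If the key is ignored (globally via key.ignore, or for this topic via
    topic.key.ignore), the ID is built from the message coordinates.
    Otherwise the message key is used as the ID.
    """
    if key_ignore or topic in topic_key_ignore:
        return f"{topic}+{partition}+{offset}"
    return key


# Settings from the test connector: key.ignore=false, topic.key.ignore=test02,test03
ignored_topics = {"test02", "test03"}

print(document_id("test01", 0, 0, "X", topic_key_ignore=ignored_topics))
print(document_id("test02", 0, 0, "X", topic_key_ignore=ignored_topics))
print(document_id("test03", 0, 1, None, topic_key_ignore=ignored_topics))
```

With these settings the three calls yield `X`, `test02+0+0`, and `test03+0+1`, matching the document IDs returned by the searches above.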

Given that I've shown above that this does work, I would recommend that you start your test again from scratch to double-check your working.


06-11 15:40