我创建了最简单的 kafka sink 连接器配置,我使用的是 confluent 4.1.0:

{
  "connector.class":
  "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "type.name": "test-type",
  "tasks.max": "1",
  "topics": "dialogs",
  "name": "elasticsearch-sink",
  "key.ignore": "true",
  "connection.url": "http://localhost:9200",
  "schema.ignore": "true"
}

在主题中,我将消息保存在 JSON
{ "topics": "resd"}

但在结果中我得到一个错误:

最佳答案

发生该错误是因为它试图读取非 Confluent Schema Registry 编码的 Avro 消息。

如果主题数据是Avro,则需要使用Schema Registry。

否则,如果主题数据是 JSON,那么您已经在属性文件中的键或值上使用 AvroConverter 启动了连接集群,您需要在其中使用 JsonConverter

关于apache-kafka - ElasticsearchSinkConnector 无法将数据反序列化为 Avro,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/50182754/

10-16 06:32