Why is metadata added to the output of this Kafka connector?

Problem description

I have a Kafka connector with the following code for the poll() method in the SourceTask implementation.

@Override
public List<SourceRecord> poll() throws InterruptedException 
{
    SomeType item = mQueue.take();
    List<SourceRecord> records = new ArrayList<>();
    SourceRecord[] sourceRecords = new SourceRecord[]{
        new SourceRecord(null, null, "data", null,
                         Schema.STRING_SCHEMA, "foo",
                         Schema.STRING_SCHEMA, "bar")
    };
    Collections.addAll(records, sourceRecords);

    return records;
}

If I attach a consumer to the data topic, I receive the following message from the connector:

{"schema":{"type":"string","optional":false},"payload":"foo"}   {"schema":{"type":"string","optional":false},"payload":"bar"}
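That {"schema":…,"payload":…} wrapper is the envelope the JsonConverter places around each key and value. As an illustration (a sketch of my own, not part of the original question), the payload can be recovered on the consumer side by parsing the envelope:

```python
import json

def unwrap(raw: str):
    """Parse a JsonConverter envelope and return its payload field."""
    envelope = json.loads(raw)
    return envelope["payload"]

# The key and value strings received by the consumer above:
key = unwrap('{"schema":{"type":"string","optional":false},"payload":"foo"}')
value = unwrap('{"schema":{"type":"string","optional":false},"payload":"bar"}')
print(key, value)  # foo bar
```

This is only a consumer-side workaround; the answer below fixes the problem at the converter level instead.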

If I publish a message straight to the topic using the following commands:

echo -e 'foo,bar' > /tmp/test_kafka.txt
cat /tmp/test_kafka.txt | kafka-console-producer.sh --broker-list kafka-host:9092 --topic data --property parse.key=true --property key.separator=,

and then attach the same consumer, I get this message:

foo bar

This is what I expected to see as the output from the connector implementation, rather than the {"schema":... message I received.

How do I change the implementation of poll() so that the message is sent without the schema meta data appearing in the actual key and value of the message?

Recommended answer

Ok, it turns out it was just because I had the following lines in connect-standalone.properties:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

when I should have had:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
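For context, these converter lines sit alongside the worker's other settings; a minimal connect-standalone.properties might look like this (a sketch of my own — the host name and file path are placeholders):

```properties
bootstrap.servers=kafka-host:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.file.filename=/tmp/connect.offsets
```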

As an alternative solution, I was also able to change the following setting from true to false:

value.converter.schemas.enable=false
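The matching flag exists for the key converter as well (this pairing is my addition based on the standard JsonConverter configuration, not something stated in the original answer), so keeping the JsonConverter but disabling envelopes on both sides would look like:

```properties
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
```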

Then in my processor class I changed the code to:

SourceRecord[] sourceRecords = new SourceRecord[]{
    new SourceRecord(null, null, "data", null,
                     Schema.STRING_SCHEMA, "foo",
                     null, "bar")
};

This differs because I'm no longer specifying a schema for the value.
