Problem description
I am using the Kafka connector in Apache Flink to access streams served by Confluent Kafka.
Apart from the schema registry URL, ConfluentRegistryAvroDeserializationSchema.forGeneric(...) expects a 'reader' schema. Instead of providing a reader schema, I want to read each message with the same writer's schema (looked up in the registry), because the consumer will not have the latest schema.
```java
FlinkKafkaConsumer010<GenericRecord> myConsumer =
    new FlinkKafkaConsumer010<>(
        "topic-name",
        ConfluentRegistryAvroDeserializationSchema.forGeneric(<reader schema goes here>, "http://host:port"),
        properties);
myConsumer.setStartFromLatest();
```
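The Flink 1.9 Kafka connector documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html) says: "Using these deserialization schema record will be read with the schema that was retrieved from Schema Registry and transformed to a statically provided…". In other words, every record is converted to the reader schema passed to forGeneric(...).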
Since I do not want to keep the schema definition on the consumer side, how do I deserialize an Avro message from Kafka using the writer's schema?
Thanks for your help!
Recommended answer
I don't think it is possible to use ConfluentRegistryAvroDeserializationSchema.forGeneric directly. It is intended to be used with a reader schema, and it has precondition checks that enforce this.
You have to implement your own. Two important things:
- Set specific.avro.reader to false (otherwise you'll get specific records).
- The KafkaAvroDeserializer has to be lazily initialized, because it isn't serializable itself: it holds a reference to the schema registry client.
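To see why the second point matters, here is a minimal, dependency-free sketch using plain Java serialization, which is how Flink ships user functions and deserialization schemas to its workers. HeavyClient, EagerSchema, LazySchema and roundTrip are hypothetical names for illustration; HeavyClient merely stands in for a non-serializable object such as a schema registry client.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazyInitDemo {

  /** Hypothetical stand-in for a non-serializable client (e.g. a schema registry client). */
  static final class HeavyClient {
    private final String url;

    HeavyClient(String url) {
      this.url = url;
    }

    String url() {
      return url;
    }
  }

  /** Holds the client eagerly: Java serialization fails on the non-serializable field. */
  static final class EagerSchema implements Serializable {
    private static final long serialVersionUID = 1L;
    private final HeavyClient client = new HeavyClient("http://localhost:8081");
  }

  /** Ships only the URL; the transient client is rebuilt on first use. */
  static final class LazySchema implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String registryUrl;
    private transient HeavyClient client; // skipped by serialization, null after deserialization

    LazySchema(String registryUrl) {
      this.registryUrl = registryUrl;
    }

    boolean isInitialized() {
      return client != null;
    }

    HeavyClient client() {
      if (client == null) {
        client = new HeavyClient(registryUrl);
      }
      return client;
    }
  }

  /** Serializes and deserializes an object, similar to shipping it to a worker. */
  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    try {
      roundTrip(new EagerSchema());
      System.out.println("eager: serialized");
    } catch (NotSerializableException e) {
      System.out.println("eager: NotSerializableException");
    }

    LazySchema original = new LazySchema("http://localhost:8081");
    original.client(); // initialized locally; the transient field is not written out anyway
    LazySchema copy = roundTrip(original);
    System.out.println("lazy copy initialized: " + copy.isInitialized()); // false
    System.out.println("lazy client url: " + copy.client().url()); // http://localhost:8081
    System.out.println("lazy copy initialized: " + copy.isInitialized()); // true
  }
}
```

The eager variant fails because its client field is serialized along with the wrapper. The lazy variant ships only the URL and recreates the client on first use after deserialization, which is the same pattern checkInitialized() applies to KafkaAvroDeserializer in the answer's class.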
```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

/**
 * Deserializes Confluent-framed Avro messages into GenericRecords using the writer's
 * schema looked up in the schema registry, so no reader schema is needed.
 */
public class KafkaGenericAvroDeserializationSchema
    implements KeyedDeserializationSchema<GenericRecord> {

  private final String registryUrl;

  // Not serializable (it holds a schema registry client), so it is transient and
  // created lazily after this schema has been shipped to the Flink workers.
  private transient KafkaAvroDeserializer inner;

  public KafkaGenericAvroDeserializationSchema(String registryUrl) {
    this.registryUrl = registryUrl;
  }

  @Override
  public GenericRecord deserialize(
      byte[] messageKey, byte[] message, String topic, int partition, long offset) {
    checkInitialized();
    // Resolves the writer's schema from the schema ID embedded in the message.
    return (GenericRecord) inner.deserialize(topic, message);
  }

  @Override
  public boolean isEndOfStream(GenericRecord nextElement) {
    return false; // a Kafka topic is an unbounded stream
  }

  @Override
  public TypeInformation<GenericRecord> getProducedType() {
    // GenericRecord is an interface, so Flink treats it as a generic (Kryo-serialized) type.
    return TypeExtractor.getForClass(GenericRecord.class);
  }

  private void checkInitialized() {
    if (inner == null) {
      Map<String, Object> props = new HashMap<>();
      props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
      // false => GenericRecord instead of generated SpecificRecord classes
      props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
      SchemaRegistryClient client =
          new CachedSchemaRegistryClient(
              registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
      inner = new KafkaAvroDeserializer(client, props);
    }
  }
}
```
```java
// env, topic and kafkaProperties are defined elsewhere in the job.
env.addSource(
    new FlinkKafkaConsumer<>(
        topic,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        kafkaProperties));
```
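For context on why no reader schema is needed: messages written by Confluent's Avro serializer start with a small header (a magic byte 0 followed by a 4-byte big-endian schema ID), and KafkaAvroDeserializer uses that ID to fetch the writer's schema from the registry before decoding the remaining bytes. The sketch below only parses that header with the JDK; ConfluentWireFormat, schemaId and avroPayload are hypothetical helpers for illustration, not part of Confluent's API, and the real deserializer additionally caches schemas and performs the Avro decoding.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ConfluentWireFormat {

  /** First byte of every message produced by Confluent's Avro serializer. */
  static final byte MAGIC_BYTE = 0x0;

  /** Header size: 1 magic byte + 4-byte schema ID. */
  static final int HEADER_SIZE = 5;

  /** Extracts the schema registry ID that identifies the writer's schema. */
  static int schemaId(byte[] message) {
    if (message == null || message.length < HEADER_SIZE) {
      throw new IllegalArgumentException("Message too short for Confluent framing");
    }
    ByteBuffer buffer = ByteBuffer.wrap(message); // big-endian by default
    byte magic = buffer.get();
    if (magic != MAGIC_BYTE) {
      throw new IllegalArgumentException("Unknown magic byte: " + magic);
    }
    return buffer.getInt();
  }

  /** Returns the Avro-encoded body that follows the header. */
  static byte[] avroPayload(byte[] message) {
    schemaId(message); // validates the header
    return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
  }

  public static void main(String[] args) {
    // Hypothetical message: magic byte, schema ID 42, then three arbitrary payload bytes.
    byte[] message = {0x0, 0x0, 0x0, 0x0, 0x2A, 0x02, 0x61, 0x62};
    System.out.println("schema id: " + schemaId(message)); // schema id: 42
    System.out.println("payload bytes: " + avroPayload(message).length); // payload bytes: 3
  }
}
```

Because the ID travels with every record, the consumer never needs a local copy of the schema; it only needs access to the registry, which is exactly what the custom deserialization schema relies on.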