我已经使用 avro-maven-plugin 从 Avro 模式生成了我的 Avro Java 类。我将我的 avro 类序列化为一个字节数组,然后将其写入 kafka 主题。

然后我有一个 kafka 流,它试图操纵 avro 数据来做某事。在反序列化过程中,我从同一个类中获得了 ClassCastExcetion。我读到这个问题是由于 Avro 在回退时使用的不同 ClassLoader(ClassLoader 的新实例)而产生的。

有一种方法可以强制 Avro 使用调用者的 ClassLoader 或类似的东西吗?

KafkaStream 属性

this.props = new Properties();
        this.props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        this.props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        this.props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        this.props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());

我正在使用字符串键和序列化的 avro 字节数组,然后我需要手动反序列化 avro 的有效负载。
我使用 avro 的解码器像这样反序列化:
AvroPayload stp = AvroPayload.fromByteBuffer(ByteBuffer.wrap(bytes));

甚至像这样:
AvroPayload stp = AvroPayload.getDecoder().decode(ByteBuffer.wrap(bytes));

在第一个版本中,调试我可以看到,如果我保留在 avro 生成的类上下文中,字节数组会正确地反序列化为 AvroPayload 类。返回那个新实例可能会抛出一个 ClassCastException

最佳答案

我找到的唯一解决方案是按照建议将 avro 类放入外部 jar 中,然后将其导入。

这不是一个好的解决方案,因为它需要大量配置来保持 avro 模式和生成的类的耦合,但这是我唯一发现的。

我配置了一个 maven 项目,将 avro 的类 jar 生成到一个目录中,其名称中没有工件版本,因此我始终可以导入最新版本而无需更改 pom。

如果有人会发现其他解决方案,请发布

关于java - 从 kafka 流和 Avro 反序列化的同一类上的 ClassCastException,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/56890432/

10-13 02:41