Packaging a custom Java `partitioner.class` plugin for Kafka Connect in Confluent 4.1 + Kafka 1.1?

Problem description


I've been successfully using a simple custom Partitioner class written in Java for a Kafka Connect sink on Confluent 3.2.x (Kafka 0.10.x). I want to upgrade to Confluent 4.1 (Kafka 1.1) and am experiencing errors.

Kafka Connect's plugin loading mechanism seems to have been changed in CP 3.3.0. Previously, there was just the CLASSPATH option, but with CP 3.3.0+ there is a newer and recommended plugin.path mechanism.

If I try to keep using the legacy CLASSPATH plugin mechanism, when I try to use my plugin, I get:

java.lang.ClassNotFoundException: io.confluent.connect.storage.partitioner.DefaultPartitioner

That is a CP internal class. With the older CP 3.2.x, that was available on the classpath, however with the new classpath isolation efforts in CP >= 3.3.0, I presume that must be provided along with the plugin.

I figure it's wise to switch to the newer recommended plugin.path mechanism. I remove the CLASSPATH entry. In the default /etc/kafka/connect-distributed.properties, I see plugin.path=/usr/share/java, so I install my plugin .jar to /usr/share/java/my-custom-partitioner/my-custom-partitioner.jar. I have tried adding and not adding dependency .jar files there as well.
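For reference, the layout the plugin.path mechanism expects is one subdirectory per plugin under the configured path, each holding that plugin's jar(s) plus its dependencies. A minimal sketch of that layout, simulated under a scratch directory standing in for /usr/share/java (the jar name here is a placeholder):

```shell
# plugin.path names a directory; each subdirectory under it becomes one
# plugin loaded in its own isolated class loader.
plugin_path=$(mktemp -d)   # stand-in for /usr/share/java
mkdir -p "$plugin_path/my-custom-partitioner"
# placeholder for the built plugin jar (dependency jars would sit alongside it)
touch "$plugin_path/my-custom-partitioner/my-custom-partitioner.jar"
find "$plugin_path" -type f -name '*.jar'
```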

My plugin seems to get loaded when the Kafka Connect service starts up:

INFO Loading plugin from: /usr/share/java/my-custom-partitioner (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:194)
INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/my-custom-partitioner/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)

When I do:

curl -X PUT -H "Content-Type: application/json" --data-binary "@sink_test_1.json" my-dev-test-vm:8083/connectors/sink-test-1/config
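For context, sink_test_1.json would be a connector config along these lines. The connector class, topic, and flush size shown here are placeholder values, not taken from the original question; the relevant line is `partitioner.class`, which names the custom class (hypothetical name shown):

```json
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "topics": "test-topic",
  "flush.size": "100",
  "partitioner.class": "com.example.MyCustomPartitioner"
}
```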

I get:

{"error_code":500,"message":null}

I can see in the kafka connect systemd log:

java.lang.NullPointerException
at io.confluent.connect.storage.partitioner.PartitionerConfig.classNameEquals(PartitionerConfig.java:270)
at io.confluent.connect.storage.partitioner.PartitionerConfig.access$000(PartitionerConfig.java:33)
at io.confluent.connect.storage.partitioner.PartitionerConfig$PartitionerClassDependentsRecommender.visible(PartitionerConfig.java:238)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:617)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:625)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:525)
at org.apache.kafka.common.config.ConfigDef.validateAll(ConfigDef.java:508)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:490)
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:133)

It's not apparent what is going wrong or why my partitioner class isn't loading correctly.

FYI, I have rebuilt my Java plugin with CP 4.1 + Kafka 1.1 dependencies and made small updates to match API changes such as adding an implementation for getSchemaGeneratorClass to my partitioner class.

Solution

Custom Kafka Connect Partitioner classes will not work via the old CLASSPATH mechanism and they will not work as plugins with the newer Kafka 0.11.0+ isolated plugin mechanism.

The only working solution is to copy the custom .jar file containing your custom Kafka Connect Partitioner class into the kafka-connect-storage-common plugin directory at /usr/share/java/kafka-connect-storage-common/. Custom Kafka Connect Partitioner classes must exist in that same directory so that they end up in the same isolated class loader.
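Concretely, the fix amounts to dropping the custom jar next to kafka-connect-storage-common's own jars and restarting the Connect worker. Sketched below with a scratch directory standing in for /usr/share/java/kafka-connect-storage-common/ (both jar names are placeholders):

```shell
# Stand-in for /usr/share/java/kafka-connect-storage-common/ on a CP host.
storage_common=$(mktemp -d)/kafka-connect-storage-common
mkdir -p "$storage_common"
# CP's own storage-common jar (placeholder name) already lives here...
touch "$storage_common/kafka-connect-storage-common-4.1.0.jar"
# ...and the custom partitioner jar is copied alongside it, so both are
# served by the same isolated PluginClassLoader.
touch "$storage_common/my-custom-partitioner.jar"
ls "$storage_common"
```

On a real install you would `cp` the built jar rather than `touch` a placeholder, then restart the Kafka Connect service so the plugin scanner re-reads the directory.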

FYI, you can see here that the Kafka 0.11.0+ isolated plugin mechanism only loads subclasses of four specific Java classes, none of which covers Kafka Connect partitioners:

https://github.com/apache/kafka/blob/fdcf75ea326b8e07d8d7c4f5cc14f1e70451bcd4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L279

Thanks to cricket_007 for recommending this exact solution: putting custom Kafka Connect partitioner .jar files in the /share/java/kafka-storage-common directory. I learned the hard way exactly why this has to be done and why alternatives don't work.

