问题:设置了通过Kafka Connect JDBC源和Avro序列化器和反序列化器使用拉入数据的Kafka管道后,一旦我尝试使用Kafka Streams Java应用将该数据读入KStream中,就会出现以下错误。


org.apache.kafka.common.errors.SerializationException:数据大小
LongDeserializer收到的不是8


我试图尽可能地遵循现有的示例,但是有些事情没有意义。我将在下面提供所有代码/其他信息,但是我有两个问题……


我目前所了解的最大差距之一是Avro记录的“ KEY”使用了什么?我(在运行时)出错的行与我告诉KStream密钥是一个LONG的事实有关,但是在检索Avro记录时,该长度小于8(LONG的预期长度)类型)。
当我设置JDBC Source时,没有任何东西可以标识密钥是什么-尽管我尝试了以下操作,但在文档中却没有看到任何内容使我相信可以指定密钥:

curl -X POST \
  -H "Content-Type: application/json" \
  --data 'see next code block for formatted data'  \
http://localhost:8083/connectors

// This is the data chunk used above but in a string - broke it apart for readability here
{
    "name": "source-jdbc-ldw_applications",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": 1,
        "connection.url": "jdbc:sqlserver://dbserver;databaseName=dbname;user=kafkareader;password=kafkareader;",
        "mode": "incrementing",
        "incrementing.column.name": "ApplicationID",
        "topic.prefix": "source-jdbc-",
        "poll.interval.ms": 30000,
        "table.whitelist": "LDW_Applications",
        "transforms": "setSchema",
        "transforms.setSchema.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
        "transforms.setSchema.schema.name": "com.mycompany.avro.Application",
        "transforms.setSchema.schema.version": "1"
    }
}



通过以上操作,我通过运行获得了以下架构:

curl http://localhost:8081/subjects/source-jdbc-LDW_Applications-value/versions/1 |jq


这是输出:

{
    "subject": "source-jdbc-LDW_Applications-value",
    "version": 1,
    "id": 9,
    "schema": "{\"type\":\"record\",\"name\":\"Application\",\"namespace\":\"com.baydynamics.avro\",\"fields\":[{\"name\":\"ApplicationID\",\"type\":\"long\"},{\"name\":\"Name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"Description\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"Group\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"OwnerUserID\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"RiskScore\",\"type\":[\"null\",{\"type\":\"int\",\"connect.type\":\"int16\"}],\"default\":null},{\"name\":\"RiskRating\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ServiceLevelTierID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"LossPotentialID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ConfidentialityRequirementID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"IntegrityRequirementID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"AvailabilityRequirementID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ApplicationCategoryID\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.version\":1,\"connect.name\":\"com.baydynamics.avro.Application\"}"
}


要稍微了解一下该架构,请执行以下操作:

{
"type":"record",
"name":"Application",
"namespace":"com.baydynamics.avro",
"fields":[
    {
        "name":"ApplicationID",
        "type":"long"
    },
    {
        "name":"Name",
        "type":[
            "null",
            "string"
        ],
        "default":null
    },
    {
        "name":"Description",
        "type":[
            "null",
            "string"
        ],
        "default":null
    },
    {
        "name":"Group",
        "type":[
            "null",
            "string"
        ],
        "default":null
    },
    {
        "name":"OwnerUserID",
        "type":[
            "null",
            "long"
        ],
        "default":null
    },
    {
        "name":"RiskScore",
        "type":[
            "null",
            {
            "type":"int",
            "connect.type":"int16"
            }
        ],
        "default":null
    },
    {
        "name":"RiskRating",
        "type":[
            "null",
            "string"
        ],
        "default":null
    },
    {
        "name":"ServiceLevelTierID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"LossPotentialID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"ConfidentialityRequirementID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"IntegrityRequirementID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"AvailabilityRequirementID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"ApplicationCategoryID",
        "type":[
            "null",
            "long"
        ],
        "default":null
    }
],
"connect.version":1,
"connect.name":"com.baydynamics.avro.Application"
}


因此,我再也看不到任何可以指示该记录关键的特定字段。

所以然后我进入Kafka Streams,然后尝试将这些数据导入KStream ...然后炸毁...

final KStream<Long, Application> applicationStream = builder.stream(Serdes.Long(), applicationSerde, VULNERABILITY_TOPIC);


就是这样,因为我知道幕后存储的数据是SQL Server中的BIGINT,并且映射到Java中的LONG,所以我将KStream的键类型设置为Long,然后将Serdes.Long()反序列化器用于KStream构建器的参数。

调试时,我看到原始记录的长度为7,这就是它引发错误的原因。显然,Avro将事物序列化以更好地压缩吗?我不知道。无论如何,问题是我什至不知道它实际上在使用什么密钥?那么谁知道-也许我对Long的假设是错误的,因为它实际上没有使用ApplicationID作为密钥?我什至会以为呢?

任何帮助,将不胜感激。我知道那里有很多信息,但总而言之。


使用JDBC Kafka connect将数据推送到主题中
数据使它成为主题-我可以通过控制台看到它
尝试将数据推送到流中,这样我就可以对数据做一些很棒的事情,并且由于Serdes与Avro Record不兼容,尝试填充流时它会崩溃


更新1:
根据下面的Randall的建议,我去尝试了SMT(单消息转换),现在我为每条记录设置了一个密钥,这是朝正确方向迈出的重要一步,但是由于某种原因,似乎没有强制性强制转换为Long(INT64)具有任何实际效果。我已经用SMT拍摄了一些连接器配置的屏幕截图,生成的记录(现在有一个键!)和我在Kafka流中看到的相同错误:

最佳答案

Confluent JDBC source connector不会生成带有键的记录。添加该支持的feature request已被记录。

同时,您可以使用单个消息转换从值中提取一些字段,从而从本质上创建键。内置的ValueToKey transform正是这样做的。 This blog post具有该SMT的示例。

10-08 02:58