This article describes how to push data from a Kafka topic to PostgreSQL in JSON format. It should be a useful reference for anyone hitting the same problem.

Problem Description

The following error occurs after the update:

[2019-07-29 12:52:23,301] INFO Initializing writer using SQL dialect: PostgreSqlDatabaseDialect (io.confluent.connect.jdbc.sink.JdbcSinkTask:57)
[2019-07-29 12:52:23,303] INFO WorkerSinkTask{id=sink-postgres-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:301)
[2019-07-29 12:52:23,367] WARN [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Error while fetching metadata with correlation id 2 : {kafkadad=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:1023)
[2019-07-29 12:52:23,368] INFO Cluster ID: _gRuX5-0SUu72wzy6PV0Ag (org.apache.kafka.clients.Metadata:365)
[2019-07-29 12:52:23,369] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Discovered group coordinator INTRIVMPIOT01.xpetize.local:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
[2019-07-29 12:52:23,372] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:459)
[2019-07-29 12:52:23,373] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-07-29 12:52:23,383] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-07-29 12:52:23,482] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:455)
[2019-07-29 12:52:23,486] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Setting newly assigned partitions: kafkadad-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:290)
[2019-07-29 12:52:23,501] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Resetting offset for partition kafkadad-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
[2019-07-29 12:52:35,338] ERROR WorkerSinkTask{id=sink-postgres-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:701)
        at org.apache.kafka.connect.json.JsonConverter.access$000(JsonConverter.java:61)
        at org.apache.kafka.connect.json.JsonConverter$12.convert(JsonConverter.java:181)
        at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:745)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:363)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more
[2019-07-29 12:52:35,347] ERROR WorkerSinkTask{id=sink-postgres-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2019-07-29 12:52:35,347] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:105)
[2019-07-29 12:52:35,349] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Member consumer-1-bdbc7035-7625-4701-9ca7-c1ffa6863456 sending LeaveGroup request to coordinator INTRIVMPIOT01.xpetize.local:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:822)

Producer console (the original shows only a screenshot here):
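A typical invocation, assumed here since the exact command is not reproduced in the source (the topic name kafkadad is taken from the consumer log above), would be:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkadad

with one JSON envelope (see the sample JSON data below) pasted per line.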

The connect-standalone.properties file:

bootstrap.servers=localhost:9092 
key.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter=org.apache.kafka.connect.json.JsonConverter 
key.converter.schemas.enable=false 
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets 
offset.flush.interval.ms=10000
plugin.path=/home/kafka/confluent-5.2.1/share/java
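Note that with value.converter.schemas.enable=true, the JsonConverter requires every message value to carry a schema/payload envelope; a bare JSON object without it cannot be converted. A minimal valid envelope, shown here purely for illustration in the one-message-per-line form the console producer expects, looks like this:

{"schema": {"type": "struct", "fields": [{"type": "int8", "optional": false, "field": "customer_id"}], "optional": false, "name": "msgschema"}, "payload": {"customer_id": 13}}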

The connect-post.properties file:

name=sink-postgres
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=2
topics=kafkadad
connection.url=jdbc:postgresql://localhost:5432/kafkadb?
user=postgres&password=postgres
insert.mode=upsert
table.name.format=kafkatable
pk.mode=none
pk.fields=none
auto.create=true 
auto.evolve=false
offset.storage.file.filename=/tmp/post-sink.offsets

The above error occurred when I ran ./bin/connect-standalone.sh config/connect-standalone.properties config.postgresql.properties through Apache Kafka.

Then I tried and implemented the flow described in this link:

https://hellokoding.com/kafka-connect-sinks-data-to-postgres-example-with-avro-schema-registry-and-python

But there, the data is generated from Python code using Avro. In my case, I already have data coming from sensors (in JSON format) in a Kafka topic, and I want to send it to PostgreSQL rather than generate data through code.

So, how can I achieve this flow of sending data from a Kafka topic to PostgreSQL?

I have shared my properties files; please let me know if corrections are required. I am sending simple JSON data such as {"cust_id": 1313131, "month": 12, "expenses": 1313.13}, and I also tried sending the following type of data, but the error persists.

Sample JSON data:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "customer_id"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "month"
            },
            {
                "type": "string",
                "optional": true,
                "field": "amount_paid"
            }
        ],
        "optional": false,
        "name": "msgschema"
    },
    "payload": {
        "cust_id": 13,
        "month": 12,
        "expenses": 1313.13
    }
}

I have a table called kafkatable with columns (customer_id, month, amount_paid), created using:

CREATE TABLE kafkatable(
    customer_id int8,
    month int4,
    amount_paid decimal(9,2)
);

Recommended Answer

I solved this error by making the following changes; a corrected configuration and message sketch follows the list.

  1. Set insert.mode=insert.
  2. Comment out table.name.format=kafkatable, because the table will be created by auto.create.
  3. Remove the question mark at the end of the connection.url line.
  4. pk.fields should not be left as none here; be sure to provide a column name to avoid complications.
  5. PostgreSQL does not support int32, so it worked fine once I changed it to int8.
  6. The fields in your schema and payload have different names; make sure they use the same names. (This mismatch is the likely cause of the NullPointerException in JsonConverter.convertToConnect shown in the log.)
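Putting these changes together, the corrected connect-post.properties would look roughly like the following. This is a sketch based on the fixes above, not the answerer's verbatim file: moving the credentials into the separate connection.user and connection.password connector properties is one common way to drop the question mark from the URL, and pk.fields=customer_id is an assumed column choice.

name=sink-postgres
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=2
topics=kafkadad
# fix 3: no trailing "?" on the URL; credentials moved to separate properties
connection.url=jdbc:postgresql://localhost:5432/kafkadb
connection.user=postgres
connection.password=postgres
# fix 1
insert.mode=insert
# fix 2: let auto.create create the table
# table.name.format=kafkatable
pk.mode=none
# fix 4: a real column name instead of none (assumed choice)
pk.fields=customer_id
auto.create=true
auto.evolve=false

With table.name.format commented out, auto.create creates a table named after the topic, since the default table name format is the topic name.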

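A message whose schema and payload agree, with int32 replaced by int8 per fixes 5 and 6, might then look like this (a sketch; amount_paid is written as a string to match its declared type):

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int8",
                "optional": false,
                "field": "customer_id"
            },
            {
                "type": "int8",
                "optional": true,
                "field": "month"
            },
            {
                "type": "string",
                "optional": true,
                "field": "amount_paid"
            }
        ],
        "optional": false,
        "name": "msgschema"
    },
    "payload": {
        "customer_id": 13,
        "month": 12,
        "amount_paid": "1313.13"
    }
}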
That concludes this article on pushing JSON-formatted data from a Kafka topic to PostgreSQL. We hope the recommended answer is helpful.
