本文介绍了为什么Clickhouse不支持将COLUMN添加到kafka表的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在向ClickHouse的Kafka队列添加列时遇到问题.

I have a problem adding a column to the Kafka queue in ClickHouse.

我已经使用命令创建了一个表

I've created a table with the command

CREATE TABLE my_db.my_queue ON CLUSTER my_cluster
(
    `ts` String,
    .... some other columns
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = '172.21.0.3:9092',
kafka_topic_list = 'my_topic',
kafka_group_name = 'my_group',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\n',
kafka_num_consumers = 1,
kafka_skip_broken_messages = 10;

然后尝试添加一列

ALTER TABLE my_db.my_queue  ON CLUSTER my_cluster ADD COLUMN new_column String;

但是遇到错误

SQL Error [48]: ClickHouse exception, code: 48, host: 172.21.0.4, port: 8123; Code: 48,
e.displayText() = DB::Exception: There was an error on [clickhouse-server:9000]: Code: 48,
e.displayText() = DB::Exception: Alter of type 'ADD COLUMN' is not supported by storage Kafka
(version 20.11.4.13 (official build)) (version 20.11.4.13 (official build))

我对ClickHouse和任何分析数据库不熟悉.所以我想知道为什么不支持它?还是我应该以其他方式添加一列?

I am not familiar with ClickHouse and any analytical database.So I am wondering why it is not supported? Or I should add a column in another way?

推荐答案

从Kafka队列中支持具有不同架构的消息的一种方法包括存储原始JSON消息,如下所示:

A way of supporting messages with different schema from a Kafka queue consists on storing the raw JSON messages like this:

CREATE TABLE my_db.my_queue ON CLUSTER my_cluster
(
    `message` String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = '172.21.0.3:9092',
kafka_topic_list = 'my_topic',
kafka_group_name = 'my_group',
kafka_format = 'JSONAsString',
kafka_row_delimiter = '\n',
kafka_num_consumers = 1,
kafka_skip_broken_messages = 10;

JSONAsString 格式会将原始JSON存储在邮件列.通过Kafka表,您可以通过实例化视图和 JSON函数.

The JSONAsString format will store the raw JSON in the message column. This way from the Kafka table you can post-process each new row through materialized views and JSON functions.

例如:

CREATE TABLE my_db.post_processed_data (
  `ts` String,
  `another_column` String
)
-- use a proper engine
Engine=Log;

CREATE MATERIALIZED VIEW my_db.my_queue_mv TO my_db.post_processed_data
AS
SELECT
    JSONExtractString(message, 'ts') AS ts,
    JSONExtractString(message, 'another_column') AS another_column
FROM my_db.my_queue;

如果Kafka队列的JSON模式发生任何变化,您可以在 post_processed_data 表中执行 ALTER TABLE ..ADD COLUMN .. 并进行相应的更新物化视图相应地.这样,Kafka表将保持原样.

If there's any change in the JSON schema of the Kafka queue, you can react accordingly doing an ALTER TABLE .. ADD COLUMN .. in the post_processed_data table and updating the materialized view accordingly. That way the Kafka table would remain as it is.

这篇关于为什么Clickhouse不支持将COLUMN添加到kafka表的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

06-19 04:19