在使用ClickHouse的过程中,数据接入的方式有很多种,最近在尝试使用kafka的方式进行数据的入库,目前大概有两种方案:
- 内部kafka 引擎方式接入
- clickhouse_sinker
尝试第一种方式,既kafka引擎方式进行接入。
具体操作步骤如下:
-
- 搭建kafka服务器 可以参照 spring cloud stream Kafka 示例 里,kafka搭建方式
- 创建kafka 引擎的表
-
CREATE TABLE tkafka (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka SETTINGS kafka_broker_list = ‘192.168.1.198:9092’,
kafka_topic_list = ‘test2’,
kafka_group_name = ‘group1’,
kafka_format = ‘JSONEachRow’,
kafka_row_delimiter = ‘\n’,
kafka_num_consumers = 1; - 创建一个结构表
-
CREATE TABLE daily (
day Date,
level String,
total UInt64
) ENGINE = SummingMergeTree(day, (day, level), 8192); - 创建物化视图
-
CREATE MATERIALIZED VIEW consumer TO daily AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total FROM tkafka GROUP BY day, level;
整个过程就完毕了,其中需要消息发送主要是JSONEachRow,也就是JSON格式的数据,那么往topic 里面写入JSON数据即可。
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test2 >{"timestamp":"1562209583","level":"2","message":"hello2"}
发送数据后需要关闭通道,不然无法查询到数据。
SELECT level, sum(total) FROM daily GROUP BY level;
问题解答
1. Cannot parse input: expected { before: \0: (at row 2)
问题出在引擎版本上,我使用的是19.3.4 版本。19.1 版本没有问题, 19.5.2.6 版本解决了此问题,也就是中间版本存在这个问题。
原因: 消息中数据之间的分割符号未指定,导致无法处理。
解决办法: 添加 kafka_row_delimiter = ‘\n’,也就是上文键标红的部分。
参考解决地址: https://github.com/yandex/ClickHouse/issues/4442
2. 消息发送后,数据无法查询。
原因:kafka 引擎默认消费根据条数与时间进行入库,不然肯定是没效率的。
解决办法:其中对应的参数有两个。 max_insert_block_size ,stream_flush_interval_ms。
这两个参数都是全局性的。
max_insert_block_size 默认值为: Default value: 1,048,576.
参考地址: https://clickhouse.yandex/docs/zh/operations/settings/settings/#settings-max_insert_block_size
stream-flush-interval-ms 默认值为: The default value is 7500.
参考地址:
https://clickhouse.yandex/docs/zh/operations/settings/settings/#stream-flush-interval-ms
这个参数改小时影响整个数据库的,所以如果不好调整请采用方案2。clickhouse_sinker.
github 地址: https://github.com/housepower/clickhouse_sinker