clickhouse与kafka集成 - 简书


本站和网页 http://events.jianshu.io/p/5f372ee92343 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

clickhouse与kafka集成 - 简书登录注册写文章首页下载APP会员IT技术clickhouse与kafka集成站在海边看远方关注赞赏支持clickhouse与kafka集成clickhouse支持与多种存储引擎集成,可以从集成的引擎里面读取消息,然后写到真正的数据存储表里。
clickhouse批量写入的性能比较好,我们的业务场景下会大批量的产生数据,如果使用clickhouse-jdbc去写的,写入时机和每批次写入的数量不好把控,最终选择了先将消息写入kafka,然后由clickhouse从kafka消费数据,clickhouse server消费到数据之后写入真正的数据表。
clickhouse集成kafka引擎见官方文档:
https://clickhouse.com/docs/zh/engines/table-engines/integrations/kafka/
下面的介绍会与官方文档有重复,然后补充一些集成过程中遇到的坑。
下面介绍clickhouse与kafka集成的步骤,clickhouse版本是22.1.3.7
集成kafka
参数解释
必要参数
kafka_broker_list – 以逗号分隔的 brokers 列表 (localhost:9092)。
kafka_topic_list – topic 列表 (my_topic)。
kafka_group_name – Kafka 消费组名称 (group1)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。
kafka_format – 消息体格式。使用与 SQL 部分的 FORMAT 函数相同表示方法,例如 JSONEachRow。了解详细信息,请参考 Formats 部分。
可选参数
kafka_row_delimiter - 每个消息体(记录)之间的分隔符。
kafka_schema – 如果解析格式需要一个 schema 时,此参数必填。例如,普罗托船长 需要 schema 文件路径以及根对象 schema.capnp:Message 的名字。
kafka_num_consumers – 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。
关于必选参数中的kafka_format参数,参见Formats部分,format具体解释如下
https://clickhouse.com/docs/zh/interfaces/formats/。
JSONEachRow, JSONStringsEachRow, JSONCompactEachRow, JSONCompactStringsEachRow
这几种格式,ClickHouse会将行输出为用换行符分隔的JSON值,这些输出数据作为一个整体时,由于没有分隔符(,)因而不是有效的JSON文档。
官方文档给了一些示例。
{"some_int":42,"some_str":"hello","some_tuple":[1,"a"]} // JSONEachRow
[42,"hello",[1,"a"]] // JSONCompactEachRow
["42","hello","(2,'a')"] // JSONCompactStringsEachRow
由于我的真实的数据表,有一个字段是json类型的字符串,但是一开始设置kafka_format的类型为JSONEachRow时,从kafka消费数据会报错,所以kafka_format格式设置成了JSONAsString,具体的错误后面贴出来。
创建引擎表
创建kafka引擎表,用于从kafka消费数据
CREATE TABLE msg_json_source (
msg String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic_test',
kafka_group_name = 'topic_test_consumer',
kafka_format = 'JSONAsString'
由于我的数据结构里有嵌套json,如果使用JSONEachRow,有个字段是json类型的字符串,带转义字符,导致clickhouse解析失败,没找到解决办法,所以使用了JSONAsString格式。
创建真实数据表
CREATE TABLE msg_target
biz Nullable(String),
sender_id String,
msg_id UInt64,
status String,
status_time UInt64,
content String,
event_time DateTime Default now()
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (msg_id, status_time)
TTL event_time + INTERVAl 1 YEAR
一个简单的MergeTree引擎的表,其中content是json格式的字符串。
创建物化视图
CREATE MATERIALIZED VIEW msg_json_source_consumer TO msg_target
AS SELECT
JSONExtractString(msg,'biz') as biz,
JSONExtractString(msg,'sender_id') as sender_id,
JSONExtractUInt(msg,'msg_id') as msg_id,
JSONExtractString(msg,'status') as status,
JSONExtractUInt(msg,'status_time') as status_time,
JSONExtractString(msg,'content') as content
FROM msg_json_source
创建的物化视图用于把从kafka消费到的数据,写到真实的数据表里,在这个例子里,msg_json_source从kafka消费到数据,然后通过物化视图msg_json_source_consumer将消费到的数据写到真实的数据表msg_target中。
由于从kafka消费到的数据就是一个json字符串,在这里使用JSONExtractString等json字段提取工具,提取msg里的字段,比如biz,sender_id,content等字段。
status_time原本计划用DatTime64类型的,但是这个时间格式有坑,最终选择了使用UInt64存毫秒级时间戳,具体的问题下面再介绍。
往kafka写消息
在clickhouse创建好3张表之后(kafka引擎表,真实数据表,物化视图表),往kafka发消息
本地安装一个简易的kafka服务端,然后创建topic
/opt/dev/confluent-7.0.1/bin/
./kafka-topics --create --topic topic_test --bootstrap-server localhost:9092
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic topic_test.
创建好topic之后,使用Java客户端往kafka发消息,使用confluent client发也可以。
添加kafka依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.0</version>
</dependency>
实体类,使用fastjson的@JSONField注解,实体类转字符串的时候,将驼峰转换为下划线
import com.alibaba.fastjson.annotation.JSONField;
public class MsgJsonSource {
private String biz;
@JSONField(name = "sender_id")
private String senderId;
@JSONField(name = "msg_id")
private Long msgId;
private String status;
@JSONField(name = "status_time")
private Long statusTime;
private Map<String,Object> content;
//省略getter/setter
测试类
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fc.model.MsgJsonSource;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Future;
public class KafkaSendTest {
public static void main(String[] args) {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
System.out.println("开始发送数据");
// 4. 调用 send 方法,发送消息
String topic = "topic_test";
for (int i = 0; i < 10; i++) {
msgJsonSource.setContent(content);
Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send(new ProducerRecord<>(topic, JSON.toJSONString(msgJsonSource)));
System.out.println("当前消息序号: " + i + ", 发送结果: " + JSON.toJSONString(recordMetadataFuture));
// 5. 关闭资源
kafkaProducer.close();
最终发送完,我们查看一下clickhouse里的数据表的数据,可以发现我们发送到kakfa里的数据,已经成功的消费,并且写入到真实的数据表里了。
image.png
遇到的问题
版本问题
当时测试环境部署的版本是21.9,但是这个版本有问题,不推荐安装,建议直接部署22以上的clickhouse
JSONEachRow
我一开始就是使用的JSONEachRow格式,但是我的消息体里还有嵌套的json,类似下面这种格式,里面有个字段还是个json,转行成字符串带转义字符。
然后消息体的string字符串贴一条在这里
{"biz":"biz","content":"{\"current\":1648817159914,\"abc\":\"111\",\"text\":\"gggg\",\"req\":{\"nested\":0}}","msg_id":7561248342312669573,"sender_id":"test_0","status":"status_0","status_time":1648817159914}
然后clickhouse解析消息体报错,当时的错找不到了,现在复现不出来了,非常的难顶。。。。
后来因为赶版本的原因把kafka_format换成了JSONAsString。
时间格式问题
clickhouse是支持DateTime64格式的,可以到毫秒级,但是实际使用过程中却有些坑在,
首先是有的客户端解析毫秒字符串有问题,其次是使用JSONExtract*的方法,会有差异,再然后是jdbc查询的时候,也会导致时间查询有问题。
拿毫秒时间戳和秒级时间戳做试验,clickhouse-server版本是22.3.1.1
把上面的kafka引擎表拿出来改一下
#kafka引擎
CREATE TABLE msg_json_source_004 (
msg String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic_test_004',
kafka_group_name = 'topic_test_004_consumer',
kafka_format = 'JSONAsString'
#真实数据表
CREATE TABLE msg_target_004
biz Nullable(String),
sender_id String,
msg_id UInt64,
status String,
status_time DateTime64(3, 'Asia/Shanghai'),
content String,
event_time DateTime Default now()
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (msg_id, status_time)
TTL event_time + INTERVAl 1 YEAR
#物化视图
CREATE MATERIALIZED VIEW msg_json_source_004_consumer TO msg_target_004
AS SELECT
JSONExtractString(msg,'biz') as biz,
JSONExtractString(msg,'sender_id') as sender_id,
JSONExtractUInt(msg,'msg_id') as msg_id,
JSONExtractString(msg,'status') as status,
JSONExtractUInt(msg,'status_time') as status_time,
JSONExtractString(msg,'content') as content
FROM msg_json_source_004
其中status_time这个字段的类型改成DateTime64(3, 'Asia/Shanghai'),使用JSONExtractUInt提取时间,看下效果
首先发条数据,数据内容如下
{"biz":"biz","content":"1111","msg_id":7253917707485514799,"sender_id":"test_0","status":"status_0","status_time":1648901022355}
传入的是毫秒级时间戳,然后数据表存储的时候就变成了2282年
image.png
然后如果传入秒级的时间戳,真实的数据是这样
{"biz":"biz_3","content":"00003","msg_id":3053111163555706035,"sender_id":"test_3","status":"status_3","status_time":1648901508}
clickhouse存储的时候看着时间正常了,但是毫秒丢失了
image.png
然后修改一下物化视图的字段提取方式,之前是 JSONExtractUInt(msg,'status_time') as status_time,现在改成使用 JSONExtractString(msg,'status_time') as status_time提取时间
会发现时间类型又正常了。
这一条数据内容如下
{"biz":"biz_1","content":"2222","msg_id":9151431369051819265,"sender_id":"test_1","status":"status_1","status_time":1648901194855}
最终使用JSONExtractString提取毫秒时间戳,得到了正确的DateTime64的时间,非常的神奇
image.png
最终我决定来了个釜底抽薪的方法,时间直接用UInt64存,因为我发送出去的数据是毫秒级时间戳,最终存时间戳,查询时间范围的时候直接用long类型的数据between好了。
这也是无奈之举,万一哪天server更新版本,导致时间出现问题,那就完蛋了,希望后面时间可以稳定一点吧。
推荐阅读更多精彩内容Clickhouse Kafka引擎表使用进阶 由于项目刚启动,人手不足,kafka引擎表在我们项目中应用很多,基本靠kafka引擎表来做日志计量工作。目前...淡淡的小番茄阅读 7,957评论 6赞 2Flink+Clickhouse实时数仓在广投集团的最佳实践Flink+Clickhouse实时数仓在广投集团的最佳实践 一、业务背景 由于历史原因,大型集团企业往往多个帐套...Wping_1c08阅读 3,433评论 3赞 27Flink+Clickhouse在广投集团实时数仓的最佳实践Flink+Clickhouse实时数仓在广投集团的最佳实践 一、业务背景 由于历史原因,大型集团企业往往多个帐套...Wping_1c08阅读 3,923评论 1赞 27ClickHouse 21.7.3.14-2(四) 表引擎表引擎的使用 表引擎是 ClickHouse 的一大特色。可以说, 表引擎决定了如何存储表的数据。包括: 数据的存..._大叔_阅读 450评论 0赞 1ClickHouse原理解析与应用实践第1章 ClickHouse的前世今生 在大量数据分析场景的解决方案中,传统关系型数据库很快就被Hadoop生态所...yeedom阅读 3,584评论 0赞 7《ClickHouse原理解析与应用实践》读书总结本文是对《ClickHouse原理解析与应用实践》一书的概括性总结,整体章节和结构尊重原文,由于书的出版在2019...Caucher阅读 3,432评论 1赞 12[ClickHouse]表引擎学习分享设计理念 Everything is table(万物皆表),数据表就是ClickHouse和外部交互的接口。在数...愚公300代阅读 853评论 0赞 0ClickHouse数据导入一 概述 目前Kafka数据导入ClickHouse的常用方案有两种,一种是通过ClickHouse内置的Kafk...gaofubao阅读 5,536评论 3赞 4篇二|什么是ClickHouse的表引擎?在上一篇分享中,我们介绍了ClickHouse的安装部署和简单使用。本文将介绍ClickHouse中一个非常重要的...大数据技术与数仓阅读 5,244评论 0赞 11分享一篇牛人的工作总结转自http://www.ituring.com.cn/article/497377,让大家感受一下什么叫优秀的架...smooth00阅读 680评论 0赞 2ClickHouse 表引擎介绍表引擎介绍: ClickHouse表引擎决定了如下几个方面: 怎样存储数据 -将数据写到哪里, 怎样读取数据. 支...c3729438b799阅读 18,387评论 0赞 7DBMS 数据库管理系统的三级模式架构《ClickHouse 实战:企业级大数据分析引擎》引文 计算机科学领域的所有问题,都可以通过添加一层中间层来解决。通过在用户和计算机中间添加一层逻辑层(概念模型层)...禅与计算机程序设计艺术阅读 421评论 0赞 2Flink SQL Connector(一)- Format & KafkaFormat Flink 提供了一套与表连接器(table connector)一起使用的表格式(table fo...Alex90阅读 3,056评论 0赞 0ClickHouse - 045、数据库引擎 ClickHouse 中支持在创建数据库时指定引擎,目前比较常用的两种引擎为默认引擎和 MySQL...ArthurHC阅读 227评论 0赞 1开源日志项目(java,zookeeper,kafka,fluentd,gohangout,c...记一次日志项目从日志采集,分布式协调,中间件消费,到数据库存储管理,到图形化界面展示 1 安装java环境 官网下...W朱珠W阅读 1,674评论 2赞 1Substrate的transaction-payment模块分析Substrate的transaction-payment模块分析 transaction-payment模块提供...建怀阅读 7,532评论 0赞 42019-11-28 17316宿命:用概率思维提高你的胜算 以前的我是风险厌恶者,不喜欢去冒险,但是人生放弃了冒险,也就放弃了无数的可能。 ...yichen大刀阅读 5,390评论 0赞 42019-11-28三数灯谜分析公元:2019年11月28日19时42分农历:二零一九年 十一月 初三日 戌时干支:己亥乙亥己巳甲戌当月节气:立冬...石放阅读 6,449评论 0赞 2零基础也不怕!CAD新手快速入门技巧,后悔没有早知道!想要快速入门CAD,对于零基础的新手来说的确有一定的困难。不过只要你掌握了以下这些CAD快速入门技巧,你就跨进了C...努力的王榆阅读 2,661评论 0赞 32019-11-27昨天考过了阿里规范,心里舒坦了好多,敲代码也犹如神助。早早完成工作回家喽常亚星阅读 2,789评论 0赞 1评论4赞33赞4赞赞赏更多好文