FlinkSQL流式关联Hbase大表方案(走二级索引)

阅读: 评论:0

FlinkSQL流式关联Hbase⼤表⽅案(⾛⼆级索引)
我们在做实时数据开发的时候,通常要⽤spark、flink去消费kafka的数据,拿到数据流后会和外部数据库(Hbase、MySQL等)进⾏维表关联来把数据流打宽。当然了,有些外部数据库不只是存储维度数据,也会有很多事实数据,并且这些数据更新频繁,数据量巨⼤,但是我们的Flink流也会去实时的join这些巨⼤的事实表,这就需要选择⼀个合适的外部数据库作为⽀持,这个外部数据库⼀定要满⾜海量数据⾼效的读写性能,这样才能满⾜实时场景的需求,说到这,我们的⽬光⾃然⽽然的落到了Hbase上,来吧,我们直接上图,下⾯这张图就是以上所说场景的⼀个基本架构
那么问题来了,FlinkSQL如何去关联Hbase⼤表呢,如果关联字段不是hbase维表的rowkey那么将会触发全表扫描,如果这个表很⼤,全表扫描效率就很不乐观了,耗时少则⼏秒,多则⽆限延长,所以我们⼀定是要⾛hbase⼆级索引的,但是很遗憾,FlinkSQL⾥的Hbase connector不会处理索引,它要么scan,要么就get,那么我们该怎么办呢,别急,我们也有笨办法,那就是我们⾃⼰维护索引表,如果你还不懂hbase⼆级索引的实现⽅式请⾃⾏补充这⽅⾯知识,下⾯的内容就是有关⼆级索引的试⽤了,看图吧
万向车
点火模块来,我描述⼀下上图的流程,⾸先消费到kafka数据后我们的流不能直接去join hbase的数据表⽽是要先去join索引表,这样就拿到了数据表的rowkey,然后我们再join数据表,这样就不会触发全表扫描了,⽽是通过rowkey查询,效率就⼀下⼦有了质的提升,那么代码是怎么实现呢?上案例!稍等,我先说⼀下这个案例,我们需要在Flink流中创建三个表:kafka表、hbase维度索引表、hbase维度表,然后我们就可以愉快的写SQL了
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = ateLocalEnvironment();
StreamTableEnvironment tEnv = ate(env);
//接⼊socket流,测试数据如下
//{'name':'kafka_tb','type':'INSERT','new':{'id':'1','name':'lxz'}}
DataStream<String> dataStream = env.socketTextStream("localhost", 9999);
//定义kafka_tb表类型(有序)
TypeInformation[] kafka_tb_types = new TypeInformation[]{Types.STRING,Types.STRING};
RowTypeInfo kafka_tb_rowType = new RowTypeInfo(kafka_tb_types);
//socket接收到的流转换后注册成kafka_tb表
制作糖果盒
DataStream<Row> ds = dataStream.flatMap(new FlatMapFunction<String, Row>() {
@Override
public void flatMap(String value, Collector<Row> out) throws Exception {
String type = JSON.parseObject(value).getString("type");
JSONObject new_row = JSON.parseObject(value).getJSONObject("new");
switch (type) {
case "INSERT":
}
}).returns(kafka_tb_rowType);
//注册kafka表kafka_tb
Schema schema01 = wBuilder().build();
Table tab1 = tEnv.fromChangelogStream(ds,schema01).as("id","name");
//注册Hbase索引表hbase_index_tb
" ID STRING,\n" +
" CF ROW<NAME STRING>,\n" +
" PRIMARY KEY (ID) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'hbase-2.2',\n" +
" 'table-name' = 'hbase_index_tb',\n" +
商场柜台制作" 'zookeeper.quorum' = 'prod-bigdata-pc10:2181,prod-bigdata-pc14:2181,prod-bigdata-pc15:2181',\n" +
" 'de.parent' = '/hbase-unsecure'\n"+
")");
//注册Hbase数据表hbase_data_tb
" ID STRING,\n" +
" CF ROW<CITY STRING,AGE INT,SEX STRING,NAME STRING,GRADE FLOAT>,\n" +
" PRIMARY KEY (ID) NOT ENFORCED\n" +
") WITH (\n" +不锈钢液压管接头
" 'connector' = 'hbase-2.2',\n" +
" 'table-name' = 'hbase_data_tb',\n" +
" 'zookeeper.quorum' = 'prod-bigdata-pc10:2181,prod-bigdata-pc14:2181,prod-bigdata-pc15:2181',\n" +
" 'de.parent' = '/hbase-unsecure'\n"+
")");
//执⾏关联查询
"from hbase_data_tb a " +
"join hbase_index_tb b " +
"on a.ID = b.ID " +
"join kafka_tb c " +
蜂窝纸板托盘"on  c.name=b.NAME").print();
}

本文发布于:2023-07-22 20:51:03,感谢您对本站的认可!

本文链接:https://patent.en369.cn/patent/2/187504.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:数据   需要   数据流   扫描   全表   制作
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 369专利查询检索平台 豫ICP备2021025688号-20 网站地图