最近项目上有一个需求,需要将两张表(A表和B表)的数据进行关联并回写入其中一张表(A表),两张表都是分区表,但是关联条件不包括分区字段。
最朴素的想法,直接关联执行,全表关联,一条SQL搞定全部逻辑。想法越简单,执行越困难。由于数据量大,服务器规模较小,尽管各台服务器内存和CPU配置都很高,关联会将数据读取到内存,内存根本放不下,而且集群配置了workload group,可使用内存更小了,方案一不可行。
可以在关联时增加分区字段对任务进行拆解,这样可以实现,但是会形成笛卡尔积,历史数据量巨大,分区较多(A表和B表都是1年),缺点也很明显。
经过评估上述方案二不可行。
通过外部计算和存储来实现,可选的有Hive、Spark、Flink。三种方案都是可行的,但是从操作复杂度来看使用spark-sql直接读取Doris进行关联并写回Doris,除DDL外,只需要一条SQL即可搞定。
部署
将上述安装文件上传至Hadoop集群的其中一台机器,放置到任意目录,比如/opt,请保证使用的用户可以向Yarn提交任务。
解压缩spark-3.4.3-bin-hadoop3.tgz得到spark-3.4.3-bin-hadoop3目录
将spark-doris-connector-3.4_2.12-1.3.2.jar放到spark-3.4.3-bin-hadoop3/jars/
启动spark-sql
bin/spark-sql --master yarn --num-executors 40 --executor-memory 7G --name Spark-SQL:Doris
-- 用于读取A表数据
CREATE
TEMPORARY VIEW spark_doris_a
USING doris
OPTIONS(
"table.identifier"="mydb.table_a",
"fenodes"="10.*.*.1:9030,10.*.*.2:9030,10.*.*.3:9030",
"user"="root",
"password"="$YOUR_DORIS_PASSWORD"
);
-- 用于读取B表数据
CREATE
TEMPORARY VIEW spark_doris_b
USING doris
OPTIONS(
"table.identifier"="mydb.table_b",
"fenodes"="10.*.*.1:9030,10.*.*.2:9030,10.*.*.3:9030",
"user"="root",
"password"="$YOUR_DORIS_PASSWORD"
);
-- 用于向A表部分列写入数据
CREATE
TEMPORARY VIEW spark_doris_a_sink
USING doris
OPTIONS(
"table.identifier"="mydb.table_a",
"fenodes"="10.*.*.1:9030,10.*.*.2:9030,10.*.*.3:9030",
"user"="root",
"password"="$YOUR_DORIS_PASSWORD",
"sink.properties.partial_columns"="true",
"sink.properties.column"="'column_a','column_b','column_c'"
);
set enable_unique_key_partial_update=true;
set enable_insert_strict=false;
insert into spark_doris_a_sink
(column_a,column_b,column_c)
select a.column_a,b.column_b,b.column_c from spark_doris_a a left join spark_doris_b b on a.colum_d = b.column_e;
处理过程很简单,但是实际也是遇到了很多问题