使用spark-sql处理Doris大表关联

spark,sql,doris · 浏览次数 : 0

小编点评

**问题描述** 在项目中,需将两张分区表(A表和B表)进行关联,并将关联后的数据回写入A表。A表和B表均包含分区字段,但关联条件不涉及分区字段。由于数据量大、服务器配置高,直接关联的方案不可行。故需寻求其他解决方案。 **方案分析** 1. **方案一**:全表关联,一次SQL完成。但受限于服务器内存和CPU,执行缓慢且易造成内存溢出。 2. **方案二**:增加分区字段拆分任务,实现关联。但会导致笛卡尔积,处理历史数据时面临诸多问题。 3. **方案三**:利用外部计算和存储,如Hive、Spark、Flink等。其中,Spark SQL通过连接Doris实现关联写入,操作相对简便。 **实际问题与解决** 在实际应用中,使用`spark-sql`直接读取Doris进行关联并写回Doris。除DDL外,仅需一条SQL即可完成任务。 **遇到的问题及解决方法** 1. **DML语句中的分区过滤问题**: - 原本DML语句中使用分区字段进行过滤,但Doris 2.0版本中的API处理存在问题。 - 解决方法:调整DML语句,使用嵌套引号配合转义字符,或选择其他成熟稳定的分区过滤方式。 2. **Spark executor内存和并行度设置**: - 并行度设置需平衡网络、IO与内存使用,避免过大或过小。 - 内存设置过高导致系统资源耗尽,过低则易造成内存溢出。 - 解决方法:逐步调整并行度和内存设置,直至找到合适的平衡点。 **总结** 针对该项目需求,结合各方案优缺点及实际实施情况,采用`spark-sql`直接读取Doris进行关联并写回Doris的方式最为合适。经过一系列优化调整,最终实现了高效稳定的关联操作。

正文

背景

最近项目上有一个需求,需要将两张表(A表和B表)的数据进行关联并回写入其中一张表(A表),两张表都是分区表,但是关联条件不包括分区字段。

分析过程

方案一

最朴素的想法,直接关联执行,全表关联,一条SQL搞定全部逻辑。想法越简单,执行越困难。由于数据量大,服务器规模较小,尽管各台服务器内存和CPU配置都很高,关联会将数据读取到内存,内存根本放不下,而且集群配置了workload group,可使用内存更小了,方案一不可行。

方案二

可以在关联时增加分区字段对任务进行拆解,这样可以实现,但是会形成笛卡尔积,历史数据量巨大,分区较多(A表和B表都是1年),缺点也很明显。

  • 执行耗时长
  • 执行语句太多,操作不便
  • 如果按照单分区关联,A表的每一个分区将会扫描B表全表

经过评估上述方案二不可行。

方案三

通过外部计算和存储来实现,可选的有Hive、Spark、Flink。三种方案都是可行的,但是从操作复杂度来看使用spark-sql直接读取Doris进行关联并写回Doris,除DDL外,只需要一条SQL即可搞定。

详细过程

  1. 下载预编译的spark和spark-doris-connector
  1. 部署
    将上述安装文件上传至Hadoop集群的其中一台机器,放置到任意目录,比如/opt,请保证使用的用户可以向Yarn提交任务。
    解压缩spark-3.4.3-bin-hadoop3.tgz得到spark-3.4.3-bin-hadoop3目录
    将spark-doris-connector-3.4_2.12-1.3.2.jar放到spark-3.4.3-bin-hadoop3/jars/

  2. 启动spark-sql

bin/spark-sql --master yarn --num-executors 40 --executor-memory 7G --name Spark-SQL:Doris
  1. 创建映射表
-- 用于读取A表数据
CREATE
TEMPORARY VIEW spark_doris_a
USING doris
OPTIONS(
  "table.identifier"="mydb.table_a",
  "fenodes"="10.*.*.1:9030,10.*.*.2:9030,10.*.*.3:9030",
  "user"="root",
  "password"="$YOUR_DORIS_PASSWORD"
);
-- 用于读取B表数据
CREATE
TEMPORARY VIEW spark_doris_b
USING doris
OPTIONS(
  "table.identifier"="mydb.table_b",
  "fenodes"="10.*.*.1:9030,10.*.*.2:9030,10.*.*.3:9030",
  "user"="root",
  "password"="$YOUR_DORIS_PASSWORD"
);
-- 用于向A表部分列写入数据
CREATE
TEMPORARY VIEW spark_doris_a_sink
USING doris
OPTIONS(
  "table.identifier"="mydb.table_a",
  "fenodes"="10.*.*.1:9030,10.*.*.2:9030,10.*.*.3:9030",
  "user"="root",
  "password"="$YOUR_DORIS_PASSWORD",
  "sink.properties.partial_columns"="true",
  "sink.properties.column"="'column_a','column_b','column_c'"
);
  1. DML语句
set enable_unique_key_partial_update=true;
set enable_insert_strict=false;

insert into spark_doris_a_sink
(column_a,column_b,column_c)
select a.column_a,b.column_b,b.column_c from spark_doris_a a left join spark_doris_b b on a.colum_d = b.column_e;

遇到的问题

处理过程很简单,但是实际也是遇到了很多问题

  1. DML语句中不能使用分区字段进行过滤,因为Doris 2.0版本在提供的获取执行计划的API中对于引号的处理存在问题,如果传递的是"2024-06-12",则会得到数值2006,该数值无法转换为日期,如果传递"20240612"也无法得到Date("20240612")。理论上,如果调整此处的写法增加嵌套的引号配合转义字符也能实现功能,Java程序员都懂的,有兴趣可以自行验证。
  2. spark executor 内存和并行度设置,这个需要不断调整,我也是尝试了多次,才得到这个可以运行的结果。并行度设置太高了,将会对Doris形成较大的网络和IO冲击,一定要慎重。并行度低了,内存就要高一点,不然数据都已经从Doris读取出来了,关联的时候会内存溢出。
  3. 读取Doris的速率还是很快的,而且spark在读取doris前获取了执行计划,对数据进行了行和列的裁剪,不会将整表数据都读出来。

与使用spark-sql处理Doris大表关联相似的内容:

使用spark-sql处理Doris大表关联

背景 最近项目上有一个需求,需要将两张表(A表和B表)的数据进行关联并回写入其中一张表(A表),两张表都是分区表,但是关联条件不包括分区字段。 分析过程 方案一 最朴素的想法,直接关联执行,全表关联,一条SQL搞定全部逻辑。想法越简单,执行越困难。由于数据量大,服务器规模较小,尽管各台服务器内存和C

SQL窗口分析函数使用详解系列三之偏移量类窗口函数

1.综述 本文以HiveSQL语法进行代码演示。 对于其他数据库来说同样也适用,比如SparkSQL,FlinkSQL以及Mysql8,Oracle,SqlServer等传统的关系型数据库。 已更新第一类聚合函数类,点击这里阅读 ①SQL窗口函数系列一之聚合函数类 ②SQL窗口函数系列二之分组排序窗

基于Spark的大规模日志分析

摘要:本篇文章将从一个实际项目出发,分享如何使用 Spark 进行大规模日志分析,并通过代码演示加深读者的理解。 本文分享自华为云社区《【实战经验分享】基于Spark的大规模日志分析【上进小菜猪大数据系列】》,作者:上进小菜猪。 随着互联网的普及和应用范围的扩大,越来越多的应用场景需要对海量数据进行

MapReduce和Spark读取HBase快照表

1.概述 随着大数据技术的不断发展,处理海量数据的需求变得愈发迫切。MapReduce作为一种分布式计算模型,为处理大规模数据提供了有效的解决方案。在这篇博客中,我们将探讨如何使用MapReduce框架读取快照表(Snapshot Table)的数据。快照表是一种记录某一时刻系统状态的表格,通过Ma

Kafka最佳实践

前言 Kafka 最佳实践,涉及 典型使用场景 Kafka 使用的最佳实践 Kafka 典型使用场景 Data Streaming Kafka 能够对接到 Spark、Flink、Flume 等多个主流的流数据处理技术。利用 Kafka 高吞吐量的特点,客户可以通过 Kafka 建立传输通道,把应用

Java中可以用的大数据推荐算法

在Java中实现大数据推荐算法时,通常会使用一些开源的机器学习库,如Apache Mahout、Weka、DL4J(DeepLearning4j,用于深度学习)或者Spark MLlib(用于在Spark集群上运行)。由于完整实现一个大数据推荐算法的代码量可能非常大,并且需要配合具体的数据集和环境进

10.4 认识Capstone反汇编引擎

Capstone 是一款开源的反汇编框架,目前该引擎支持的CPU架构包括x86、x64、ARM、MIPS、POWERPC、SPARC等,Capstone 的特点是快速、轻量级、易于使用,它可以良好地处理各种类型的指令,支持将指令转换成AT&T汇编语法或Intel汇编语法等多种格式。Capstone的...

使用Cloudflare Worker加速docker镜像

前言 开发者越来越难了,现在国内的docker镜像也都️了,没有镜像要使用docker太难了,代理又很慢 现在就只剩下自建镜像的办法了 GitHub上有开源项目可以快速搭建自己的镜像库,不过还是有点麻烦,还好Cloudflare暂时还活着‍ 本文记录一下使用 Cloudf

使用C#/.NET解析Wiki百科数据实现获取历史上的今天

创建一个webapi项目做测试使用。 创建新控制器,搭建一个基础框架,包括获取当天日期、wiki的请求地址等 创建一个Http请求帮助类以及方法,用于获取指定URL的信息 使用http请求访问指定url,先运行一下,看看返回的内容。内容如图右边所示,实际上是一个Json数据。我们主要解析 大事记 部

Pybind11和CMake构建python扩展模块环境搭建

使用pybind11的CMake模板来创建拓展环境搭建 从Github上下载cmake_example的模板,切换分支,并升级pybind11子模块到最新版本 拉取pybind11使用cmake构建工具的模板仓库 git clone --recursive https://github.com/mr