异构数据源同步之数据同步 → datax 改造,有点意思

datax · 浏览次数 : 0

小编点评

**重点来了:** 目前主流的 Linux 系统,都自带 Python !(也就是不用再额外的是安装 Python,直接可以用。) **部署在 Windows 上的步骤:** 1. 将 DataX 的安装包下载下来。 2. 双击安装包,按照屏幕上的说明完成安装。 3. 启动 DataX。 **使用 java 启动 DataX 的步骤:** 1. 打开命令行。 2. 输入 `java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=G:\\\\datax-tool\\\\datax\\\\log -Dfile.encoding=GBK -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=G:\\\\datax-tool\\\\datax -Dlogback.configurationFile=G:\\\\datax-tool\\\\datax\\\\conf\\\\logback.xml -classpath G:\\\\datax-tool\\\\datax\\\\lib\\\\* com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job G:\\\\datax-tool\\\\datax\\\\job\\\\job.json` 命令。 3. 按回车键。 **注意:** 1. `DATAX_COMMAND` 中包含了用于启动 DataX 的命令。 2. `SYSTEM_ENCODING` 用于设置输入和输出的编码。 3. `DataX + datax-web` 是一个用于大数据量的调度框架。

正文

开心一刻

去年在抖音里谈了个少妇,骗了我 9 万

后来我发现了,她怕我报警

她把她表妹介绍给我

然后她表妹又骗了我 7 万

DataX

DataX 是什么,有什么用,怎么用

不做介绍,大家自行去官网(DataX)看,Gitee 上也有(DataX

不服气吗

你们别不服,我这是为了逼迫你们去自学,是为了你们好!

文档很详细,也是开源的,我相信你们都能看懂,也能很快上手用起来

那这篇文章到此结束,大家各自去忙吧

但是等等,我想带你们去改造改造datax

挺有意思的,我们慢慢往下看

去 Python

根据官方的 Quick Start

系统要求

是依赖 Python 来启动的

$ cd  {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}

如果要去掉 Python 依赖,你们会怎么做?

是不是梳理清楚 datax.py 的代码逻辑就行了?

datax.py

这个代码不长,但是如果没有一点 Python 底子,datax.py 是看不懂的

所以我们换个方式,去寻找我们需要的信息就行了

DataX 的业务代码是 java 实现的,然后你们再往上看看 System Requirements

你们觉得该如何启动 JVM 进程来执行 DataXjava 代码?

是不是只能用 JDKjava 命令了?

所以我们直接在 datax.py 中搜索 java 即可

你们会发现只有如下这一行表示 java 命令

ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s  ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
    DEFAULT_PROPERTY_CONF, CLASS_PATH)

Python 中的 % 就相当于 java 中的 String.format 方法

python %

也就说,datax.py 是通过 java -server 命令来启动 JVM 进程的

那么我们是不是可以绕过 Python,直接在 cmd 调用 java -server 来启动了?

java -server

这个命令还真不眼熟,因为我们接触到的往往是 java -jar

我们用 java -h 看下 java 命令的说明

java -h

发现了什么?

-serveroption 之一,与 -jar 并不是 非此即彼 的关系

所以不要去拿 java -serverjava -jar 做对比了,没意义!!!

在Java中,JVM有两种运行模式:客户端模式和服务器模式。这两种模式是为了优化不同场景下的JVM性能而设计的。

服务器模式:这种模式适用于长时间运行的应用程序,如Web服务器或数据库服务器。服务器模式下的JVM会进行更多的优化,以减少长时间运行的性能开销。例如,它会进行更深入的即时编译(JIT compilation),以提高代码的执行效率。

客户端模式:默认情况下,JVM运行在客户端模式。这种模式适用于较短时间运行的应用程序,如桌面应用或命令行工具。客户端模式下的JVM会更快地启动,但可能不如服务器模式那样高效。

使用-server选项启动JVM时,您告诉JVM在服务器模式下运行。这通常意味着JVM将使用更多的系统资源,但可以提供更好的性能,特别是在长时间运行的应用程序中

我们先下载 DataX 工具包

datax工具包

解压之后,我的 DataX 的根目录是:G:\datax-tool\datax

datax工具home目录

我们不通过 datax.py 来启动,而是直接在 cmd 下通过 java 命令来启动

java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=G:\datax-tool\datax\log -Dfile.encoding=GBK -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=G:\datax-tool\datax -Dlogback.configurationFile=G:\datax-tool\datax\conf\logback.xml -classpath G:\datax-tool\datax\lib\* com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job G:\datax-tool\datax\job\job.json

注意:上述 java 命令中的相关路径需要替换成你们自己的路径!

不出意外的话,会执行成功

java命令启动

为什么依赖 Python

如果你们去看了 DataX 工具包的目录结构,或者 DataX 的源码

你们会发现 DataX 就是用 java 实现的,Python 仅仅只是作为一个启动脚本(另外两个脚本你们自己去研究)

启动脚本

仅仅为了一个启动,而这个启动又不是非 Python 不可,就引入了 Python 环境依赖,试问这合理吗?

不合理

不要急着下结论,我们理智分析一波

DataX 正式投入使用的时候,会部署到什么系统上,请你们大声的告诉我

linux

不说全部,绝大部分是部署在 Linux 上,对此我相信你们都没异议吧

那么重点来了:目前主流的 Linux 系统,都自带 Python !!!

也就是不用再额外的是安装 Python,直接可以用,那为什么不用呢?

那如果是部署在 Windows 上,而又不想安装 Python,该如何启动了?

如果你们还能问出这样的问题,我只想给你们来上一枪

老子一枪崩了你

前面不是刚讲吗,在 cmd 直接用 java 命令来启动 DataX 不就行了?

java 启动 DataX

说的更详细点,是通过 java 代码去启动 DataX JVM 进程

我相信你们都会,直接上代码

private static final String DATAX_COMMAND = "java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=G:\\datax-tool\\datax\\log -Dfile.encoding=GBK -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=G:\\datax-tool\\datax -Dlogback.configurationFile=G:\\datax-tool\\datax\\conf\\logback.xml -classpath G:\\datax-tool\\datax\\lib\\* com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job G:\\datax-tool\\datax\\job\\job.json";

public static void main(String[] args) {
	try {
		Process process = Runtime.getRuntime().exec(DATAX_COMMAND);
		// 等待命令执行完成
		int i = process.waitFor();
		if (i == 0) {
			System.out.println("job执行完成");
		} else {
			System.out.println("job执行失败");
		}
	} catch (Exception e) {
		throw new RuntimeException(e);
	}
}

是不是很简单?

执行下,你会发现卡住了!!!

卡住

出师不利呀,要不放弃?

放弃

Runtime 对象调用 exec(cmd) 后,JVM 会启动一个子进程,该进程会与 JVM 进程建立三个管道连接:标准输入标准输出标准错误流

假设子进程不断在向标准输出流和标准错误流写数据,而 JVM 进程不读取的话,当缓冲区满之后将无法继续写入数据,最终造成阻塞在 waitfor()

所以改造下就好了

private static final String SYSTEM_ENCODING = System.getProperty("sun.jnu.encoding");
private static final String DATAX_COMMAND = "java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=G:\\datax-tool\\datax\\log -Dfile.encoding=GBK -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=G:\\datax-tool\\datax -Dlogback.configurationFile=G:\\datax-tool\\datax\\conf\\logback.xml -classpath G:\\datax-tool\\datax\\lib\\* com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job G:\\datax-tool\\datax\\job\\job.json";

public static void main(String[] args) {
	try {
		Process process = Runtime.getRuntime().exec(DATAX_COMMAND);

		// 另启线程读取
		new Thread(() -> {
			try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), SYSTEM_ENCODING))) {
				String line;
				while ((line = reader.readLine()) != null) {
					System.out.println(line);
				}
			} catch (IOException e) {
				throw new RuntimeException(e);
			}
		}).start();

		new Thread(() -> {
			try (BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream(), SYSTEM_ENCODING))) {
				String line;
				while ((line = errorReader.readLine()) != null) {
					System.out.println(line);
				}
			} catch (IOException e) {
				throw new RuntimeException(e);
			}
		}).start();

		// 等待命令执行完成
		int i = process.waitFor();
		if (i == 0) {
			System.out.println("job执行完成");
		} else {
			System.out.println("job执行失败");
		}
	} catch (Exception e) {
		throw new RuntimeException(e);
	}
}

还是比较简单的吧,相信你们都能看懂

总结

  • DataX 是进程级别的,而 Job 下的 Task 是线程级别的

    image-20240519201433065

    为什么 DataX 要实现成进程级别,而不是线程级别?

    小数据量的同步,实现方式往往很多

    但大数据量的同步,情况就不一样了,那么此时进程和线程的区别还大吗

  • Linux 系统基本自带 Python 环境,所以大家不要再纠结为什么依赖 Python

    去掉 Python 依赖也很简单,文中已有演示

  • DataX + datax-web 这个组合已经基本够用

    datax-web 基于 XXL-JOB,基本满足我们日常的调度要求了

与异构数据源同步之数据同步 → datax 改造,有点意思相似的内容:

异构数据源同步之数据同步 → datax 改造,有点意思

开心一刻 去年在抖音里谈了个少妇,骗了我 9 万 后来我发现了,她怕我报警 她把她表妹介绍给我 然后她表妹又骗了我 7 万 DataX DataX 是什么,有什么用,怎么用 不做介绍,大家自行去官网(DataX)看,Gitee 上也有(DataX) 你们别不服,我这是为了逼迫你们去自学,是为了你们好

异构数据源同步之数据同步 → datax 再改造,开始触及源码

开心一刻 其实追女生,没那么复杂 只要你花心思,花时间,陪她聊天,带她吃好吃的,耍好玩的,买好看的 慢慢你就会发现什么叫做 打水漂 不说了,我要去陪她看电影了 前情回顾 异构数据源同步之数据同步 → datax 改造,有点意思 主要讲到了2点 去 Python,直接在命令行用 java 命令来启动

异构数据源同步之数据同步 → DataX 使用细节

开心一刻 中午我妈微信给我消息 妈:儿子啊,妈电话欠费了,能帮妈充个话费吗 我:妈,我知道了,我帮你充 当我帮我妈把话费充好,正准备回微信的时候,我妈微信给我发消息了 妈:等会儿子,不用充了,刚刚有个二臂帮妈充上了 我输入框中的(妈,充好了)是发还是不发? 简单使用 关于 DataX ,大家可以去看

异构数据源数据同步 → 从源码分析 DataX 敏感信息的加解密

开心一刻 出门扔垃圾,看到一大爷摔地上了 过去问大爷:我账户余额 0.8,能扶你起来不 大爷往旁边挪了挪 跟我说到:孩子,快,你也躺下,这个来钱快! 我没理大爷,径直去扔了垃圾 然后飞速的躺在了大爷旁边,说道:感谢大爷带飞! 书接上回 通过 异构数据源同步之数据同步 → DataX 使用细节,相信大

Python学习之二:不同数据库相同表是否相同的比较方法

摘要 昨天学习了使用python进行数据库主键异常的查看. 当时想我们有跨数据库的数据同步场景. 对应的我可以对不同数据库的相同表的核心字段进行对比. 这样的话能够极大的提高工作效率. 我之前写过很长时间的shell.昨天跟着同事开始学python. 感觉的确用python能够节约大量的时间. 生活

(三)Redis 线程与IO模型

1、Redis 单线程 通常说 Redis 是单线程,主要是指 Redis 的网络 IO 和键值对读写是由一个线程来完成的,其他功能,比如持久化、异步删除、集群数据同步等,是由额外的线程执行的,所以严格来说,Redis 并不是单线程。 多线程开发会不可避免的带来并发控制和资源开销的问题,如果没有良好

[转帖]详解MySQL数据库原生的数据复制方式:异步复制、半同步复制与全同步复制

一、MYSQL复制架构衍生史 在2000年,MySQL 3.23.15版本引入了Replication。Replication作为一种准实时同步方式,得到广泛应用。这个时候的Replicaton的实现涉及到两个线程,一个在Master,一个在Slave。Slave的I/O和SQL功能是作为一个线程,

跨机房ES同步实战

作者:谢泽华 背景 众所周知单个机房在出现不可抗拒的问题(如断电、断网等因素)时,会导致无法正常提供服务,会对业务造成潜在的损失。所以在协同办公领域,一种可以基于同城或异地多活机制的高可用设计,在保障数据一致性的同时,能够最大程度降低由于机房的仅单点可用所导致的潜在高可用问题,最大程度上保障业务的用

[转帖]5分钟学会这种更高效的Redis数据删除方式

https://ost.51cto.com/posts/12513 简述 我们知道,Del命令能删除数据,除此之外,数据在Redis中,还会以哪种方式被删除呢?在Redis内存满一定会返回OOM错误?Key到达过期时间就立即删除?删除大Key会影响性能吗?下面,咱们一起探讨。 同步和异步删除 1.D

(五)Redis 缓存异常、应对策略

1、缓存和数据库不一致 只要我们使用 Redis 缓存,就必然会面对缓存和数据库间的一致性保证问题,这里的“一致性”包含了两种情况:缓存中有数据且与数据库中的值相同、缓存中没有数据,最新值在数据库中。 对于读写缓存来说,要想保证缓存和数据库中的数据一致,就要采用同步直写策略,在业务应用中使用事务机制