阿里DataX极简教程

datax · 浏览次数 : 0

小编点评

**Job Details** **Name:** mysql-to-clickhouse.json **Description:** - Reads two fields (id and name) from a MySQL database table. - Synchronizes the data to a Clickhouse database. - Creates the Clickhouse database and table if they don't exist. **Inputs:** - mysql-to-clickhouse.json (Configuration file) **Outputs:** - No direct outputs. **Execution:** 1. Starts a datax process to connect to the MySQL database. 2. Executes a command to synchronize data from MySQL to Clickhouse. 3. Creates the Clickhouse database and table if they don't exist. **Performance:** - 140W data is synchronized in 18 seconds. - Write speed: 2.21MB/s. - Read count: 1424252. **Dependencies:** - DataX MySQL connector - Clickhouse Python client **Additional Information:** - The configuration file (mysql-to-clickhouse.json) should be located in the same directory as the job script. - The `readme` file contains installation instructions and usage examples.

正文

简介

DataX是一个数据同步工具,可以将数据从一个地方读取出来并以极快的速度写入另外一个地方。常见的如将mysql中的数据同步到另外一个mysql中,或者另外一个mongodb中。

工作流程

  • read:设置一个源,DataX从源读取数据
  • write:设置一个目的地,DataX将读取到的数据写入目的地
  • setting:同步设置,如设置并发通道、控制作业速度等
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题
  • 多线程:充分利用多线程来处理同步任务

核心架构

核心模块介绍

1:DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

2:DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

3:切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5

4:每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作

5:DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

DataX调度流程

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

  • DaXJob根据分库分表切分成了100个Task。

  • 根据20个并发,DataX计算共需要分配4个TaskGroup。

  • 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

支持的数据

类型 数据源 Reader(读) Writer(写) 文档
RDBMS 关系型数据库 MySQL
Oracle
OceanBase
SQLServer
PostgreSQL
DRDS
达梦
通用RDBMS(支持所有关系型数据库)
阿里云数仓数据存储 ODPS
ADS
OSS
OCS
NoSQL数据存储 OTS
Hbase0.94
Hbase1.1
MongoDB
Hive
无结构化数据存储 TxtFile
FTP
HDFS
Elasticsearch

实践

作为极简教程,本文将从mysql中读取一张表的数据,然后同步到clickhouse中。

下载

打开该项目的Github 首页进行下载:https://github.com/alibaba/DataX

下载链接:https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz

下载下来是一个tar.gz的包,windows下解压命令:


tar  -zxvf  xxx.tar.gz

程序目录:

  • bin:使用里面的 datax.py 来启动程序
  • job:里面放了一个job.json,用来检查运行环境,一般的建议下载完毕之后执行一次。
  • log:存放执行日志
  • plugin:插件集,插件分为read和write,分别对应datax可支持的数据库
  • 其他目录:......

环境

DataX是基于python和java的,需要机器拥有python和java 的运行环境。

在下载完毕后,通过执行自检脚本,可确认环境是否正确



 python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json

执行流程

编写同步任务配置文件,在job目录中创建 mysql-to-clickhouse.json 文件,并填入如下内容



{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "xxx",
                        "password": "xxx",
                        "column": [
                            "id",
                            "name"
                        ],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": [
                                    "table_name"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.1.xxx:xxx/db_name"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "clickhousewriter",
                    "parameter": {
                        "username": "xxx",
                        "password": "xxx",
                        "column": [
                            "id",
                            "ame"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:clickhouse://192.168.1.xxx:xxx/table_name",
                                "table": [
                                    "table_name"
                                ]
                            }
                        ],
                        "preSql": [],
                        "postSql": [],
                        "batchSize": 65536,
                        "batchByteSize": 134217728,
                        "dryRun": false,
                        "writeMode": "insert"
                    }
                }
            }
        ]
    }
}



  • job:一个job包含两个部分,setting中设置任务的执行速度,错误限制等,content中是任务具体的描述。
  • reader:任务的数据输入源
  • writer:任务的数据输出源

根据任务配置文件启动datax,先cd到datax的根目录


python bin/datax.py    job/mysql-to-clickhouse.json


运行上述命令后,任务就开启了。本例从mysql数据库中的一张表中读取了两个字段(id,name),然后同步到clickhouse中,clickhouse中需要先创建同样的库,表和列。

任务执行非常快,140W数据仅用了 18s 就完成了同步。



2024-05-16 16:24:57.312 [job-0] INFO  JobContainer -
任务启动时刻                    : 2024-05-16 16:24:38
任务结束时刻                    : 2024-05-16 16:24:57
任务总计耗时                    :                 18s
任务平均流量                    :            2.21MB/s
记录写入速度                    :         142425rec/s
读出记录总数                    :             1424252
读写失败总数                    :                   0


引用

与阿里DataX极简教程相似的内容:

阿里DataX极简教程

目录简介工作流程核心架构核心模块介绍DataX调度流程支持的数据实践下载环境执行流程引用 简介 DataX是一个数据同步工具,可以将数据从一个地方读取出来并以极快的速度写入另外一个地方。常见的如将mysql中的数据同步到另外一个mysql中,或者另外一个mongodb中。 工作流程 read:设置一

聊聊Flink CDC必知必会

CDC是(Change Data Capture变更数据获取)的简称。 核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 ## Flink CDC的设

聊聊HuggingFace如何处理大模型下海量数据集

翻译自: [Big data? 🤗 Datasets to the rescue!](https://huggingface.co/learn/nlp-course/chapter5/4?fw=pt#big-data-datasets-to-the-rescue "Big data? 🤗 Dat

聊聊Flink必知必会(四)

### 概述 Flink Streaming API借鉴了谷歌数据流模型(Google Data Flow Model),它的流API支持不同的时间概念。Flink明确支持以下3个不同的时间概念。 Flink明确支持以下3个不同的时间概念。 (1)事件时间:事件发生的时间,由产生(或存储)事件的设备

阿里也出手了!Spring CloudAlibaba AI问世了

写在前面 在之前的文章中我们有介绍过SpringAI这个项目。SpringAI 是Spring 官方社区项目,旨在简化 Java AI 应用程序开发, 让 Java 开发者想使用 Spring 开发普通应用一样开发 AI 应用。 而SpringAI 主要面向的是国外的各种大模型接入,对于国内开发者可

阿里400+天,我为什么离开阿里

阿里还是挺不错了,感谢公司,感谢同事们! 零丶前言 今天是我在阿里的lastday,明天我将回成都(此处嘴角弯,我爱成都),端午后入职另外一家互联网大厂。 在去年3月份的时候,我从成都的某家金融科技银行跳槽到杭州阿里巴巴淘天集团,这篇《跳槽!阿里工作100+天,菜鸡职业生涯的一点记录》记录了我跳槽动

阿里140逆向纯与补

声明 本文章中所有内容仅供学习交流,抓包内容、敏感网址、数据接口均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关,若有侵权,请联系我立即删除! 目标网站 aHR0cHM6Ly93d3cuanVtaW5nLmNvbS8= 分析逆向流程 今天我们看看ali的的n参数为140

阿里云入选Gartner「边缘分发平台市场指南」代表厂商

近日,全球技术研究与咨询机构Gartner首次发布边缘分发平台市场指南报告《Market Guide for Edge Distribution Platforms》,阿里云凭借内容分发网络CDN、全站加速DCDN、边缘节点服务ENS等产品获得Gartner认可,成功入选边缘分发平台代表厂商。 Ga

阿里云开发者社区有奖征文活动,期待您出文相助

和阿里云开发者社区的合作曾经是园子的收入来源之一,但现在合作机会越来越少了,今年好不容易等到一次合作机会,就是这次的有奖征文活动 ——「寻找热爱技术创作的你:写下你在技术探中的实践和思考」,详见活动公告 这次征文合作分2期,第1期需要完成保底提交50篇符合要求的文章才能拿到收入,第2期按实际提交的文

阿里面试:NIO为什么会导致CPU100%?

在 Java 中总共有三种 IO 类型:BIO(Blocking I/O,阻塞I/O)、NIO(Non-blocking I/O,非阻塞I/O)和 AIO(Asynchronous I/O,异步I/O),它们的区别如下: 在 JDK 1.4 之前,只有 BIO 一种模式,其开发过程相对简单,新来一个