大数据-业务数据采集-FlinkCDC DebeziumSourceFunction via the 'serverTimezone' configuration property

数据,业务,采集,flinkcdc,debeziumsourcefunction,via,the,servertimezone,configuration,property · 浏览次数 : 221

小编点评

## Summary of the generated content: This code demonstrates how to configure Flink CDC to read data from a MySQL database using the MySQLSource connector. **Causes the error:** The error originates from the server time zone value configured in the MySQLSource. The value `'�й���׼ʱ��'` is recognized as an unrecognized or multiple time zone value by the connector. **Solution:** The solution is to configure the server or JDBC driver (via the `serverTimezone` configuration property) to use a more specific and relevant time zone value. This allows the connector to interpret the time zone correctly. **Additional details:** * The code defines a `DebeziumSourceFunction` instance for reading data from the MySQL database. * The `MySQLSource` constructor specifies the server address, port, username, and password. * It also sets the `serverTimezone` property to `GMT+8` to indicate the target time zone. * The `startupOptions` parameter contains additional configuration options for the connector. * The `streamSource` is created from the `sourceFunction` and configured to read data from the MySQL database. * The code then prints the data stream and starts the Flink CDC job.

正文

Caused by: org.apache.kafka.connect.errors.ConnectException: Error reading MySQL variables: The server time zone value '�й���׼ʱ��' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the 'serverTimezone' configuration property) to use a more specifc time zone value if you want to utilize time zone support.
	at io.debezium.connector.mysql.MySqlJdbcContext.querySystemVariables(MySqlJdbcContext.java:342)
	at io.debezium.connector.mysql.MySqlJdbcContext.readMySqlSystemVariables(MySqlJdbcContext.java:321)
	at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:79)
	at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:52)
	at io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:350)
	at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:143)
	at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:106)
	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:758)
	at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
package com.atguigu;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema; 
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {

        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
 
        //2.通过FlinkCDC构建SourceFunction并读取数据
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .serverTimeZone("GMT+8")  //时区报错增加这个设置
                .port(3306)
                .username("root")
                .password("110")
                .databaseList("springboot")
                .tableList("springboot.sys_user")   //如果不添加该参数,则消费指定数据库中所有表的数据.如果指定,指定方式为db.table
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        //3.打印数据
        streamSource.print();

        //4.启动任务
        env.execute("FlinkCDC");

    }

}

与大数据-业务数据采集-FlinkCDC DebeziumSourceFunction via the 'serverTimezone' configuration property相似的内容:

大数据-业务数据采集-FlinkCDC DebeziumSourceFunction via the 'serverTimezone' configuration property

Caused by: org.apache.kafka.connect.errors.ConnectException: Error reading MySQL variables: The server time zone value '�й���׼ʱ��' is unrecognized or

大数据-业务数据采集-FlinkCDC

CDC CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 CDC 的种类 CDC 主要分为基于查询和基于 Binl

大数据-业务数据采集-FlinkCDC The MySQL server is not configured to use a ROW binlog_format

Caused by: org.apache.kafka.connect.errors.ConnectException: The MySQL server is not configured to use a ROW binlog_format, which is required for this

大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka

| 目录 | 作用 | | | | | app | 产生各层数据的 flink 任务 | | bean | 数据对象 | | common | 公共常量 | | utils | 工具类 | app.ods.FlinkCDC.java package com.atguigu.app.ods; impo

大数据 - DWD&DIM 业务数据

业务数据的变化,我们可以通过 FlinkCDC 采集到,但是 FlinkCDC 是把全部数据统一写入一个 Topic 中, 这些数据包括事实数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从 Kafka 的业务数据 ODS 层读取数据,经过处理后,将维度数据保存到 HBase,将事

如何在填报场景中使用数据绑定获取数据源

背景 在公司的日常业务中,存在不少数据的收集提取需求,大部分公司会采取Excel来完成数据的收集和汇总,但这项工作会让负责信息收集的业务人员相当头大。虽然提前做好了数据收集模板,但最终提交上来的模板会被修改的五花八门,信息填写错误率比较高,无法实现信息填写不完整不允许提交的约束。后期的数据汇总虽然可

大数据 - DWM层 业务实现

DWM 建表,需要看 DWS 需求。 DWS 来自维度(访客、商品、地区、关键词),为了出最终的指标 ADS 需求指标 DWT 为什么实时数仓没有DWT,因为它是历史的聚集,累积结果,实时数仓中不需要 DWD 不需要加工 DWM 需要加工的数据 统计主题 需求指标【ADS】输出方式计算来源来源层级

大数据 - DWS层 业务实现

统计主题 需求指标【ADS】输出方式计算来源来源层级 访客【DWS】pv可视化大屏page_log 直接可求dwd UV(DAU)可视化大屏需要用 page_log 过滤去重dwm UJ 跳出率可视化大屏需要通过 page_log 行为判断dwm 进入页面数可视化大屏需要识别开始访问标识dwd 连续

[转帖]Oracle创建用户和表空间

一、概述 1.数据库实际管理中,不同业务系统需要使用’不同的用户'进行管理维护和使用,这样做把业务数据和系统数据独立分开管理,利于数据库系统管理; 2.在数据库中创建业务系统用户时候,建议为用户创建指定的用户表空间,否则全部默认为user表空间存储,使得user表空间容易过大,不易管理、查询。 二、

[转帖]TiDB + TiFlash : 朝着真 HTAP 平台演进

https://zhuanlan.zhihu.com/p/80495479 作者介绍:韦万,PingCAP 数据库研发工程师,主要领域是数据库的存储引擎研发,以及系统性能优化。 一、为什么我们需要 HTAP 数据库? 在互联网浪潮出现之前,企业的数据量普遍不大,特别是核心的业务数据,通常一个单机的数