Big Data - Business Data Collection - FlinkCDC DebeziumSourceFunction via the 'serverTimezone' configuration property
Editor's review
## Summary of the generated content:
This code demonstrates how to configure Flink CDC to read data from a MySQL database using the MySQLSource connector.
**Cause of the error:**
The error comes from the time zone value that the MySQL server reports to the connector. The value `'�й���ʱ��'` (apparently a localized time zone name that was mis-decoded in transit) cannot be mapped to a single time zone, so the JDBC driver rejects it as unrecognized or ambiguous.
**Solution:**
The fix is to configure either the MySQL server or the JDBC driver (via the `serverTimezone` configuration property) with a specific, unambiguous time zone value, so that the connector can interpret the server's time zone correctly.
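For a plain JDBC client (as opposed to the Flink CDC builder used in this post), the `serverTimezone` property named in the error message is typically passed as a URL parameter. The following is a minimal sketch under that assumption; the class and method names (`JdbcUrlSketch`, `mysqlUrl`) are hypothetical, and the host, port, and database simply reuse the values from the listing. The one detail worth showing is that the `+` in `GMT+8` has to be percent-encoded when it appears in a URL.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper, not part of Flink CDC or the MySQL driver.
final class JdbcUrlSketch {

    // Builds a MySQL JDBC URL that carries an explicit serverTimezone parameter.
    static String mysqlUrl(String host, int port, String database, String serverTimezone) {
        try {
            // URLEncoder turns '+' into "%2B", so "GMT+8" becomes "GMT%2B8".
            String encodedZone = URLEncoder.encode(serverTimezone, "UTF-8");
            return "jdbc:mysql://" + host + ":" + port + "/" + database
                    + "?serverTimezone=" + encodedZone;
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM, so this is not expected to happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints jdbc:mysql://localhost:3306/springboot?serverTimezone=GMT%2B8
        System.out.println(mysqlUrl("localhost", 3306, "springboot", "GMT+8"));
    }
}
```

With Flink CDC itself, the builder call `serverTimeZone("GMT+8")` shown in the listing serves the same purpose, so no URL needs to be assembled by hand.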
**Additional details:**
* The code defines a `DebeziumSourceFunction` instance for reading data from the MySQL database.
* The `MySQLSource` builder specifies the hostname, port, username, and password, plus the database list and table list to capture.
* It also calls `serverTimeZone("GMT+8")` to declare the database server's time zone explicitly; this is the setting that resolves the error.
* `startupOptions(StartupOptions.initial())` makes the source take an initial snapshot of the monitored tables on first startup and then continue reading from the binlog.
* The `streamSource` is created by passing `sourceFunction` to `env.addSource(...)`.
* The code then prints the data stream and starts the Flink CDC job.
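To see why a value such as `GMT+8` is accepted while a localized display name is not, it can help to check candidate names against the JDK's own time zone parser. The sketch below uses only `java.time`; the class and method names (`ZoneNameCheck`, `resolveOffset`) are hypothetical. It illustrates the general idea of "a name that maps to exactly one zone" and is not the validation logic that the MySQL driver or Debezium actually runs, which may differ.

```java
import java.time.DateTimeException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Optional;

// Hypothetical helper for illustration only.
final class ZoneNameCheck {

    // Returns the UTC offset of the named zone at the given instant,
    // or an empty Optional if the JDK cannot parse the name as a zone ID.
    static Optional<ZoneOffset> resolveOffset(String zoneName, Instant at) {
        try {
            return Optional.of(ZoneId.of(zoneName).getRules().getOffset(at));
        } catch (DateTimeException e) {
            // Covers both malformed IDs and well-formed but unknown region names.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Instant at = Instant.parse("2021-01-01T00:00:00Z");
        // "\u4e2d\u56fd\u6807\u51c6\u65f6\u95f4" is the localized display name
        // for China Standard Time, written with Unicode escapes.
        String[] candidates = {"GMT+8", "Asia/Shanghai", "\u4e2d\u56fd\u6807\u51c6\u65f6\u95f4"};
        for (String name : candidates) {
            System.out.println(name + " -> " + resolveOffset(name, at));
        }
    }
}
```

Both `GMT+8` and `Asia/Shanghai` resolve to an offset of eight hours ahead of UTC, while the localized display name is not a valid zone ID at all, which mirrors the situation described in the error message.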
Main text
Caused by: org.apache.kafka.connect.errors.ConnectException: Error reading MySQL variables: The server time zone value '�й���ʱ��' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the 'serverTimezone' configuration property) to use a more specifc time zone value if you want to utilize time zone support.
at io.debezium.connector.mysql.MySqlJdbcContext.querySystemVariables(MySqlJdbcContext.java:342)
at io.debezium.connector.mysql.MySqlJdbcContext.readMySqlSystemVariables(MySqlJdbcContext.java:321)
at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:79)
at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:52)
at io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:350)
at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:143)
at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:106)
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:758)
at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
package com.atguigu;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {

        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Build the SourceFunction with FlinkCDC and read the data
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .serverTimeZone("GMT+8") // added to fix the time zone error
                .port(3306)
                .username("root")
                .password("110")
                .databaseList("springboot")
                // If tableList is omitted, data from all tables in the listed databases is consumed.
                // If it is set, each table must be given in the form db.table.
                .tableList("springboot.sys_user")
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        // 3. Print the data
        streamSource.print();

        // 4. Start the job
        env.execute("FlinkCDC");
    }
}