大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka

数据,业务,采集,flinkcdc,读取,mysql,存入,kafka · 浏览次数 : 386

小编点评

**作用 app 生成各层数据的 Flink 任务bean 数据对象 common公共常量 utils工具类 app.ods.FlinkCDC.javapackage** 该类提供以下方法: * **`CustomerDeserialization`** 实现了 `DebeziumDeserializationSchema` 接口,用于解析数据流中的 `Customer` 对象。 **主要方法:** 1. **`main` 方法:** - 获取执行环境。 - 创建 Flink Kafka 连接器。 - 创建 Flink Kafka 生产者和消费者。 - 配置生产者和消费者。 - 获取 Kafka brokers、默认主题和消费者组 ID。 - 创建 DDL 脚本。 - 创建 DDL 字符串。 - 设置 DDL 属性,例如连接类型和扫描模式。 - 创建 DDL 字符串。 - 启动 Flink 数据流。 2. **`getKafkaDDL()`** 方法用于生成用于 DDL 的字符串。它使用 `KafkaSerializationSchema` 创建一个生产者配置对象,并使用它创建一个 DDL 字符串。 **其他注释:** * 该类使用 `SimpleStringSchema` 来解析 `Customer` 对象。 * 它使用 `kafka.clients.producer` 和 `kafka.clients.consumer` 客户端库来创建 Flink Kafka 生产者和消费者。 * 它使用 `KafkaSerializationSchema` 来序列化和反序列化 `Customer` 对象。

正文

image

目录 作用
app 产生各层数据的 flink 任务
bean 数据对象
common 公共常量
utils 工具类

app.ods.FlinkCDC.java

package com.atguigu.app.ods;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.atguigu.app.function.CustomerDeserialization;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {

        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 设置CK&状态后端
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink-210325/ck"));
        //env.enableCheckpointing(5000L);
        //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setCheckpointTimeout(10000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);

        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart());

        //2.通过FlinkCDC构建SourceFunction并读取数据
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall-210325-flink")
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.latest())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        //3.打印数据并将数据写入Kafka
        streamSource.print();
        String sinkTopic = "ods_base_db";
        streamSource.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

        //4.启动任务
        env.execute("FlinkCDC");
    }

}

CustomerDeserialization

package com.atguigu.app.function;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    /**
     * 封装的数据格式
     * {
     * "database":"",
     * "tableName":"",
     * "before":{"id":"","tm_name":""....},
     * "after":{"id":"","tm_name":""....},
     * "type":"c u d",
     * //"ts":156456135615
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        //1.创建JSON对象用于存储最终数据
        JSONObject result = new JSONObject();

        //2.获取库名&表名
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct value = (Struct) sourceRecord.value();
        //3.获取"before"数据
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        //4.获取"after"数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }

        //5.获取操作类型  CREATE UPDATE DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }

        //6.将字段写入JSON对象
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);

        //7.输出数据
        collector.collect(result.toJSONString());

    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

MyKafkaUtil

package com.atguigu.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.util.Properties;

public class MyKafkaUtil {

    private static String brokers = "hadoop102:9092,hadoop103:9092,hadoop104:9092";
    private static String default_topic = "DWD_DEFAULT_TOPIC";

    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        return new FlinkKafkaProducer<String>(brokers,
                topic,
                new SimpleStringSchema());
    }

    public static <T> FlinkKafkaProducer<T> getKafkaProducer(KafkaSerializationSchema<T> kafkaSerializationSchema) {

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

        return new FlinkKafkaProducer<T>(default_topic,
                kafkaSerializationSchema,
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }

    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {

        Properties properties = new Properties();

        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

        return new FlinkKafkaConsumer<String>(topic,
                new SimpleStringSchema(),
                properties);

    }

    //拼接Kafka相关属性到DDL
    public static String getKafkaDDL(String topic, String groupId) {
        return  " 'connector' = 'kafka', " +
                " 'topic' = '" + topic + "'," +
                " 'properties.bootstrap.servers' = '" + brokers + "', " +
                " 'properties.group.id' = '" + groupId + "', " +
                " 'format' = 'json', " +
                " 'scan.startup.mode' = 'latest-offset'  ";
    }

}

尚硅谷 源代码
https://gitee.com/wh-alex/gmall-flink-210325

与大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka相似的内容:

大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka

| 目录 | 作用 | | | | | app | 产生各层数据的 flink 任务 | | bean | 数据对象 | | common | 公共常量 | | utils | 工具类 | app.ods.FlinkCDC.java package com.atguigu.app.ods; impo

大数据-业务数据采集-FlinkCDC

CDC CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 CDC 的种类 CDC 主要分为基于查询和基于 Binl

大数据-业务数据采集-FlinkCDC DebeziumSourceFunction via the 'serverTimezone' configuration property

Caused by: org.apache.kafka.connect.errors.ConnectException: Error reading MySQL variables: The server time zone value '�й���׼ʱ��' is unrecognized or

大数据-业务数据采集-FlinkCDC The MySQL server is not configured to use a ROW binlog_format

Caused by: org.apache.kafka.connect.errors.ConnectException: The MySQL server is not configured to use a ROW binlog_format, which is required for this

大数据 - DWD&DIM 业务数据

业务数据的变化,我们可以通过 FlinkCDC 采集到,但是 FlinkCDC 是把全部数据统一写入一个 Topic 中, 这些数据包括事实数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从 Kafka 的业务数据 ODS 层读取数据,经过处理后,将维度数据保存到 HBase,将事

如何在填报场景中使用数据绑定获取数据源

背景 在公司的日常业务中,存在不少数据的收集提取需求,大部分公司会采取Excel来完成数据的收集和汇总,但这项工作会让负责信息收集的业务人员相当头大。虽然提前做好了数据收集模板,但最终提交上来的模板会被修改的五花八门,信息填写错误率比较高,无法实现信息填写不完整不允许提交的约束。后期的数据汇总虽然可

大数据 - DWM层 业务实现

DWM 建表,需要看 DWS 需求。 DWS 来自维度(访客、商品、地区、关键词),为了出最终的指标 ADS 需求指标 DWT 为什么实时数仓没有DWT,因为它是历史的聚集,累积结果,实时数仓中不需要 DWD 不需要加工 DWM 需要加工的数据 统计主题 需求指标【ADS】输出方式计算来源来源层级

大数据 - DWS层 业务实现

统计主题 需求指标【ADS】输出方式计算来源来源层级 访客【DWS】pv可视化大屏page_log 直接可求dwd UV(DAU)可视化大屏需要用 page_log 过滤去重dwm UJ 跳出率可视化大屏需要通过 page_log 行为判断dwm 进入页面数可视化大屏需要识别开始访问标识dwd 连续

[转帖]Oracle创建用户和表空间

一、概述 1.数据库实际管理中,不同业务系统需要使用’不同的用户'进行管理维护和使用,这样做把业务数据和系统数据独立分开管理,利于数据库系统管理; 2.在数据库中创建业务系统用户时候,建议为用户创建指定的用户表空间,否则全部默认为user表空间存储,使得user表空间容易过大,不易管理、查询。 二、

[转帖]TiDB + TiFlash : 朝着真 HTAP 平台演进

https://zhuanlan.zhihu.com/p/80495479 作者介绍:韦万,PingCAP 数据库研发工程师,主要领域是数据库的存储引擎研发,以及系统性能优化。 一、为什么我们需要 HTAP 数据库? 在互联网浪潮出现之前,企业的数据量普遍不大,特别是核心的业务数据,通常一个单机的数