聊聊Flink必知必会(四)

聊聊,flink · 浏览次数 : 39

小编点评

**Flink Streaming API中的时间概念** Flink streaming API支持以下3个不同的时间概念: 1. **事件时间**:事件发生的时间,由产生(或存储)事件的设备记录。 2. **接入时间**:Flink在接入事件时记录的时间戳。 3. **处理时间**:管道中特定操作符处理事件的时间。 **水印 (Watermark)** 是一个用于指定事件时间进度的特殊事件,它用于确定数据流中哪些元素已经到达并哪些元素还需要处理。 **水印的生成过程**: 1. **静态辅助方法** 提供了用于创建 WatermarkGenerator 的方法。 2. **动态方法** 在源函数中创建 WatermarkGenerator。 ** WatermarkStrategy 接口** 提供了在事件时间点生成和发射 Watermark 的各种策略。 **主要方法**: * **forMonotonousTimestamps()**:用于生成以事件时间为基础的 Watermark。 * **forBoundedOutOfOrderness()**:用于生成以时间窗口为基础的 Watermark。 * **forGenerator()**:用于生成以 generatorSupplier 为基础的 Watermark。

正文

概述

Flink Streaming API借鉴了谷歌数据流模型(Google Data Flow Model),它的流API支持不同的时间概念。Flink明确支持以下3个不同的时间概念。

Flink明确支持以下3个不同的时间概念。
(1)事件时间:事件发生的时间,由产生(或存储)事件的设备记录。

(2)接入时间:Flink在接入事件时记录的时间戳。

(3)处理时间:管道中特定操作符处理事件的时间。

image

支持事件时间的流处理器需要一种方法来度量事件时间的进度。在Flink中测量事件时间进展的机制是水印(watermark)。水印是一种特殊类型的事件,是告诉系统事件时间进度的一种方式。水印流是数据流的一部分,并带有时间戳t。水印(t)声明事件时间已经到达该流中的时间t,这意味着时间戳t′≤t(时间戳更早或等于水印的事件)的流中不应该有更多的元素。

Flink中水印的处理

水印的时间戳

Flink水印的本质是DataStream中的一种特殊元素,每个水印都携带有一个时间戳。当时间戳为T的水印出现时,表示事件时间t <= T的数据都已经到达,即水印后面应该只能流入事件时间t > T的数据。也就是说,在事件时间窗口场景下,水印是Flink判断迟到数据的标准,同时也是窗口触发的标记。

时间t的水印标记了数据流中的一个位置,并断言此时的流在时间t之前已经完成。为了执行基于事件时间的事件处理,Flink需要知道与每个事件相关联的时间,它还需要包含水印的流。水印就是系统事件时间的时钟。水印触发是基于事件时间的计时器的触发。

事件流的类型有两种,一个是顺序的,一个是无序的。先看顺序场景下,水印的排列。

image

对于无序流,水印是至关重要的,其中事件不是按照它们的时间戳排序的。

例如,当操作符接收到w(11)这条水印时,可以认为时间戳小于或等于11的数据已经到达,此时可以触发计算。同样,当接收到w(17)这条水印时,可以认为时间戳小于或等于17的数据已经到达,此时可以触发计算。

image

可以看出,水印的时间戳是单调递增的,时间戳为t的水印意味着所有后续记录的时间戳将大于t。一般来讲,水印是一种声明,在流中的那个点之前,即在某个时间戳之前的所有事件都应该已经到达。

水印是在源函数处或直接在源函数之后生成的。源函数的每个并行子任务通常可以独立地生成水印。这些水印定义了特定并行源处的事件时间。

水印的生成

Flink提供了用于处理事件时间、时间戳和水印的API。

为了处理事件时间,Flink流程序需要知道事件的时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这通常是通过TimestampAssigner从元素中的某个字段访问/提取时间戳实现的。

产生水位线有以下两种方式:

● 直接由source算子产生;

● 通过assignTimestampsAndWatermarks()方法指定TimestampAssigner提取时间戳产生。

第一种产生水位线的方式要重写SourceFunction的run()方法,通过SourceContext对象发送水位:

void run(SourceContext<T> ctx) throws Exception;

对于第二种产生水位线的方式是在调用window()等方法前,调用assignTimestampsAndWatermarks()方法添加一个算子专门用于提取时间戳。

1.使用WatermarkStrategy上的静态辅助方法实现公共水印策略:

image

2.实现WatermarkStrategy接口,自定义TimestampAssigner与WatermarkGenerator捆绑在一起:


@Public
public interface WatermarkStrategy<T>
        extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);

    @Override
    default TimestampAssigner<T> createTimestampAssigner(
            TimestampAssignerSupplier.Context context) {
        return new RecordTimestampAssigner<>();
    }

    @Experimental
    default WatermarkAlignmentParams getAlignmentParameters() {
        return WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED;
    }

    default WatermarkStrategy<T> withTimestampAssigner(
            TimestampAssignerSupplier<T> timestampAssigner) {
        checkNotNull(timestampAssigner, "timestampAssigner");
        return new WatermarkStrategyWithTimestampAssigner<>(this, timestampAssigner);
    }

    default WatermarkStrategy<T> withTimestampAssigner(
            SerializableTimestampAssigner<T> timestampAssigner) {
        checkNotNull(timestampAssigner, "timestampAssigner");
        return new WatermarkStrategyWithTimestampAssigner<>(
                this, TimestampAssignerSupplier.of(timestampAssigner));
    }

    default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
        checkNotNull(idleTimeout, "idleTimeout");
        checkArgument(
                !(idleTimeout.isZero() || idleTimeout.isNegative()),
                "idleTimeout must be greater than zero");
        return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
    }

    @Experimental
    default WatermarkStrategy<T> withWatermarkAlignment(
            String watermarkGroup, Duration maxAllowedWatermarkDrift) {
        return withWatermarkAlignment(
                watermarkGroup,
                maxAllowedWatermarkDrift,
                WatermarksWithWatermarkAlignment.DEFAULT_UPDATE_INTERVAL);
    }

    @Experimental
    default WatermarkStrategy<T> withWatermarkAlignment(
            String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval) {
        return new WatermarksWithWatermarkAlignment<T>(
                this, watermarkGroup, maxAllowedWatermarkDrift, updateInterval);
    }

    static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
        return (ctx) -> new AscendingTimestampsWatermarks<>();
    }

    static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
        return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
    }

    static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
        return generatorSupplier::createWatermarkGenerator;
    }

    static <T> WatermarkStrategy<T> noWatermarks() {
        return (ctx) -> new NoWatermarksGenerator<>();
    }
}

这里面提供了很多静态的方法和带有缺省实现的方法,只有一个方法是非default和没有缺省实现的,就是createWatermarkGenerator方法。

所以默认情况下,我们只需要实现这个方法就行了,这个方法主要是返回一个 WatermarkGenerator。

@Public
public interface WatermarkGenerator<T> {

	/**
	 * Called for every event, allows the watermark generator to examine and remember the
	 * event timestamps, or to emit a watermark based on the event itself.
	 */
	void onEvent(T event, long eventTimestamp, WatermarkOutput output);

	/**
	 * Called periodically, and might emit a new watermark, or not.
	 *
	 * <p>The interval in which this method is called and Watermarks are generated
	 * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
	 */
	void onPeriodicEmit(WatermarkOutput output);
}

这个方法简单明了,主要是有两个方法:

  • onEvent :每个元素都会调用这个方法,如果我们想依赖每个元素生成一个水印,然后发射到下游(可选,就是看是否用output来收集水印),我们可以实现这个方法.

  • onPeriodicEmit : 如果数据量比较大的时候,我们每条数据都生成一个水印的话,会影响性能,所以这里还有一个周期性生成水印的方法。这个水印的生成周期可以这样设置:env.getConfig().setAutoWatermarkInterval(5000L)

参考:
Flink时间语义与WaterMark详解

Flink Watermark 机制及总结

Flink之watermark(水印)讲解

与聊聊Flink必知必会(四)相似的内容:

聊聊Flink必知必会(四)

### 概述 Flink Streaming API借鉴了谷歌数据流模型(Google Data Flow Model),它的流API支持不同的时间概念。Flink明确支持以下3个不同的时间概念。 Flink明确支持以下3个不同的时间概念。 (1)事件时间:事件发生的时间,由产生(或存储)事件的设备

聊聊Flink必知必会(二)

### Checkpoint与Barrier Flink是一个有状态的流处理框架,因此需要对状态做持久化,Flink定期保存状态数据到存储空间上,故障发生后从之前的备份中恢复,这个过程被称为Checkpoint机制。而Checkpoint为Flink提供了Exactly-Once的投递保障。 流处理

聊聊Flink的必知必会(一)

Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

聊聊Flink的必知必会(三)

### 概述 在进行流处理时,很多时候想要对流的有界子集进行聚合分析。例如有如下的需求场景: (1)每分钟的页面浏览(PV)次数。 (2)每用户每周的会话次数。 (3)每分钟每传感器的最高温度。 (4)当电商发布一个秒杀活动时,想要每隔10min了解流量数据。 对于这些需求的处理,程序需要处理元素组

聊聊Flink CDC必知必会

CDC是(Change Data Capture变更数据获取)的简称。 核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 ## Flink CDC的设

聊聊日志聚类算法及其应用场景

阅读《[基于 Flink ML 搭建的智能运维算法服务及应用](https://mp.weixin.qq.com/s/yhXiQtUSR4hxp9XWrkiiew "基于 Flink ML 搭建的智能运维算法服务及应用")》一文后,对其中日志聚类算法有了些思考。 ### 概述 日志聚类,简而言之是对

聊聊GLM-4-9B开源模型的微调loss计算

概述 Github官方地址:GLM-4 网上已经有很多关于微调的文章,介绍各种方式下的使用,这里不会赘述。我个人比较关心的是微调时的loss计算逻辑,这点在很多的文章都不会有相关的描述,因为大多数人都是关心如何使用之类的应用层,而不是其具体的底层逻辑,当然咱也说不清太底层的计算。 可了解其它loss

聊聊一个差点被放弃的项目以及近期的开源计划

前言 自从 StarBlog 和 SiteDirectory 之后,我还没写新的关于开源项目的系列,最近又积累了很多想法,正好写一篇博客来总结一下。 关于差点被放弃的项目,就是最近一直在做的单点认证(IdentityServerLite) IdentityServerLite 开发这个项目的起因,是

聊聊 JSON Web Token (JWT) 和 jwcrypto 的使用

哈喽大家好,我是咸鱼。 最近写的一个 Python 项目用到了 jwcrypto 这个库,这个库是专门用来处理 JWT 的,JWT 全称是 JSON Web Token ,JSON 格式的 Token。 今天就来简单入门一下 JWT。 官方介绍:https://jwt.io/introduction

聊聊MySQL是如何处理排序的

在MySQL的查询中常常会用到 order by 和 group by 这两个关键字,它们的相同点是都会对字段进行排序,那查询语句中的排序是如何实现的呢?