FLINK June 08, 2021

Flink自定义窗口触发器

Words count 16k Reading time 14 mins. Read count 0

flink: 1.10.1

Github: https://github.com/shirukai/flink-examples-trigger.git

前段时间同事开发遇到一个需求,要求按照事件时间10分钟窗口计数,同时需要有新数据时每分钟输出一次结果。窗口计数非常简单,设置一个窗口,然后运用一个聚合函数就可以解决了。但是有新数据时每分钟输出一次结果,好像实现起来并不容易,因为按照平时的窗口,设置10分钟之后,要等到watermark超过了当前的窗口之后,才会被触发计算。这里还涉及到一个问题,如果10分钟的窗口之后,再没有数据产生,那么watermark就不会增长,窗口也就不会触发计算,那我们就一直拿不到结果。如何解决上面的问题呢,这里就需要涉及到触发器的概念。本篇文章先介绍Flink内置的几种触发器,并且使用自定义窗口触发器解决上述问题。

1 创建示例项目

使用maven基于Flink官方模板创建一个项目

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.10.1 -DgroupId=flink.examples -DartifactId=flink-examples-trigger -Dversion=1.0 -Dpackage=flink.trigger.example -DinteractiveMode=false

2 Flink内置的触发器

在介绍自定义窗口触发器之前,先简单介绍一下Flink内置的几种触发器,要知道我们平时使用的时间窗口、计数窗口等,窗口在什么时候触发计算,什么时候清理窗口状态,这些逻辑都是在触发器中实现的。Flink内置触发器如下:

触发器 描述
CountTrigger 当窗口中的元素计数达到给定数量时触发的触发器
EventTimeTrigger 水印超高窗口结束时间时触发的触发器
ProcessingTimeTrigger 系统时间超过窗口结束时间时触发的触发器
ContinuousEventTimeTrigger 给定一个时间间隔,按照事件时间连续触发的触发器
ContinuousProcessTrigger 给定一个时间间隔,按照处理时间连续触发的触发器
PurgingTrigger 可以包装任何一个触发器,使其触发之后,清除窗口和状态
DeltaTrigger 指定一个DeltaFunction,和一个阈值,当计算出来的Delta值超出给定阈值时触发的触发器

1.1 CountTrigger

给定一个最大数量,当窗口中的元素大于等于这个数量时,触发窗口计算。

实现上也相对简单,使用一个ReduceState来计数,并且指定一个累加方法的实现。每来一条数据count+1,然后判断是否超过指定的数量,如果超过就触发计算,清空计数器。

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

1.2 EventTimeTrigger

基于事件时间的触发器,当窗口的最大时间小于watermark时触发计算。

具体实现如下代码,可以看到当数据来的时候,会判断当前窗口的最大事件是否比当前的水印小,如果小的话触发计算,否则注册一个基于事件时间的定时器,返回一个CONTINUE。

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

1.3 ProcessingTimeTrigger

基于处理时间的触发器,当系统时间超出窗口的最大时间时触发。

具体实现如下代码,数据到来时,仅仅是注册一个基于处理时间的定时器,当定时器触发时同事也触发窗口计算。

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

1.4 ContinuousEventTimeTrigger

从字面意思上理解是连续的事件时间触发器,需要指定一个时间间隔,基于事件时间的定时器会根据时间间隔定时触发窗口计算。

具体代码如下所示,当窗口数据到达时,前面的逻辑与EventTimeTrigger是一样的,先判断当前的水印是否超过了当前窗口的最大值,如果超过了,那就触发窗口计算,否则注册一个基于事件时间到达窗口最大时间的定时器。然后从状态里拿到按照间隔触发的定时器时间戳,如果这个时间戳不存在,就重新计算一出下一次定时触发的时间戳是多少,注册基于事件时间的定时器,并将该时间戳重新保存到状态中。当事件时间定时器执行时,判断是否触发,并注册新的定时器。

    private ContinuousEventTimeTrigger(long interval) {
        this.interval = interval;
    }
    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {

        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
        }

        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        if (fireTimestamp.get() == null) {
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;
            ctx.registerEventTimeTimer(nextFireTimestamp);
            fireTimestamp.add(nextFireTimestamp);
        }

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {

        if (time == window.maxTimestamp()){
            return TriggerResult.FIRE;
        }

        ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);

        Long fireTimestamp = fireTimestampState.get();

        if (fireTimestamp != null && fireTimestamp == time) {
            fireTimestampState.clear();
            fireTimestampState.add(time + interval);
            ctx.registerEventTimeTimer(time + interval);
            return TriggerResult.FIRE;
        }

        return TriggerResult.CONTINUE;
    }

1.5 ContinuousProcessTrigger

从字面意思上理解是连续的处理时间触发器,需要指定一个时间间隔,基于处理时间的定时器会根据时间间隔定时触发窗口计算。

具体代码如下所示,当窗口数据到达时,从状态中拿到基于处理时间定时器的时间戳,如果时间戳不存在,重新计算出新的定时器时间戳,并基于这个时间戳注册一个基于处理时间的定时器。当处理时间的定时器执行时,触发窗口计算,并基于时间间隔注册新的定时器。


    private ContinuousProcessingTimeTrigger(long interval) {
        this.interval = interval;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

        timestamp = ctx.getCurrentProcessingTime();

        if (fireTimestamp.get() == null) {
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;

            ctx.registerProcessingTimeTimer(nextFireTimestamp);

            fireTimestamp.add(nextFireTimestamp);
            return TriggerResult.CONTINUE;
        }
        return TriggerResult.CONTINUE;
    }
    
        @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

        if (fireTimestamp.get().equals(time)) {
            fireTimestamp.clear();
            fireTimestamp.add(time + interval);
            ctx.registerProcessingTimeTimer(time + interval);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

1.6 DeltaTrigger

需要指定一个DeltaFunction用来计算差值,还需要给定一个阈值,用来比较差值是否大于指定的阈值。只有在差值大于阈值的时候才会触发计算。

代码实现如下所示,构造方法中需要用户传入一个DeltaFunction,还要指定一个阈值threshold。当窗口数据到达时,先判断上一个值在状态中是否存在,如果存在,就调用用户的DeltaFunction,传入上一个值和当前值来计算出差值来,如果差值比阈值大,就触发计算。


    private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
        this.deltaFunction = deltaFunction;
        this.threshold = threshold;
        stateDesc = new ValueStateDescriptor<>("last-element", stateSerializer);

    }

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ValueState<T> lastElementState = ctx.getPartitionedState(stateDesc);
        if (lastElementState.value() == null) {
            lastElementState.update(element);
            return TriggerResult.CONTINUE;
        }
        if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
            lastElementState.update(element);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

1.7 PurgingTrigger

可以包装任何一个触发器,使其触发之后,清除窗口和状态。


    private  PurgingTrigger(Trigger<T, W> nestedTrigger) {
        this.nestedTrigger = nestedTrigger;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

2 自定义触发器

通过对比上述的触发器,其实可以发现ContinuousProcessTrigger能够满足我们每分钟输出结果的需求,但是这个触发器有个问题,就是不管有没有新数据生成,它都会再一分钟输出一次结果。如果我们想要只有在有数据的时候触发,可以在这个触发器的基础上做一些改动。

package flink.trigger.example.custom;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.util.Objects;

/**
 * 事件时间间隔触发器
 * copy from {@link org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger}
 * 重写 {@link org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger#onEventTime(long, TimeWindow, TriggerContext)}
 * 和 {@link org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger#onProcessingTime(long, TimeWindow, TriggerContext)}
 * 方法:
 * 1. 在onElement方法里增加基于处理时间的定时器
 * 2. 在onProcessingTime方法里增加定时器触发后将窗口发出的逻辑
 *
 * @author shirukai
 */
public class EventTimeIntervalTrigger<T> extends Trigger<T, TimeWindow> {
    private final long interval;

    /**
     * When merging we take the lowest of all fire timestamps as the new fire timestamp.
     */
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("fire-time", new EventTimeIntervalTrigger.Min(), LongSerializer.INSTANCE);

    private EventTimeIntervalTrigger(long interval) {
        this.interval = interval;
    }

    /**
     * 创建trigger实例
     *
     * @param interval 间隔
     * @param <T>      数据类型泛型
     * @return EventTimeIntervalTrigger
     */
    public static <T> EventTimeIntervalTrigger<T> of(Time interval) {
        return new EventTimeIntervalTrigger<>(interval.toMilliseconds());
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            // 判断定时器是否注册过
            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
            if (Objects.isNull(fireTimestamp.get()) || fireTimestamp.get() < 0) {
                // 注册一个基于处理时间的计时器
                long currentTimestamp = ctx.getCurrentProcessingTime();
                long nextFireTimestamp = currentTimestamp - (currentTimestamp % interval) + interval;
                ctx.registerProcessingTimeTimer(nextFireTimestamp);
                fireTimestamp.add(nextFireTimestamp);
            }
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).add(Long.MIN_VALUE);
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return time == window.maxTimestamp() ?
                TriggerResult.FIRE :
                TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        // 清理事件时间定时器
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        // 清理处理时间定时器
        if (Objects.isNull(fireTimestamp.get()) || fireTimestamp.get() > 0) {
            ctx.deleteProcessingTimeTimer(fireTimestamp.get());
        }
        // 清理状态
        fireTimestamp.clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
                        OnMergeContext ctx) {
        // only register a timer if the watermark is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the watermark is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }

    @Override
    public String toString() {
        return "EventTimeIntervalTrigger()";
    }
}
0%