Pure Soul


Timer on Flink

May 29, 2024

Contents

  • Timer on ProcessFunction and KeyedProcessFunction
    • Register Timer on ProcessFunction
    • How source code of Timer worked
    • Using Timer on KeyedProcessFunction

Timer on ProcessFunction and KeyedProcessFunction

Flink distinguishes between two key notions of time: processing time and event time. Processing time is the system clock time of the machine at which an event is processed. Event time is the timestamp carried by the event itself. Timers are needed whenever an action must be triggered at a specific point in time. A typical case is sending a notification five minutes after a particular event occurs.
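In the five-minute example, the trigger timestamp is the event timestamp in epoch milliseconds plus the delay. Inside a Flink function, that value would be passed to TimerService#registerEventTimeTimer or registerProcessingTimeTimer. The sketch below shows only the arithmetic, in plain Java. It makes three assumptions: the helper name triggerAt is hypothetical, the UTC time zone is a choice made for this sketch, and the "yyyy-MM-dd HH:mm:ss" input format is borrowed from the ttime field used later in this post.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TriggerTime {
    // Same pattern as the SimpleDateFormat used in the Flink examples of this post.
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Hypothetical helper: the epoch-millis timestamp at which a timer should fire,
    // `delay` after an event whose wall-clock time is `eventTime` in `zone`.
    static long triggerAt(String eventTime, ZoneId zone, Duration delay) {
        long eventMillis = LocalDateTime.parse(eventTime, FMT)
                .atZone(zone)
                .toInstant()
                .toEpochMilli();
        return eventMillis + delay.toMillis();
    }

    public static void main(String[] args) {
        // Notify five minutes after an event that happened at 14:36:24 UTC.
        long fireAt = triggerAt("2024-05-28 14:36:24", ZoneId.of("UTC"), Duration.ofMinutes(5));
        System.out.println(fireAt); // 1716907284000
    }
}
```

With a 3-second delay instead of 5 minutes, the same event time yields 1716906987000, the value that appears in the log output later in this post.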

Register Timer on ProcessFunction

In Flink, timers (registered through the TimerService and handled in onTimer()) can only be used on keyed streams. That means either a KeyedProcessFunction or a ProcessFunction applied after a keyBy() operation. In both cases, a timer is scoped to the key of the element currently being processed and is identified by that key and its trigger timestamp. It is not bound to a thread. Consequently, registering the same timestamp again for the same key does not create a second timer. Crucially, each registered timer fires only once.

As a first attempt, apply TimerServiceProcessFunction (a ProcessFunction that registers timers, listed further down) directly to a non-keyed stream:

env.addSource(new StockSqlReadingV3()).map(new MapFunction<StockSql, StockSql>() {
                    @Override
                    public StockSql map(StockSql stock) throws Exception {
                        stock.setPrice(stock.getPrice() * 7);
                        return stock;
                    }
                }).process(new TimerServiceProcessFunction()).sinkTo(new PrintSink<>());

Running this job fails with the following exception:
Caused by: java.lang.UnsupportedOperationException: Setting timers is only supported on a keyed streams.

Now let's call keyBy() before process() and implement the onTimer() method. Note that processElement() deliberately registers the same processing-time timer twice, with the same trigger time.

import entity.StockSql;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;

@Slf4j
public class TimerServiceProcessFunction extends ProcessFunction<StockSql, StockSql> {
    private transient SimpleDateFormat sdf;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    }

    @Override
    public void processElement(StockSql stockSql, ProcessFunction<StockSql, StockSql>.Context context, Collector<StockSql> collector) throws Exception {
        val timerService = context.timerService();
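        // Register a processing-time timer 3 s after the record's ttime. It is registered twice
        // with the same timestamp on purpose: the second call is a no-op, because a timer for
        // this key and timestamp already exists.
        Date date = this.sdf.parse(stockSql.getTtime());
        timerService.registerProcessingTimeTimer(date.getTime() + 3000);
        timerService.registerProcessingTimeTimer(date.getTime() + 3000);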
        log.info("Thread: {}, register timer twice at processElement at {}", Thread.currentThread().getName(),
                date.getTime() + 3000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<StockSql> out) throws Exception {
        log.info("Thread: {}, timer being triggered at timestamp: {}", Thread.currentThread().getName(), timestamp);
    }
}

After running the job, the console shows output such as:

[Process -> Sink: Writer (7/8)#0] INFO blog.TimerServiceProcessFunction - Thread: Process -> Sink: Writer (7/8)#0, register timer twice at processElement at 1716906987000
[Process -> Sink: Writer (4/8)#0] INFO blog.TimerServiceProcessFunction - Thread: Process -> Sink: Writer (4/8)#0, register timer twice at processElement at 1716906987000
[Process -> Sink: Writer (7/8)#0] INFO blog.TimerServiceProcessFunction - Thread: Process -> Sink: Writer (7/8)#0, timer being triggered at timestamp: 1716906987000
[Process -> Sink: Writer (4/8)#0] INFO blog.TimerServiceProcessFunction - Thread: Process -> Sink: Writer (4/8)#0, timer being triggered at timestamp: 1716906987000

How source code of Timer worked

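As the log shows, each subtask fires its timer only once, even though processElement() registered the same timestamp twice. Stepping through registerProcessingTimeTimer() in a debugger shows why. A timer is identified by three things: the current (grouped) key, its timestamp, and a namespace. For timers registered through a ProcessFunction's TimerService, the namespace is always VoidNamespace, the same for every timer. The crucial registration call is processingTimeTimersQueue.add(new TimerHeapInternalTimer(time, this.keyContext.getCurrentKey(), namespace)). The timer queue has set semantics, so adding a timer equal to one already queued (same key, timestamp and namespace) changes nothing.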
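To make the deduplication concrete, here is a toy, single-threaded model in plain Java. It is not Flink's implementation. The names ToyTimer and ToyTimerQueue are hypothetical, and a TreeSet stands in for Flink's heap-based priority queue with set semantics. Two properties carry over from the behaviour described above. A timer's identity is (timestamp, key, namespace), so adding an equal timer is a no-op. A fired timer is removed from the queue, so it fires only once.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

public class ToyTimerQueue {
    // Identity of a timer: timestamp + key + namespace (the record's equals/hashCode cover all three).
    record ToyTimer(long timestamp, String key, String namespace) {}

    // Ordered by timestamp, then key, then namespace, so equal timers collapse into one entry.
    private final TreeSet<ToyTimer> queue = new TreeSet<>(
            Comparator.comparingLong(ToyTimer::timestamp)
                    .thenComparing(ToyTimer::key)
                    .thenComparing(ToyTimer::namespace));

    // Returns false when an identical timer is already queued (set semantics).
    boolean register(long timestamp, String currentKey, String namespace) {
        return queue.add(new ToyTimer(timestamp, currentKey, namespace));
    }

    // Fires and removes every timer whose timestamp is <= now, in timestamp order.
    List<ToyTimer> advanceTo(long now) {
        List<ToyTimer> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.first().timestamp() <= now) {
            fired.add(queue.pollFirst());
        }
        return fired;
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        ToyTimerQueue timers = new ToyTimerQueue();
        String ns = "VoidNamespace"; // one shared namespace, as for ProcessFunction timers
        System.out.println(timers.register(1716906987000L, "A", ns)); // true
        System.out.println(timers.register(1716906987000L, "A", ns)); // false: duplicate
        System.out.println(timers.register(1716906987000L, "B", ns)); // true: different key
        System.out.println(timers.advanceTo(1716906987000L).size());  // 2
        System.out.println(timers.advanceTo(1716906990000L).size());  // 0: already fired
    }
}
```

Because the key is part of the timer's identity, two different keys can each hold a timer for the same timestamp. A second registration for the same key is simply absorbed.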

Using Timer on KeyedProcessFunction

In KeyedProcessFunction, both Context and OnTimerContext expose the key being processed through getCurrentKey(), which ProcessFunction does not offer. Either function can use keyed state, such as MapState<UK, UV>, as long as it runs on a keyed stream. Keyed state is scoped to the current key, so each key reads and writes only its own entries. On a non-keyed stream there is no key context, so keyed state is not available there. The following example uses a MapState in a KeyedProcessFunction to count, per key, how many times a timer has been registered:

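import entity.StockSql;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Objects;

@Slf4j
public class TimerServiceKeyedProcessFunction extends KeyedProcessFunction<String, StockSql, StockSql> {
    private transient SimpleDateFormat sdf;
    // Keyed state: its entries are scoped to the current key. Using the key itself as the map key
    // is redundant here; a ValueState<Integer> would hold the same per-key counter.
    private transient MapState<String, Integer> mapState;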

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        this.mapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Integer>("map state",
                String.class,
                Integer.class));
        log.info("map state: {}", mapState);

    }

    @Override
    public void processElement(StockSql stockSql, KeyedProcessFunction<String, StockSql, StockSql>.Context context,
                               Collector<StockSql> collector) throws Exception {
        val timerService = context.timerService();
        // Register a processing-time timer 3 s after the record's ttime (once per element).
        Date date = this.sdf.parse(stockSql.getTtime());
        timerService.registerProcessingTimeTimer(date.getTime() + 3000);
        Integer cnt = this.mapState.get(context.getCurrentKey());
        if (Objects.isNull(cnt)) {
            cnt = 0;
        }
        log.info("Thread: {}, key = {}, register timer {} times at processElement at {}",
                Thread.currentThread().getName(),
                context.getCurrentKey(), cnt + 1,
                date.getTime() + 3000);
        this.mapState.put(context.getCurrentKey(), cnt + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<StockSql> out) throws Exception {
        log.info("Thread: {}, key = {}, timer being triggered at timestamp: {}", Thread.currentThread().getName(),
                ctx.getCurrentKey(),
                timestamp);
    }
}
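Conceptually, the keyed state used above behaves like one private state table per key. The runtime switches to the right table before each processElement() or onTimer() call. The plain-Java sketch below models that idea only. ToyKeyedMapState and setCurrentKey are hypothetical names, not Flink API. The sketch counts elements per key, like the mapState counter in the example, and shows that one key never sees another key's entries.

```java
import java.util.HashMap;
import java.util.Map;

public class ToyKeyedMapState<UK, UV> {
    // One inner map per stream key; the current key decides which inner map is visible.
    private final Map<Object, Map<UK, UV>> perKey = new HashMap<>();
    private Object currentKey;

    // Stand-in for the runtime setting the key context before each callback.
    void setCurrentKey(Object key) {
        this.currentKey = key;
    }

    // Without a key context (a non-keyed stream) there is no state table to use.
    private Object requireKey() {
        if (currentKey == null) {
            throw new IllegalStateException("No key context: keyed state needs a keyed stream");
        }
        return currentKey;
    }

    UV get(UK userKey) {
        return perKey.getOrDefault(requireKey(), Map.of()).get(userKey);
    }

    void put(UK userKey, UV value) {
        perKey.computeIfAbsent(requireKey(), k -> new HashMap<>()).put(userKey, value);
    }

    public static void main(String[] args) {
        ToyKeyedMapState<String, Integer> counts = new ToyKeyedMapState<>();
        // Three elements: two for key "A", one for key "B".
        for (String key : new String[] {"A", "A", "B"}) {
            counts.setCurrentKey(key);
            Integer cnt = counts.get("count");
            counts.put("count", cnt == null ? 1 : cnt + 1);
        }
        counts.setCurrentKey("A");
        System.out.println(counts.get("count")); // 2
        counts.setCurrentKey("B");
        System.out.println(counts.get("count")); // 1
    }
}
```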

Last updated: July 7, 2024

ycq


COPYRIGHT © 2021 oo2ee.com. ALL RIGHTS RESERVED.
