Timer(定时器)是Flink Streaming API提供的用于感知并利用processing time(或event time)变化的机制。我们最常见的接触它的地方就是KeyedProcessFunction, 在这里我们通过在processElement()方法中注册Timer,然后覆盖onTimer()方法并在其中添加Timer触发时的回调逻辑。根据时间特征的不同,可分为两种情况:
-
Processing Time:通过调用context.timerService().registerProcessingTimeTimer()方法注册,onTimer()方法在系统时间戳达到Timer设定的时间戳 时进行触发;
-
Event Time:通过调用context.timerService().rigisterEventTimeTimer()注册,onTimer()方法在Flink内部水印达到或超过Timer设定的时间戳 时触发;
除了在KeyedProcessFunction中使用外,Timer在窗口机制中也有着重要的地位。提及窗口,最容易想到的便是Trigger触发器,相关内容在Flink源码阅读8:窗口触发器 中进行过分析,此处不再赘述。
负责执行KeyedProcessFunction的算子是KeyedProcessOperator,在其中以内部类的形式实现了KeyedProcessFunction所需要的上下文类Context,由此 可见timerService()方法返回的是外部传入的TimerService的实例,而Context这个类是在KeyedProcessOperator类的open()方法中被实例化的,所以 timerService的实例也是在此处传入的SimpleTimerService实现,而查看SimpleTimerService的源码会发现它只是对InternalTimerService的简单代理。 InternalTimerService的实例通过getInternalTimerService()方法获取,这是个定义在AbstractStreamOperator中的方法。
KeyedProcessOperator.processElement()方法调用用户自定义函数的processElement()方法,并传递了定时器的触发时间戳、TimestampedCollector 的实例以及上下文实例ContextImpl,所以用户可以通过这个上下文实例获得TimerService并注册Timer。Timer被触发时,实际上是根据时间特征调用事件时 间或处理时间的处理函数也就是onEventTime()/onProcessingTime()方法(),并触发用户函数里面的onTimer()函数回调逻辑。
再回过头来看下getInternalTimerService()方法,这个方法返回了一个InternalTimerService,每个算子可以有一个或多个InternalTimerService,每 一个InternalTimerService都有自己的命名空间序列化器,定时器服务由请求它时传入的字符类型的key来区分,如果用相同的key请求这个方法,将会在随后的 请求中得到相同的定时器服务。最终,InternalTimerService由keyedTimeServiceHandler.getInternalTimerService()方法取得。
keyedTimeServiceHandler的实现是InternalTimeServiceManager,所以再来看下其中的getInternalTimerService()方法。在其中可以看到InternalTimerService 的实现实际上是InternalTimerServiceImpl类。InternalTimeServiceManager中会使用HashMap来维护特定名称下的所有InternalTimerServiceImpl实例, 如果名称已存在,则会直接返回而不会重新创建。初始化InternalTimerServiceImpl时,会同时创建两个包含TimerHeapInternalTimer的优先队列,分别用于 维护事件时间和处理时间的Timer。
InternalTimer的实现是TimerHeapInternalTimer,从它可以看出,Timer的Scope有两个,分别是数据的key和命名空间(这与许多文档描述的一致)。但是有过 使用经验的我们都知道,在实际的使用过程中其实是感知不到命名空间的存在的。InternalTimer类中的timerHeapIndex是Timer在优先队列里存储的下标。优先 队列通常用二叉堆实现,而二叉堆一个很常见的实现方式就是数组,所以存储下标后能根据下标实现较快的删除操作。comparePriorityTo()方法用于比较优先级,Timer 的优先级很显然是按时间戳排序的小顶堆。
InternalTimeServiceManager.createTimerPriorityQueue()方法通过调用priorityQueueSetFactory.create()方法创建优先级队列的集合HeapPriorityQueueSet。 它的实现与Java自带的PriorityQueue的实现差不太多,只是加入了快速删除和去重逻辑。这个里面涉及到了KeyGroup和KeyGroupRange的概念,实际上KeyGroup是 Flink中非常重要的一个概念,在进行checkpoint时就有遇到过,它是Flink内部KeyedState的管理的原子单位,也是一些key的集合,一个任务的KeyGroup的数量与 其最大并行度一致。deduplicationMapsByKeyGroup主要用于在KeyGroup级别对key进行去重,数组中的每个元素就是一个HashMap,也对应一个KeyGroup。HashMap 通过putIfAbsent()方法添加时,只有当key不存在时才能添加进去,这也是去重得以实现的原因。
为什么要引入KeyGroup的概念呢?我们设想一下,如果现在改变了某一个算子的并行度,如果这个算子没有状态,实现起来其实很简单。但是如果这个算子有状态呢? 如果Flink中的key是按照hash(key)%parallelism的规则分配到各个子Task上去的,那么我们就必须在改变算子并行度的同时,根据新分配的key集合从分布式存 储中取回对应的Keyed State数据,由于parallelism的取值变化对规则的影响特别大(类比在负载均衡策略中,根据机器个数进行请求的负载均衡,这也是一致性 哈希出现的原因),所以这会是个非常耗时和低效的操作。为了解决这个问题,Flink提出了Key Group的概念。
Key Group是Keyed State分配的原子单位,而其数量与定义的最大并行度相同,所以Key Group的索引范围位于0到最大并行度减去1的区间内。每个子Task都会处理 一个或多个Key Group,也就是KeyGroupRange。而KeyGroupRange顾名思义就是一些连续的Key Group的范围,它也可以看作是当前子任务在本地维护的所有的key。 那么如何确定一个key应该分配到哪个Key Group中呢?从KeyGroupRangeAssignment.assignToKeyGroup()方法可以看到其是先对key取哈希值,再做murmurHash 后,再对最大并行度取余得到了Key Group的索引。但是这样得到的索引的范围再前面提到过是0到最大并行度减去1,是无法直接分配给子Task的,还必须有算法决定一个 子Task该处理哪些KeyGroup,这个在KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex()方法中,其是由并行度、最大并行度和算子实例的 ID共同决定的,具体的分配逻辑是:
-
startKeyGroup:((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
-
endKeyGroup:((operatorIndex + 1) * maxParallelism - 1) / parallelism;
根据上面的分配逻辑可见,在将Key Group作为Keyed State的基本分配单元之后,修改并行度时的本地性差和随机读的问题都得到了部分的解决。不过,也能很明显的看 出,最大并行度对Key Group分配的影响非常明显,因此轻易不要修改最大并行度的值,在computeDefaultMaxParallelism()方法中给出了计算默认最大并行度的逻辑。
InternalTimerServiceImpl是Timer方法的最底层实现,从这个里面方法的实现来看,注册Timer实际上就是通过时间戳、key和命名空间构造TimerHeapInternalTimer 实例,并将这些实例加入到对应的优先级队列。值得注意的是,当注册基于处理时间的定时器时,要先检查注册的定时器时间戳与当前在最小堆堆顶的定时器时间戳的大小关系。 如果前者比后者要小,就会用前者替代后者,因为处理时间永远是线性增长的。
那么当定时器注册好之后该如何触发呢?在注册处理时间定时器时,通过this::onProcessingTime参数指定了一个回调函数,当处理时间到了所注册的时间戳时就会触发回 调,并按顺序从队列中获取注册时间戳小于等于当前处理时间的所有Timer,并执行triggerTarget.onProcessingTime()方法,也就是KeyedProcessOperator.onProcessingTime() 方法,于是就执行了用户自定义的方法。
最后再来看processingTimeService.registerTimer()方法在SystemProcessingTimeService的实现,它使用调度线程池实现回调,onProcessingTime()在ScheduledTask 线程中被回调,而ScheduledTask线程按照Timer的时间戳来调度。至此,ProcessingTime的大致处理逻辑已经分析得差不多了。
再来看看事件时间的触发情况吧,事件时间与处理系统内部的时间戳无关,而与水印有关。在水印到达时,算子的基类AbstractStreamOperator.processWatermark()方法就会被调 用,并调用InternalTimeServiceManager.processWatermark()方法,最终在InternalTimerServiceImpl.advanceWatermark()方法中触发了triggerTarget.onEventTime() 的调用,并实现了基于事件事件的处理。