To understand the Evictor source code, you only need to look at three methods: evictBefore, evictAfter, and evict.
The source of TimeEvictor is as follows:
public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
    // ...
    // doEvictAfter defaults to false

    // Evict data before the window computation
    @Override
    public void evictBefore(
            Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (!doEvictAfter) {
            // Run the eviction logic
            evict(elements, size, ctx);
        }
    }

    // Evict data after the window computation
    @Override
    public void evictAfter(
            Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (doEvictAfter) {
            // Run the eviction logic
            evict(elements, size, ctx);
        }
    }

    // Eviction logic
    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
        if (!hasTimestamp(elements)) {
            return;
        }
        long currentTime = getMaxTimestamp(elements);
        long evictCutoff = currentTime - windowSize;
        // Iterate over the elements in the window
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                iterator.hasNext(); ) {
            TimestampedValue<Object> record = iterator.next();
            // Check whether this record's timestamp is <= the eviction cutoff
            if (record.getTimestamp() <= evictCutoff) {
                iterator.remove(); // Remove the record
            }
        }
    }
    // ...
}
The comments in the code explain evictBefore, evictAfter, and evict, so they are not covered in more detail here.
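To make the cutoff rule concrete, here is a minimal sketch that replays the same loop on plain timestamps, without any Flink dependency. The class name EvictCutoffDemo and the sample values are made up for illustration; only the rule itself (remove everything with timestamp <= max timestamp - windowSize) comes from the source above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical demo class: mirrors the evict() loop on plain long timestamps.
public class EvictCutoffDemo {

    // Returns a copy of the input with every timestamp <= (max - windowSize) removed.
    static List<Long> evict(List<Long> timestamps, long windowSize) {
        List<Long> kept = new ArrayList<>(timestamps);
        if (kept.isEmpty()) {
            return kept;
        }
        // Same idea as getMaxTimestamp(): the largest timestamp acts as "current time".
        long currentTime = Long.MIN_VALUE;
        for (long ts : kept) {
            currentTime = Math.max(currentTime, ts);
        }
        long evictCutoff = currentTime - windowSize;
        for (Iterator<Long> it = kept.iterator(); it.hasNext(); ) {
            if (it.next() <= evictCutoff) {
                it.remove();
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // max = 12000, windowSize = 5000, so the cutoff is 7000.
        List<Long> input = Arrays.asList(1000L, 4000L, 9000L, 12000L);
        System.out.println(evict(input, 5000L)); // prints [9000, 12000]
    }
}
```

Note that the comparison is inclusive: a record whose timestamp equals the cutoff exactly is also removed.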
Custom Evictor
import java.util.Iterator;

import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;

public class CustomEvictor extends TimeEvictor<TimeWindow> {
    // Kept locally because the evict() logic in TimeEvictor is private and is re-implemented below
    private final long size;
    private final boolean isAfter;

    public CustomEvictor(long windowSize) {
        super(windowSize);
        this.size = windowSize;
        this.isAfter = false; // same default as doEvictAfter in TimeEvictor
    }

    public CustomEvictor(long windowSize, boolean doEvictAfter) {
        super(windowSize, doEvictAfter);
        this.size = windowSize;
        this.isAfter = doEvictAfter;
    }

    @Override
    public void evictBefore(
            Iterable<TimestampedValue<Object>> elements, int size, TimeWindow window, EvictorContext ctx) {
        if (!isAfter) {
            evict(elements, size, ctx);
        }
    }

    @Override
    public void evictAfter(
            Iterable<TimestampedValue<Object>> elements, int size, TimeWindow window, EvictorContext ctx) {
        if (isAfter) {
            evict(elements, size, ctx);
        }
    }

    /**
     * The actual eviction logic.
     *
     * @param elements the elements currently in the window
     * @param size the number of elements in the window
     * @param ctx the evictor context
     */
    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
        if (!hasTimestamp(elements)) {
            return;
        }
        // Largest event time among the current elements
        long currentTime = getMaxTimestamp(elements);
        // Eviction cutoff
        long evictCutoff = currentTime - this.size;
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                iterator.hasNext(); ) {
            TimestampedValue<Object> record = iterator.next();
            // Cast the value so that its ID can be checked
            UserEvent2 userEvent2 = (UserEvent2) record.getValue();
            // Evict when the event time is <= the cutoff,
            // or (the extra rule added here) when the record's ID is "1001".
            // The constant is on the left so that a null ID does not throw.
            if (record.getTimestamp() <= evictCutoff || "1001".equals(userEvent2.getUId())) {
                iterator.remove();
            }
        }
    }

    /**
     * Copied as-is from TimeEvictor, where it is private.
     *
     * @param elements the elements currently in the window
     * @return true if the first element has a timestamp
     */
    private boolean hasTimestamp(Iterable<TimestampedValue<Object>> elements) {
        Iterator<TimestampedValue<Object>> it = elements.iterator();
        if (it.hasNext()) {
            return it.next().hasTimestamp();
        }
        return false;
    }

    /**
     * Copied as-is from TimeEvictor, where it is private.
     *
     * @param elements the elements currently in the window
     * @return the largest timestamp among the elements
     */
    private long getMaxTimestamp(Iterable<TimestampedValue<Object>> elements) {
        long currentTime = Long.MIN_VALUE;
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                iterator.hasNext(); ) {
            TimestampedValue<Object> record = iterator.next();
            currentTime = Math.max(currentTime, record.getTimestamp());
        }
        return currentTime;
    }
}
Business code
public class FlinkWindowDataDelay {
    public static void main(String[] args) throws Exception {
        // ...
        SingleOutputStreamOperator<UserEvent2> windowedStream = keyedStream
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // sliding window: size 10s, slide 5s
                .evictor(new CustomEvictor(10 * 1000, false)) // attach the custom Evictor (evicts before the window computation)
                .max("time"); // get the latest user-action time (max on the "time" field)
        // ...
        env.execute();
    }
}
The concrete eviction logic should follow your actual business rules.
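One way to keep the business rule swappable is to pass it in as a predicate instead of hard-coding it in the loop. The following is only a sketch in plain Java, not Flink code: the class RuleBasedEvictionDemo, the Event type, and the helper methods are hypothetical stand-ins for TimestampedValue and UserEvent2, chosen so the example runs on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical demo class: time-based eviction combined with a pluggable business rule.
public class RuleBasedEvictionDemo {

    // Hypothetical stand-in for a timestamped record that carries a user ID.
    static final class Event {
        final String uId;
        final long timestamp;

        Event(String uId, long timestamp) {
            this.uId = uId;
            this.timestamp = timestamp;
        }
    }

    // Removes events that are at or before the time cutoff, or that match the business rule.
    static List<Event> evict(List<Event> events, long windowSize, Predicate<Event> businessRule) {
        List<Event> kept = new ArrayList<>(events);
        long maxTimestamp = Long.MIN_VALUE;
        for (Event e : kept) {
            maxTimestamp = Math.max(maxTimestamp, e.timestamp);
        }
        long evictCutoff = maxTimestamp - windowSize;
        for (Iterator<Event> it = kept.iterator(); it.hasNext(); ) {
            Event e = it.next();
            if (e.timestamp <= evictCutoff || businessRule.test(e)) {
                it.remove();
            }
        }
        return kept;
    }

    // Made-up sample data for the demo.
    static List<Event> sample() {
        return Arrays.asList(
                new Event("1001", 20_000L),
                new Event("1002", 5_000L),
                new Event("1003", 18_000L),
                new Event("1004", 20_000L));
    }

    // Collects the user IDs of the given events, in order.
    static List<String> ids(List<Event> events) {
        List<String> result = new ArrayList<>();
        for (Event e : events) {
            result.add(e.uId);
        }
        return result;
    }

    public static void main(String[] args) {
        // Cutoff is 20000 - 10000 = 10000; the rule additionally drops user "1001".
        List<Event> kept = evict(sample(), 10_000L, e -> "1001".equals(e.uId));
        System.out.println(ids(kept)); // prints [1003, 1004]
    }
}
```

In a real Flink job the predicate would also have to be serializable, since evictors are shipped to the task managers along with the rest of the job.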