将数据保存到状态中,进行累计
- select
- window_start,
- window_end,
- count(distinct devId) as cnt
- from table (tumble(table source_table,descriptor(rt),interval '60' minute )) --滚动窗口
- group by window_start,window_end;
- select
- window_start,
- window_end,
- hllDistinct(distinct devId) as cnt
- from table (tumble(table source_table,descriptor(rt),interval '60' minute )) --滚动窗口
- group by window_start,window_end;
当rownum<=1时,flink采用的是Deduplication方式进行去重。该方式有两种去重方案:有保留第一条(Deduplicate Keep FirstRow)和保留最后一条(Deduplicate Keep LastRow)2种。
Deduplicate Keep FirstRow
保留首行的去重策略:保留KEY下第一条出现的数据,之后出现该KEY下的数据会被丢弃掉。因为STATE中只存储了KEY数据,所以性能较优。
- SELECT *
- FROM (
- SELECT *,
- ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
- FROM T
- )
- WHERE rowNum = 1
Deduplicate Keep LastRow
保留末行的去重策略:保留KEY下最后一条出现的数据。因此过程中会产生变更的记录,会下下游发送变更的消息。因此,sink表需要支持update操作。
- SELECT *
- FROM (
- SELECT *,
- ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum
- FROM T
- )
- WHERE rowNum = 1
- package com.yyds.flink_distinct;
-
- import org.apache.flink.api.common.state.*;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
- import org.apache.flink.util.Collector;
-
- /**
- * 方便起见使用输出类型使用Void,这里直接使用打印控制台方式查看结果,在实际中可输出到下游做一个批量的处理然后输出
- */
- public class _01_DistinctProcessFunction extends KeyedProcessFunction<_01_AdKey,_01_AdvertiseMentData,Void> {
-
-
- // 定义第一个状态MapState
- MapState
deviceIdState ; - // 定义第二个状态ValueState
- ValueState
countState ; -
- @Override
- public void open(Configuration parameters) throws Exception {
- MapStateDescriptor
deviceIdStateDescriptor = new MapStateDescriptor<>("deviceIdState", String.class, Integer.class); - /*
- MapState,key表示devId, value表示一个随意的值只是为了标识,该状态表示一个广告位在某个小时的设备数据,
- 如果我们使用rocksdb作为statebackend, 那么会将mapstate中key作为rocksdb中key的一部分,
- mapstate中value作为rocksdb中的value, rocksdb中value大小是有上限的,这种方式可以减少rocksdb value的大小;
- */
- deviceIdState = getRuntimeContext().getMapState(deviceIdStateDescriptor);
-
- ValueStateDescriptor
countStateDescriptor = new ValueStateDescriptor<>("countState", Long.class); - /*
- ValueState,存储当前MapState的数据量,是由于mapstate只能通过迭代方式获得数据量大小,每次获取都需要进行迭代,这种方式可以避免每次迭代。
- */
- countState = getRuntimeContext().getState(countStateDescriptor);
-
- }
-
- @Override
- public void processElement(_01_AdvertiseMentData data, Context context, Collector
collector) throws Exception { - // 主要考虑可能会存在滞后的数据比较严重,会影响之前的计算结果
- long currw = context.timerService().currentWatermark();
- if(context.getCurrentKey().getTime() + 1 <= currw){
- System.out.println("迟到的数据:" + data);
- return;
- }
-
- String devId = data.getDevId();
- Integer i = deviceIdState.get(devId);
- if(i == null){
- i = 0;
- }
-
- if( i == 1 ){
- // 表示已经存在
- }else {
- // 表示不存在,放入到状态中
- deviceIdState.put(devId,1);
- // 将统计的数据 + 1
- Long count = countState.value();
-
- if(count == null){
- count = 0L;
- }
- count ++;
- countState.update(count);
- // 注册一个定时器,定期清理状态中的数据
- context.timerService().registerEventTimeTimer(context.getCurrentKey().getTime() + 1);
- }
-
- System.out.println("countState.value() = " + countState.value());
- }
-
-
- @Override
- public void onTimer(long timestamp, OnTimerContext ctx, Collector
out) throws Exception { - System.out.println(timestamp + " exec clean~~~");
- System.out.println("countState.value() = " + countState.value());
- // 清除状态
- deviceIdState.clear();
- countState.clear();
- }
- }