• Flink中常用的去重方案


    Flink Sql去重方案

    1、状态去重

    将数据保存到状态中,进行累计

    1. select
    2. window_start,
    3. window_end,
    4. count(distinct devId) as cnt
    5. from table (tumble(table source_table,descriptor(rt),interval '60' minute )) --滚动窗口
    6. group by window_start,window_end;

    2、利用HyperLogLog进行去重

    1. select
    2. window_start,
    3. window_end,
    4. hllDistinct(distinct devId) as cnt
    5. from table (tumble(table source_table,descriptor(rt),interval '60' minute )) --滚动窗口
    6. group by window_start,window_end;

    3、Deduplication方式

    rownum<=1时,flink采用的是Deduplication方式进行去重。该方式有两种去重方案:有保留第一条(Deduplicate Keep FirstRow)和保留最后一条(Deduplicate Keep LastRow)2种。

    Deduplicate Keep FirstRow

    保留首行的去重策略:保留KEY下第一条出现的数据,之后出现该KEY下的数据会被丢弃掉。因为STATE中只存储了KEY数据,所以性能较优。

    1. SELECT *
    2. FROM (
    3. SELECT *,
    4. ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
    5. FROM T
    6. )
    7. WHERE rowNum = 1

    Deduplicate Keep LastRow

    保留末行的去重策略:保留KEY下最后一条出现的数据。因此过程中会产生变更的记录,会下下游发送变更的消息。因此,sink表需要支持update操作。

    1. SELECT *
    2. FROM (
    3. SELECT *,
    4. ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum
    5. FROM T
    6. )
    7. WHERE rowNum = 1

    Flink 程序去重方案

    1. package com.yyds.flink_distinct;
    2. import org.apache.flink.api.common.state.*;
    3. import org.apache.flink.configuration.Configuration;
    4. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    5. import org.apache.flink.util.Collector;
    6. /**
    7. * 方便起见使用输出类型使用Void,这里直接使用打印控制台方式查看结果,在实际中可输出到下游做一个批量的处理然后输出
    8. */
    9. public class _01_DistinctProcessFunction extends KeyedProcessFunction<_01_AdKey,_01_AdvertiseMentData,Void> {
    10. // 定义第一个状态MapState
    11. MapState deviceIdState ;
    12. // 定义第二个状态ValueState
    13. ValueState countState ;
    14. @Override
    15. public void open(Configuration parameters) throws Exception {
    16. MapStateDescriptor deviceIdStateDescriptor = new MapStateDescriptor<>("deviceIdState", String.class, Integer.class);
    17. /*
    18. MapState,key表示devId, value表示一个随意的值只是为了标识,该状态表示一个广告位在某个小时的设备数据,
    19. 如果我们使用rocksdb作为statebackend, 那么会将mapstate中key作为rocksdb中key的一部分,
    20. mapstate中value作为rocksdb中的value, rocksdb中value大小是有上限的,这种方式可以减少rocksdb value的大小;
    21. */
    22. deviceIdState = getRuntimeContext().getMapState(deviceIdStateDescriptor);
    23. ValueStateDescriptor countStateDescriptor = new ValueStateDescriptor<>("countState", Long.class);
    24. /*
    25. ValueState,存储当前MapState的数据量,是由于mapstate只能通过迭代方式获得数据量大小,每次获取都需要进行迭代,这种方式可以避免每次迭代。
    26. */
    27. countState = getRuntimeContext().getState(countStateDescriptor);
    28. }
    29. @Override
    30. public void processElement(_01_AdvertiseMentData data, Context context, Collector collector) throws Exception {
    31. // 主要考虑可能会存在滞后的数据比较严重,会影响之前的计算结果
    32. long currw = context.timerService().currentWatermark();
    33. if(context.getCurrentKey().getTime() + 1 <= currw){
    34. System.out.println("迟到的数据:" + data);
    35. return;
    36. }
    37. String devId = data.getDevId();
    38. Integer i = deviceIdState.get(devId);
    39. if(i == null){
    40. i = 0;
    41. }
    42. if( i == 1 ){
    43. // 表示已经存在
    44. }else {
    45. // 表示不存在,放入到状态中
    46. deviceIdState.put(devId,1);
    47. // 将统计的数据 + 1
    48. Long count = countState.value();
    49. if(count == null){
    50. count = 0L;
    51. }
    52. count ++;
    53. countState.update(count);
    54. // 注册一个定时器,定期清理状态中的数据
    55. context.timerService().registerEventTimeTimer(context.getCurrentKey().getTime() + 1);
    56. }
    57. System.out.println("countState.value() = " + countState.value());
    58. }
    59. @Override
    60. public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
    61. System.out.println(timestamp + " exec clean~~~");
    62. System.out.println("countState.value() = " + countState.value());
    63. // 清除状态
    64. deviceIdState.clear();
    65. countState.clear();
    66. }
    67. }

  • 相关阅读:
    智能电销机器人,主要体现的价值是什么
    深度学习笔记-------KNN算法
    【笔试题】【day21】
    Spring循环依赖
    中国聚合支付行业市场全景调研及投资价值评估研究报告
    Vue3 + TypeScript
    锂电池储能系统建模发展现状及其数据驱动建模初步探讨
    微服务架构 | 架构演进
    js中的Formdata数据结构
    LLM - Model、Data、Training、Generate Agruments 超参解析
  • 原文地址:https://blog.csdn.net/qq_42456324/article/details/127821866