• Flink RoaringBitmap去重


    1、RoaringBitmap的依赖

    1. <!-- 去重大哥-->
    2. <dependency>
    3. <groupId>org.roaringbitmap</groupId>
    4. <artifactId>RoaringBitmap</artifactId>
    5. <version>0.9.21</version>
    6. </dependency>

    2、Demo去重

    package com.gwm.driver;

    import com.alibaba.fastjson.JSON;
    import com.alibaba.flink.connectors.datahub.datastream.source.DatahubSourceFunction;
    import com.aliyun.datahub.client.model.RecordEntry;
    import com.gwm.pojo.EventSuccessInfo;
    import com.gwm.utils.TimeToStampUtil;
    import com.gwm.utils.getString;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.RichFilterFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.StringUtils;
    import org.roaringbitmap.longlong.Roaring64Bitmap;
    import scala.Tuple4;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.List;
    import java.util.Properties;
    import java.util.UUID;

    /**
     * Flink streaming demo: reads order records from a DataHub topic, keeps the
     * records whose {@code new_dts_sync_dts_after_flag} is {@code "Y"}, and drops
     * repeated order numbers per phone number with a {@link Roaring64Bitmap} held
     * in keyed state. The surviving records are serialized to JSON and printed.
     *
     * <p>Bitmap de-duplication only works because the order number is parsed as a
     * {@code long}; it is not applicable to non-integer keys.
     *
     * @author yangyingchun
     * @version 1.0
     * @date 2022/11/14 16:26
     */
    public class EventOrderSuccessRoaringBitmap {
        private static String endPoint = "endPoint ";
        // Alternative public-network endpoint (not needed when the intranet endpoint is set):
        // private static String endPoint ="public endpoint";
        private static String projectName = "projectName ";
        private static String topicSourceName = "topicSourceName ";
        private static String accessId = "accessId ";
        private static String accessKey = "accessKey ";
        // Start position of consumption, as a timestamp in milliseconds.
        // Alternative: TimeToStampUtil.timeToStamp("2023-02-23") to start from a fixed day.
        private static Long datahubStartInMs = System.currentTimeMillis();
        private static Long datahubEndInMs = Long.MAX_VALUE;

        /** Milliseconds in one day. */
        private static final long DAY_MS = 24L * 60 * 60 * 1000;
        /** Offset of Beijing time (UTC+8) from UTC, in milliseconds. */
        private static final long BEIJING_OFFSET_MS = 8L * 60 * 60 * 1000;

        /**
         * Start of the current day. Written by the timer callback and read by the
         * flatMap function, which run on different threads, hence {@code volatile}.
         */
        private static volatile Date startDate;

        static {
            try {
                startDate = startOfToday();
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }

        /**
         * Returns today's date truncated to 00:00:00 in the JVM default time zone.
         *
         * <p>A fresh {@link SimpleDateFormat} is created on every call because the
         * class is not thread-safe and this method is reached from several threads.
         *
         * @return the start of the current day
         * @throws ParseException declared by {@link SimpleDateFormat#parse(String)}
         */
        private static Date startOfToday() throws ParseException {
            SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");
            return dayFormat.parse(dayFormat.format(new Date()));
        }

        /**
         * Computes the next 00:00:00 Beijing time (UTC+8) strictly after {@code nowMs}.
         *
         * <p>The offset is added before the division so the day boundary is taken in
         * Beijing time. Dividing the raw epoch value first (as the previous version
         * did) yields a timestamp that is already in the past whenever the current
         * Beijing time is between 00:00 and 08:00, which makes the timer fire at once.
         *
         * @param nowMs current time in epoch milliseconds
         * @return epoch milliseconds of the next Beijing midnight
         */
        private static long nextBeijingMidnight(long nowMs) {
            return ((nowMs + BEIJING_OFFSET_MS) / DAY_MS + 1) * DAY_MS - BEIJING_OFFSET_MS;
        }

        /**
         * Tells whether a raw field value carries no data.
         *
         * @param raw field value as returned by {@code getString.getString}
         * @return {@code true} for {@code null}, the empty string or the literal {@code "null"}
         */
        private static boolean isMissing(String raw) {
            return raw == null || "".equals(raw) || "null".equals(raw);
        }

        /**
         * Builds and runs the streaming job.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if the job cannot be built or fails while running
         */
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(3600000L);
            // env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            // env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
            env.setParallelism(1);

            DataStreamSource<List<RecordEntry>> aedata = env.addSource(
                    new DatahubSourceFunction(
                            endPoint,
                            projectName,
                            topicSourceName,
                            accessId,
                            accessKey,
                            datahubStartInMs,
                            datahubEndInMs,
                            20L,
                            1000L,
                            1000
                    ));

            DataStream<Tuple4<String, EventSuccessInfo, String, Long>> aecollectordataDataStream = aedata.flatMap(
                    new FlatMapFunction<List<RecordEntry>, Tuple4<String, EventSuccessInfo, String, Long>>() {
                        // Owned by this function instance so it is never shared across threads.
                        private final SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

                        /**
                         * Formats a raw timestamp field, or returns null when the field is missing.
                         * The value is divided by 1000 before being used as epoch milliseconds
                         * (presumably the source delivers microseconds - confirm against the topic schema).
                         */
                        private String formatTimestamp(String raw) {
                            return isMissing(raw) ? null : timeFormat.format(new Date(Long.parseLong(raw) / 1000));
                        }

                        @Override
                        public void flatMap(List<RecordEntry> value, Collector<Tuple4<String, EventSuccessInfo, String, Long>> out) throws Exception {
                            for (RecordEntry recordEntry : value) {
                                String phone = getString.getString(recordEntry, "customer_phone");
                                // Throws NumberFormatException when order_no is not a decimal long.
                                Long order_sn = Long.parseLong(getString.getString(recordEntry, "order_no"));
                                String brand = getString.getString(recordEntry, "brand");
                                String car_model = getString.getString(recordEntry, "car_model");
                                String action_time = formatTimestamp(getString.getString(recordEntry, "paid_at"));
                                // An empty amount is treated as missing, like the time fields.
                                String paidAmountRaw = getString.getString(recordEntry, "paid_amount");
                                Double paid_amount = isMissing(paidAmountRaw) ? null : Double.parseDouble(paidAmountRaw);
                                String name = getString.getString(recordEntry, "customer_name");
                                String operation_flag = getString.getString(recordEntry, "new_dts_sync_dts_after_flag");
                                String order_time = formatTimestamp(getString.getString(recordEntry, "order_time"));
                                String order_state = getString.getString(recordEntry, "order_state"); // '订购成功' (order placed successfully)

                                System.out.println(order_state+"====startDate:"+startDate+"====paid_at:"+order_time+"=====phone+order_sn:"+phone+"--"+order_sn);

                                // Three open questions from the original author:
                                // 1. Technical + business: the input is a database operation log, so records
                                //    repeat (a duplicate check is in place so nothing is sent twice, with no
                                //    time limit on it).
                                // 2. Technical: editing historical data whose order state is still "success"
                                //    also triggers a push. Should it be restricted, e.g. to records created
                                //    today (many records carry no time), or should everything be pushed and the
                                //    time condition applied downstream? Conclusion: records with an empty time
                                //    are wanted too.
                                // 3. Business: the rule for "order success" must be pinned down; order_state=12
                                //    currently means success - can it be reused?
                                // Hence only the operation flag is checked; the order_state == "12",
                                // non-empty order_time and "order time after startDate" conditions are disabled.
                                if ("Y".equals(operation_flag)) {
                                    EventSuccessInfo eventSuccessInfo = new EventSuccessInfo(
                                            phone,
                                            order_sn,
                                            brand,
                                            car_model,
                                            action_time,
                                            paid_amount,
                                            name,
                                            operation_flag,
                                            order_time,
                                            order_state
                                    );
                                    out.collect(new Tuple4<String, EventSuccessInfo, String, Long>(
                                            "test_event_order_success",
                                            eventSuccessInfo,
                                            UUID.randomUUID().toString().replace("-", ""),
                                            System.currentTimeMillis()
                                    ));
                                }
                            }
                        }
                    });

            KeyedStream<Tuple4<String, EventSuccessInfo, String, Long>, String> tuple4StringKeyedStream
                    = aecollectordataDataStream.keyBy(x -> x._2().getPhone());

            // Optional state TTL, currently disabled:
            // StateTtlConfig ttlConfig = StateTtlConfig
            //         .newBuilder(Time.days(2))
            //         .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            //         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            //         .build();

            // Register the state: a bitmap stores the seen order numbers compactly.
            // Bitmap de-duplication only suits integer (bigint) keys.
            ValueStateDescriptor<Roaring64Bitmap> bitmapDescriptor = new ValueStateDescriptor<>(
                    "Roaring64Bitmap",
                    TypeInformation.of(new TypeHint<Roaring64Bitmap>() {
                    }));

            // De-duplicate per phone number with a Roaring64Bitmap of order numbers.
            SingleOutputStreamOperator<Tuple4<String, EventSuccessInfo, String, Long>> deduplicated = tuple4StringKeyedStream.filter(
                    new RichFilterFunction<Tuple4<String, EventSuccessInfo, String, Long>>() {
                        // Order numbers already emitted for the current key (phone number).
                        private transient ValueState<Roaring64Bitmap> bitmapState;

                        @Override
                        public void open(Configuration parameters) throws Exception {
                            bitmapState = getRuntimeContext().getState(bitmapDescriptor);
                        }

                        @Override
                        public boolean filter(Tuple4<String, EventSuccessInfo, String, Long> value) throws Exception {
                            // Phone number + order number identifies one record; the stream is keyed
                            // by phone, so the bitmap only needs to hold order numbers.
                            Roaring64Bitmap bitmap = bitmapState.value();
                            if (bitmap == null) {
                                bitmap = new Roaring64Bitmap();
                            }
                            if (bitmap.contains(value._2().getOrder_sn())) {
                                return false;
                            }
                            bitmap.addLong(value._2().getOrder_sn());
                            bitmapState.update(bitmap);
                            return true;
                        }
                    });

            // The input is a binlog but only same-day data is wanted: a processing-time
            // timer refreshes the comparison date at every Beijing midnight.
            SingleOutputStreamOperator<Tuple4<String, EventSuccessInfo, String, Long>> process = deduplicated
                    .keyBy(x -> x._2().getPhone())
                    .process(new KeyedProcessFunction<String, Tuple4<String, EventSuccessInfo, String, Long>, Tuple4<String, EventSuccessInfo, String, Long>>() {
                        @Override
                        public void processElement(Tuple4<String, EventSuccessInfo, String, Long> value, Context ctx, Collector<Tuple4<String, EventSuccessInfo, String, Long>> out) throws Exception {
                            long now = ctx.timerService().currentProcessingTime();
                            // Registering the same timestamp again for the same key replaces the
                            // earlier registration, so each key fires at most once per midnight.
                            ctx.timerService().registerProcessingTimeTimer(nextBeijingMidnight(now));
                            out.collect(value);
                        }

                        @Override
                        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple4<String, EventSuccessInfo, String, Long>> out) throws Exception {
                            // Fires at midnight: refresh the start-of-day reference date.
                            // (The previous version also cleared a ValueState field that was never
                            // initialised, which threw a NullPointerException here.)
                            System.out.println("定时器执行了:" + timestamp);
                            startDate = startOfToday();
                            System.out.println(startDate);
                        }
                    });

            SingleOutputStreamOperator<Tuple4<String, String, String, Long>> jsonString = process.map(
                    new MapFunction<Tuple4<String, EventSuccessInfo, String, Long>, Tuple4<String, String, String, Long>>() {
                        @Override
                        public Tuple4<String, String, String, Long> map(Tuple4<String, EventSuccessInfo, String, Long> value) throws Exception {
                            // Replace the event object by its JSON form; the other fields pass through.
                            return new Tuple4<String, String, String, Long>(
                                    value._1(),
                                    JSON.toJSONString(value._2()),
                                    value._3(),
                                    value._4()
                            );
                        }
                    });

            jsonString.print();
            // jsonString.addSink(new EventOmsSuccessSink());
            env.execute("EventOrderSuccess===>");
        }
    }

    3、注意:Roaring64Bitmap 去重只适用于整型(long)数据的去重场景

  • 相关阅读:
    理解一致性哈希算法
    [数据结构+算法]关于动态规划dp入门--01背包问题
    RK3568平台开发系列讲解(调试篇)系统运行相关频率设置
    为了简写这行代码,我竟使用静态和动态编译技术
    企业为什么难创新?5个常见的创新障碍
    如何利用python编辑图片,删除其中一部分?
    ​​【项目实战】犬只牵绳智能识别:源码详细解读与部署步骤
    SpringBoot SpringBoot 开发实用篇 4 数据层解决方案 4.5 SpringBoot 整合 Redis
    ETCD快速入门-03 常用命令
    【效率提升】倍速插件Global Speed
  • 原文地址:https://blog.csdn.net/weixin_44996457/article/details/133278187