1、RoaringBitmap的依赖
<!-- RoaringBitmap:压缩位图,用于高效去重 -->
<dependency>
    <groupId>org.roaringbitmap</groupId>
    <artifactId>RoaringBitmap</artifactId>
    <version>0.9.21</version>
</dependency>
2、Demo去重
package com.gwm.driver;

import com.alibaba.fastjson.JSON;
import com.alibaba.flink.connectors.datahub.datastream.source.DatahubSourceFunction;
import com.aliyun.datahub.client.model.RecordEntry;
import com.gwm.pojo.EventSuccessInfo;
import com.gwm.utils.TimeToStampUtil;
import com.gwm.utils.getString;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import scala.Tuple4;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

- /**
- * @author yangyingchun
- * @version 1.0
- * @date 2022/11/14 16:26
- */
- public class EventOrderSuccessRoaringBitmap {
- private static String endPoint = "endPoint ";
- //private static String endPoint ="public endpoint";//公网访问(填写内网Endpoint,就不用填写公网Endpoint)。
- private static String projectName = "projectName ";
- private static String topicSourceName = "topicSourceName ";
- // private static String topicSourceName = "topicSourceName ";
- private static String accessId = "accessId ";
- private static String accessKey = "accessKey ";
- //设置消费的启动位点对应的时间。TimeToStampUtil.timeToStamp("2021-12-21") 此时间至少为当前时间
- // private static Long datahubStartInMs = TimeToStampUtil.timeToStamp("2023-02-23");
- private static Long datahubStartInMs = System.currentTimeMillis();
- private static Long datahubEndInMs=Long.MAX_VALUE;
- private static SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- private static SimpleDateFormat sd1 = new SimpleDateFormat("yyyy-MM-dd");
- private static Date startDate;
-
- static {
- try {
- startDate = sd1.parse(sd.format(new Date()));
- } catch (ParseException e) {
- e.printStackTrace();
- }
- }
-
- ;
-
- public static void main(String[] args) throws Exception {
-
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.enableCheckpointing(3600000L);
- // env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
- // env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
- env.setParallelism(1);
- DataStreamSource<List<RecordEntry>> aedata = env.addSource(
- new DatahubSourceFunction(
- endPoint,
- projectName,
- topicSourceName,
- accessId,
- accessKey,
- datahubStartInMs,
- datahubEndInMs,
- 20L,
- 1000L,
- 1000
- ));
-
-
-
- DataStream<Tuple4<String, EventSuccessInfo, String, Long>> aecollectordataDataStream = aedata.flatMap(new FlatMapFunction
, Tuple4>() {
- @Override
- public void flatMap(List<RecordEntry> value, Collector<Tuple4<String, EventSuccessInfo, String, Long>> out) throws Exception {
- for (RecordEntry recordEntry : value) {
-
- String phone = getString.getString(recordEntry, "customer_phone");
- Long order_sn = Long.parseLong(getString.getString(recordEntry, "order_no"));
- String brand = getString.getString(recordEntry, "brand");
- String car_model = getString.getString(recordEntry, "car_model");
- String action_time = "null".equals(getString.getString(recordEntry, "paid_at"))||"".equals(getString.getString(recordEntry, "paid_at"))?null:
- sd.format(new Date(Long.parseLong(getString.getString(recordEntry, "paid_at"))/1000));
- Double paid_amount = "null".equals(getString.getString(recordEntry, "paid_amount"))?null:
- Double.parseDouble(getString.getString(recordEntry, "paid_amount"));
- String name = getString.getString(recordEntry, "customer_name");
- String operation_flag = getString.getString(recordEntry, "new_dts_sync_dts_after_flag");
- String order_time = "null".equals(getString.getString(recordEntry, "order_time"))||"".equals(getString.getString(recordEntry, "order_time"))?null:
- sd.format(new Date(Long.parseLong(getString.getString(recordEntry, "order_time"))/1000));
- String order_state = getString.getString(recordEntry, "order_state"); //'订购成功'
-
- Date add_time =
- "null".equals(getString.getString(recordEntry, "order_time"))||"".equals(getString.getString(recordEntry, "order_time"))
- ?null
- :new Date(Long.parseLong(getString.getString(recordEntry, "order_time")) / 1000);
- // startDate = sd1.parse(sd.format(new Date()));
-
- System.out.println(order_state+"====startDate:"+startDate+"====paid_at:"+order_time+"=====phone+order_sn:"+phone+"--"+order_sn);
- //这里有三个问题,
- // 1、技术+业务:因为获取的是数据库操作日志,所以数据是重复的,(已经做了重复校验,确保不会重复发且无时效性)
- // 2、技术:如果操作了历史数据,且用户的订单状态恰好还是订购成功时,也会触达,是不是要加限制,加的话加什么合适,
- // 新增且当天(很多数据是获取不到时间的)?还是所有时间都推,再ma测加一个时间的控制条件
- // 结论:空的也要,
- // 3、业务:需要明确订购成功的规则,否则极易造成异常, order_state=12当前是订购成功 能复用吗
- if (
- // "12".equals(order_state)&&
- "Y".equals(operation_flag)
- // &&!StringUtils.isNullOrWhitespaceOnly(order_time)
- // &&add_time.after(startDate)
-
- ){
- EventSuccessInfo eventSuccessInfo = new EventSuccessInfo(
- phone
- , order_sn
- , brand
- , car_model
- , action_time
- , paid_amount
- , name
- , operation_flag
- ,order_time
- ,order_state
- );
- // System.out.println(eventSuccessInfo);
- Tuple4<String, EventSuccessInfo, String, Long> tuple4
- = new Tuple4<String, EventSuccessInfo, String, Long>(
- "test_event_order_success"
- ,eventSuccessInfo
- ,UUID.randomUUID().toString().replace("-","")
- ,System.currentTimeMillis()
- );
- out.collect(tuple4);
- }
- }
- }
- });
-
- KeyedStream<Tuple4<String, EventSuccessInfo, String, Long>, String> tuple4StringKeyedStream
- = aecollectordataDataStream.keyBy(x -> x._2().getPhone());
-
-
- // StateTtlConfig ttlConfig = StateTtlConfig
- // .newBuilder(Time.days(2))
- // .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
- // .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
- // .build();
-
- //create StateDescriptor
-
- //这里进行状态注册通过bitmap高效存储实现去重,当然bitmap去重只适合bigint场景
- ValueStateDescriptor<Roaring64Bitmap> bitmapDescriptor = new ValueStateDescriptor(
- "Roaring64Bitmap",
- TypeInformation.of(new TypeHint<Roaring64Bitmap>() {
- }));
-
-
- //手机号去重逻辑 通过Roaring64Bitmap
- SingleOutputStreamOperator<Tuple4<String, EventSuccessInfo, String, Long>> map = tuple4StringKeyedStream.filter(new RichFilterFunction
>() { - //1.定义状态 进行手机号去重
- private transient ValueState<Roaring64Bitmap> bitmapState;
- @Override
- public void open(Configuration parameters) throws Exception {
-
- // 设置状态生命周期
- // StateTtlConfig stateTtlConfig = new StateTtlConfig
- // .Builder(Time.days(1)) // 周期为1天
- // .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 创建或者更新状态时重新刷新生命周期
- // .build();
-
- bitmapState = getRuntimeContext().getState(bitmapDescriptor);;
- }
- @Override
- public boolean filter(Tuple4<String, EventSuccessInfo, String, Long> value) throws Exception {
- //由于本程序只筛选订购成功的,所以每个手机号+每个订单唯一确认一条数据(订单状态已经在上游过滤过了)
- Roaring64Bitmap bitmap = bitmapState.value();
- if (bitmap == null) {
- bitmap = new Roaring64Bitmap();
- }
- if (!bitmap.contains(value._2().getOrder_sn())) {
- bitmap.addLong(value._2().getOrder_sn());
- bitmapState.update(bitmap);
- return true;
- }
- return false;
- }
-
- });
-
- //因为是binlog,但需求只要数据时间是当天的 :通过flink定时器 定义每天零晨更新比较时间
- SingleOutputStreamOperator<Tuple4<String, EventSuccessInfo, String, Long>> process = map.keyBy(x -> x._2().getPhone()).process(new KeyedProcessFunction
, Tuple4>() { - //1.定义状态 进行手机号去重
- private ValueState<String> timeSate;
- @Override
- public void processElement(Tuple4<String, EventSuccessInfo, String, Long> value, Context ctx, Collector<Tuple4<String, EventSuccessInfo, String, Long>> out) throws Exception {
- //获取格林威治标准时间的第二天00:00:00即获取北京时间的第二天08:00:00
- // long ts = (ctx.timerService().currentProcessingTime() / (1000 * 60 * 60 * 24) + 1) * (1000 * 60 * 60 * 24);
- //获取北京时间的第二天00:00:00
- long ts = ( ctx.timerService().currentProcessingTime()/(1000*60*60*24) + 1) * (1000*60*60*24)- 8 * 60 * 60 * 1000;
-
- // long ts = 1677054000000L;
- //如果注册相同数据的TimeTimer,后面的会将前面的覆盖,即相同的timeTimer只会触发一次
- ctx.timerService().registerProcessingTimeTimer(ts);
- out.collect(value);
- }
-
- @Override
- public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple4<String, EventSuccessInfo, String, Long>> out) throws Exception {
- //定时器质性,每天凌晨更新开始时间
- // System.out.println(timestamp);
- System.out.println("定时器执行了:" + timestamp);
- //状态初始化
- timeSate.clear();
- startDate = sd1.parse(sd.format(new Date()));
- System.out.println(startDate);
- // startDate = sd1.parse("2023-02-01");
- }
- });
-
- SingleOutputStreamOperator<Tuple4<String, String, String, Long>> jsonString = process.map(new MapFunction
, Tuple4>() { - @Override
- public Tuple4<String, String, String, Long> map(Tuple4<String, EventSuccessInfo, String, Long> value) throws Exception {
- return new Tuple4<String, String, String, Long>(
- value._1(),
- JSON.toJSONString(value._2()),
- value._3(),
- value._4()
- );
- }
- });
-
-
- jsonString.print();
- // jsonString.addSink(new EventOmsSuccessSink());
-
-
- env.execute("EventOrderSuccess===>");
-
- }
- }
3、注意:Roaring64Bitmap 只能存放 long 型整数,因此只适合对整型值(如本例中的数字订单号)去重;字符串等非整型的值需要先映射为唯一的 long(例如字典编码)后才能使用。