• Flink Real-Time Data Warehouse - DWD Layer (Traffic Domain) Template Code


    Introduction

    This post serves as a brief introduction to and template for developing the DWD layer.

    Utility classes

    Date/time utility class

    public class DateFormatUtil {

        private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        private static final DateTimeFormatter dtfFull = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        public static Long toTs(String dtStr, boolean isFull) {
            LocalDateTime localDateTime = null;
            if (!isFull) {
                dtStr = dtStr + " 00:00:00";
            }
            localDateTime = LocalDateTime.parse(dtStr, dtfFull);
            return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
        }

        public static Long toTs(String dtStr) {
            return toTs(dtStr, false);
        }

        public static String toDate(Long ts) {
            Date dt = new Date(ts);
            LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
            return dtf.format(localDateTime);
        }

        public static String toYmdHms(Long ts) {
            Date dt = new Date(ts);
            LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
            return dtfFull.format(localDateTime);
        }

        public static void main(String[] args) {
            System.out.println(toYmdHms(System.currentTimeMillis()));
        }
    }

    Kafka utility class

    public class KafkaUtil {

        static String BOOTSTRAP_SERVERS = "master:9092, node1:9092, node2:9092";
        static String DEFAULT_TOPIC = "default_topic";

        /**
         * Build a Kafka consumer for the given topic and consumer group.
         * @param topic
         * @param groupId
         * @return
         */
        public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
            Properties prop = new Properties();
            prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
            prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic,
                    // The default deserializer fails when the record value is null,
                    // so a custom schema is used that returns an empty string instead
                    new KafkaDeserializationSchema<String>() {
                        @Override
                        public boolean isEndOfStream(String nextElement) {
                            return false;
                        }

                        @Override
                        public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                            if (record == null || record.value() == null) {
                                return "";
                            }
                            return new String(record.value());
                        }

                        @Override
                        public TypeInformation<String> getProducedType() {
                            return BasicTypeInfo.STRING_TYPE_INFO;
                        }
                    }, prop);
            return consumer;
        }

        /**
         * Build a Kafka producer for the given topic.
         * @param topic
         * @return
         */
        public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
            Properties prop = new Properties();
            prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
            prop.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60 * 15 * 1000 + "");
            FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(DEFAULT_TOPIC,
                    new KafkaSerializationSchema<String>() {
                        @Override
                        public ProducerRecord<byte[], byte[]> serialize(String jsonStr, @Nullable Long timestamp) {
                            return new ProducerRecord<byte[], byte[]>(topic, jsonStr.getBytes());
                        }
                    }, prop,
                    FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
            return producer;
        }
    }

    Traffic domain

    Prerequisites

    Keyed state

    public class Status {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> initData = env.fromElements("a", "a", "b", "c");
            // Convert to a keyed stream first; only keyed streams can use keyed state
            SingleOutputStreamOperator<Tuple2<String, Integer>> mapKeyData = initData.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String dataItem) throws Exception {
                    return Tuple2.of(dataItem, 1);
                }
            });
            SingleOutputStreamOperator<Tuple2<String, Integer>> firstViewDtState = mapKeyData.keyBy(data -> data.f0)
                    .process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                        ValueState<String> firstViewDtState;

                        @Override
                        public void open(Configuration param) throws Exception {
                            super.open(param);
                            firstViewDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>(
                                    "firstViewDtState", String.class
                            ));
                        }

                        @Override
                        public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                            String keyStatus = firstViewDtState.value();
                            if (keyStatus == null) {
                                // Save the key into the state
                                firstViewDtState.update(value.f0);
                            } else {
                                System.out.println(value + " is a duplicate");
                            }
                        }
                    });
            firstViewDtState.print();
            env.execute();
        }
    }

    Result

    (a,1) is a duplicate

    Side output streams

    public class OutputTagTest {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> initData = env.fromElements("a", "a", "b", "c");
            // 1 Define the side-output tags
            OutputTag<String> a = new OutputTag<String>("a") {
            };
            OutputTag<String> b = new OutputTag<String>("b") {
            };
            SingleOutputStreamOperator<String> processData = initData.process(new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                    if (value.equals("a")) {
                        // Write to side output a
                        ctx.output(a, value);
                    } else if (value.equals("b")) {
                        // Write to side output b
                        ctx.output(b, value);
                    } else {
                        // Write to the main stream
                        out.collect(value);
                    }
                }
            });
            // Get side output a
            processData.getSideOutput(a).print("a>>");
            // Get side output b
            processData.getSideOutput(b).print("b>>");
            // Print the main stream
            processData.print("main>>");
            env.execute();
        }
    }

    Result

    b>>:4> b
    a>>:3> a
    a>>:2> a
    main>>:5> c

    Splitting the log data

    Why split the log stream in the first place? Because each log type can then be analyzed separately, which reduces the amount of data every downstream analysis has to process.

    Example program implementing the split

    public class BaseLogApp {

        public static void main(String[] args) throws Exception {
            // TODO 1. Initialize the environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);

            // TODO 2. Enable checkpointing and the state backend
            env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
            );
            env.setRestartStrategy(RestartStrategies
                    .failureRateRestart(10,
                            Time.of(3L, TimeUnit.DAYS),
                            Time.of(1L, TimeUnit.MINUTES)));
            env.setStateBackend(new HashMapStateBackend());
            env.getCheckpointConfig().setCheckpointStorage("hdfs://master:8020/gmall/ck");
            System.setProperty("HADOOP_USER_NAME", "bigdata");

            // TODO 3. Read the raw log stream from Kafka
            String topic = "topic_logg";
            String groupId = "base_log_consumer";
            DataStreamSource<String> source = env.addSource(KafkaUtil.getKafkaConsumer(topic, groupId));

            // TODO 4. Clean the data and convert its structure
            // 4.1 Define the dirty-data side output
            OutputTag<String> dirtyStreamTag = new OutputTag<String>("dirtyStream") {
            };
            SingleOutputStreamOperator<String> cleanedStream = source.process(
                    new ProcessFunction<String, String>() {
                        @Override
                        public void processElement(String jsonStr, Context ctx, Collector<String> out) throws Exception {
                            try {
                                JSONObject jsonObj = JSON.parseObject(jsonStr);
                                out.collect(jsonStr);
                            } catch (Exception e) {
                                ctx.output(dirtyStreamTag, jsonStr);
                            }
                        }
                    }
            );
            // 4.2 Write dirty data to a dedicated Kafka topic
            DataStream<String> dirtyStream = cleanedStream.getSideOutput(dirtyStreamTag);
            String dirtyTopic = "dirty_data";
            dirtyStream.addSink(KafkaUtil.getKafkaProducer(dirtyTopic));
            // 4.3 Convert the main stream: jsonStr -> jsonObj
            SingleOutputStreamOperator<JSONObject> mappedStream = cleanedStream.map(JSON::parseObject);

            // TODO 5. Fix the new/returning visitor flag
            // 5.1 Key the stream by mid
            KeyedStream<JSONObject, String> keyedStream = mappedStream.keyBy(r -> r.getJSONObject("common").getString("mid"));
            // 5.2 Fix the is_new flag with keyed state
            SingleOutputStreamOperator<JSONObject> fixedStream = keyedStream.process(
                    new KeyedProcessFunction<String, JSONObject, JSONObject>() {
                        ValueState<String> firstViewDtState;

                        @Override
                        public void open(Configuration param) throws Exception {
                            super.open(param);
                            firstViewDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>(
                                    "lastLoginDt", String.class
                            ));
                        }

                        @Override
                        public void processElement(JSONObject jsonObj, Context ctx, Collector<JSONObject> out) throws Exception {
                            String isNew = jsonObj.getJSONObject("common").getString("is_new");
                            String firstViewDt = firstViewDtState.value();
                            Long ts = jsonObj.getLong("ts");
                            String dt = DateFormatUtil.toDate(ts);
                            if ("1".equals(isNew)) {
                                if (firstViewDt == null) {
                                    firstViewDtState.update(dt);
                                } else {
                                    if (!firstViewDt.equals(dt)) {
                                        isNew = "0";
                                        jsonObj.getJSONObject("common").put("is_new", isNew);
                                    }
                                }
                            } else {
                                if (firstViewDt == null) {
                                    // Set the first-visit date to yesterday
                                    String yesterday = DateFormatUtil.toDate(ts - 1000 * 60 * 60 * 24);
                                    firstViewDtState.update(yesterday);
                                }
                            }
                            out.collect(jsonObj);
                        }
                    }
            );

            // TODO 6. Split the stream
            // 6.1 Define the start, display, action and error side outputs
            OutputTag<String> startTag = new OutputTag<String>("startTag") {
            };
            OutputTag<String> displayTag = new OutputTag<String>("displayTag") {
            };
            OutputTag<String> actionTag = new OutputTag<String>("actionTag") {
            };
            OutputTag<String> errorTag = new OutputTag<String>("errorTag") {
            };
            // 6.2 Split
            SingleOutputStreamOperator<String> separatedStream = fixedStream.process(
                    new ProcessFunction<JSONObject, String>() {
                        @Override
                        public void processElement(JSONObject jsonObj, Context context, Collector<String> out) throws Exception {
                            // 6.2.1 Collect error data
                            JSONObject error = jsonObj.getJSONObject("err");
                            if (error != null) {
                                context.output(errorTag, jsonObj.toJSONString());
                            }
                            // Remove the "err" field
                            jsonObj.remove("err");
                            // 6.2.2 Collect start-up data
                            JSONObject start = jsonObj.getJSONObject("start");
                            if (start != null) {
                                context.output(startTag, jsonObj.toJSONString());
                            } else {
                                // Get the "page" field
                                JSONObject page = jsonObj.getJSONObject("page");
                                // Get the "common" field
                                JSONObject common = jsonObj.getJSONObject("common");
                                // Get "ts"
                                Long ts = jsonObj.getLong("ts");
                                // 6.2.3 Collect display (exposure) data
                                JSONArray displays = jsonObj.getJSONArray("displays");
                                if (displays != null) {
                                    for (int i = 0; i < displays.size(); i++) {
                                        JSONObject display = displays.getJSONObject(i);
                                        JSONObject displayObj = new JSONObject();
                                        displayObj.put("display", display);
                                        displayObj.put("common", common);
                                        displayObj.put("page", page);
                                        displayObj.put("ts", ts);
                                        context.output(displayTag, displayObj.toJSONString());
                                    }
                                }
                                // 6.2.4 Collect action data
                                JSONArray actions = jsonObj.getJSONArray("actions");
                                if (actions != null) {
                                    for (int i = 0; i < actions.size(); i++) {
                                        JSONObject action = actions.getJSONObject(i);
                                        JSONObject actionObj = new JSONObject();
                                        actionObj.put("action", action);
                                        actionObj.put("common", common);
                                        actionObj.put("page", page);
                                        actionObj.put("ts", ts);
                                        context.output(actionTag, actionObj.toJSONString());
                                    }
                                }
                                // 6.2.5 Collect page data on the main stream
                                jsonObj.remove("displays");
                                jsonObj.remove("actions");
                                out.collect(jsonObj.toJSONString());
                            }
                        }
                    }
            );
            // Print the main stream and the side outputs to check the split
            separatedStream.print("page>>>");
            separatedStream.getSideOutput(startTag).print("start!!!");
            separatedStream.getSideOutput(displayTag).print("display@@@");
            separatedStream.getSideOutput(actionTag).print("action###");
            separatedStream.getSideOutput(errorTag).print("error$$$");

            // TODO 7. Write the data to different Kafka topics
            // // 7.1 Extract the side outputs
            // DataStream<String> startDS = separatedStream.getSideOutput(startTag);
            // DataStream<String> displayDS = separatedStream.getSideOutput(displayTag);
            // DataStream<String> actionDS = separatedStream.getSideOutput(actionTag);
            // DataStream<String> errorDS = separatedStream.getSideOutput(errorTag);
            //
            // // 7.2 Target Kafka topic names for each log type
            // String page_topic = "dwd_traffic_page_log";
            // String start_topic = "dwd_traffic_start_log";
            // String display_topic = "dwd_traffic_display_log";
            // String action_topic = "dwd_traffic_action_log";
            // String error_topic = "dwd_traffic_error_log";
            //
            // separatedStream.addSink(KafkaUtil.getKafkaProducer(page_topic));
            // startDS.addSink(KafkaUtil.getKafkaProducer(start_topic));
            // displayDS.addSink(KafkaUtil.getKafkaProducer(display_topic));
            // actionDS.addSink(KafkaUtil.getKafkaProducer(action_topic));
            // errorDS.addSink(KafkaUtil.getKafkaProducer(error_topic));

            env.execute();
        }
    }

    Raw log data

    {
        "actions": [
            {
                "action_id": "get_coupon",
                "item": "3",
                "item_type": "coupon_id",
                "ts": 1592134620882
            }
        ],
        "common": {
            "ar": "110000",
            "ba": "Oneplus",
            "ch": "oppo",
            "is_new": "0",
            "md": "Oneplus 7",
            "mid": "mid_232163",
            "os": "Android 10.0",
            "uid": "898",
            "vc": "v2.1.134"
        },
        "displays": [
            {
                "display_type": "query",
                "item": "18",
                "item_type": "sku_id",
                "order": 1,
                "pos_id": 2
            },
            {
                "display_type": "promotion",
                "item": "13",
                "item_type": "sku_id",
                "order": 2,
                "pos_id": 4
            },
            {
                "display_type": "query",
                "item": "29",
                "item_type": "sku_id",
                "order": 3,
                "pos_id": 2
            },
            {
                "display_type": "query",
                "item": "7",
                "item_type": "sku_id",
                "order": 4,
                "pos_id": 4
            },
            {
                "display_type": "query",
                "item": "19",
                "item_type": "sku_id",
                "order": 5,
                "pos_id": 4
            },
            {
                "display_type": "promotion",
                "item": "22",
                "item_type": "sku_id",
                "order": 6,
                "pos_id": 4
            },
            {
                "display_type": "query",
                "item": "25",
                "item_type": "sku_id",
                "order": 7,
                "pos_id": 5
            }
        ],
        "page": {
            "during_time": 11764,
            "item": "31",
            "item_type": "sku_id",
            "last_page_id": "good_list",
            "page_id": "good_detail",
            "source_type": "query"
        },
        "ts": 1592134615000
    }

    Data after splitting

    Main stream (page log data)

    {
        "common": {
            "ar": "310000",
            "uid": "780",
            "os": "Android 11.0",
            "ch": "oppo",
            "is_new": "1",
            "md": "Huawei P30",
            "mid": "mid_619118",
            "vc": "v2.1.134",
            "ba": "Huawei"
        },
        "page": {
            "page_id": "payment",
            "item": "33,25",
            "during_time": 5439,
            "item_type": "sku_ids",
            "last_page_id": "trade"
        },
        "ts": 1592134614000
    }

    Display (exposure) log data

    {
        "common": {
            "ar": "110000",
            "uid": "914",
            "os": "iOS 13.3.1",
            "ch": "Appstore",
            "is_new": "0",
            "md": "iPhone Xs Max",
            "mid": "mid_319090",
            "vc": "v2.1.134",
            "ba": "iPhone"
        },
        "display": {
            "display_type": "query",
            "item": "8",
            "item_type": "sku_id",
            "pos_id": 1,
            "order": 7
        },
        "page": {
            "page_id": "home",
            "during_time": 4328
        },
        "ts": 1592134610000
    }

    Action log data

    {
        "common": {
            "ar": "230000",
            "uid": "257",
            "os": "Android 11.0",
            "ch": "vivo",
            "is_new": "0",
            "md": "Xiaomi 9",
            "mid": "mid_227516",
            "vc": "v2.0.1",
            "ba": "Xiaomi"
        },
        "action": {
            "item": "35",
            "action_id": "cart_minus_num",
            "item_type": "sku_id",
            "ts": 1592134612791
        },
        "page": {
            "page_id": "cart",
            "during_time": 3583,
            "last_page_id": "good_detail"
        },
        "ts": 1592134611000
    }

    Error log data

    {
        "common": {
            "ar": "110000",
            "uid": "780",
            "os": "Android 11.0",
            "ch": "huawei",
            "is_new": "1",
            "md": "Xiaomi 9",
            "mid": "mid_503805",
            "vc": "v2.1.134",
            "ba": "Xiaomi"
        },
        "err": {
            "msg": " Exception in thread \\ java.net.SocketTimeoutException\\n \\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)",
            "error_code": 1245
        },
        "page": {
            "page_id": "home",
            "during_time": 17642
        },
        "displays": [
            {
                "display_type": "activity",
                "item": "1",
                "item_type": "activity_id",
                "pos_id": 2,
                "order": 1
            },
            {
                "display_type": "query",
                "item": "2",
                "item_type": "sku_id",
                "pos_id": 5,
                "order": 2
            },
            {
                "display_type": "query",
                "item": "6",
                "item_type": "sku_id",
                "pos_id": 2,
                "order": 3
            },
            {
                "display_type": "query",
                "item": "8",
                "item_type": "sku_id",
                "pos_id": 4,
                "order": 4
            },
            {
                "display_type": "query",
                "item": "6",
                "item_type": "sku_id",
                "pos_id": 3,
                "order": 5
            },
            {
                "display_type": "promotion",
                "item": "29",
                "item_type": "sku_id",
                "pos_id": 3,
                "order": 6
            }
        ],
        "ts": 1592134611000
    }


    Unique visitor detail table

    Prerequisites

    State TTL

    public class StateTTl {

        public static void main(String[] args) throws Exception {
            // Prepare the environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // Initial data
            DataStreamSource<String> initData = env.fromElements("a", "a", "b", "c", "b");
            SingleOutputStreamOperator<Tuple2<String, Integer>> targetData = initData.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    if (value.equals("a")) {
                        Thread.sleep(2000);
                    }
                    return Tuple2.of(value, 1);
                }
            });
            // Test state expiration
            SingleOutputStreamOperator<Tuple2<String, Integer>> testTTL = targetData.keyBy(data -> data.f0)
                    .process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                        private ValueState<String> lastVisitDt;

                        @Override
                        public void open(Configuration parameters) throws Exception {
                            super.open(parameters);
                            ValueStateDescriptor<String> valueStateDescriptor =
                                    new ValueStateDescriptor<>("testTTL", String.class);
                            valueStateDescriptor.enableTimeToLive(
                                    StateTtlConfig
                                            .newBuilder(Time.seconds(1L))
                                            // Refresh the TTL whenever the state is created or updated
                                            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                                            .build()
                            );
                            // Apply the TTL to the keyed state
                            lastVisitDt = getRuntimeContext().getState(valueStateDescriptor);
                        }

                        @Override
                        public void processElement(Tuple2<String, Integer> value, KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                            String valueState = lastVisitDt.value();
                            if (valueState == null) {
                                System.out.println(value);
                                lastVisitDt.update(value.f0);
                            } else {
                                System.out.println(value.f0 + " already has state");
                            }
                        }
                    });
            testTTL.print();
            env.execute();
        }
    }

    Output

    (a,1)
    (a,1)
    (b,1)
    (c,1)
    b already has state

    The output shows that the state written for the first a had already expired after one second (the map sleeps two seconds before emitting the second a), so the second a is treated as a new key again.

    Implementation

    public class DwdTrafficUniqueVisitorDetail {

        public static void main(String[] args) throws Exception {
            // TODO 1. Prepare the environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);

            // TODO 2. Checkpointing and state backend
            env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
            );
            env.setRestartStrategy(RestartStrategies.failureRateRestart(
                    3, Time.days(1), Time.minutes(1)
            ));
            env.setStateBackend(new HashMapStateBackend());
            env.getCheckpointConfig().setCheckpointStorage(
                    "hdfs://master:8020/ck"
            );
            System.setProperty("HADOOP_USER_NAME", "bigdata");

            // TODO 3. Read the page log from the Kafka dwd_traffic_page_log topic
            String topic = "dwd_traffic_page_log";
            String groupId = "dwd_traffic_user_jump_detail";
            FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(topic, groupId);
            DataStreamSource<String> pageLog = env.addSource(kafkaConsumer);

            // TODO 4. Convert the structure
            SingleOutputStreamOperator<JSONObject> mappedStream = pageLog.map(JSON::parseObject);

            // TODO 5. Keep only records whose last_page_id is null (session entry pages)
            SingleOutputStreamOperator<JSONObject> firstPageStream = mappedStream.filter(
                    jsonObj -> jsonObj
                            .getJSONObject("page")
                            .getString("last_page_id") == null
            );

            // TODO 6. Key the stream by mid
            KeyedStream<JSONObject, String> keyedStream = firstPageStream
                    .keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));

            // TODO 7. Filter unique visitor records with Flink keyed state
            SingleOutputStreamOperator<JSONObject> filteredStream = keyedStream.filter(
                    new RichFilterFunction<JSONObject>() {
                        private ValueState<String> lastVisitDt;

                        @Override
                        public void open(Configuration parameters) throws Exception {
                            super.open(parameters);
                            ValueStateDescriptor<String> valueStateDescriptor =
                                    new ValueStateDescriptor<>("last_visit_dt", String.class);
                            valueStateDescriptor.enableTimeToLive(
                                    StateTtlConfig
                                            .newBuilder(Time.days(1L))
                                            // Refresh the TTL whenever the state is created or updated
                                            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                                            .build()
                            );
                            lastVisitDt = getRuntimeContext().getState(valueStateDescriptor);
                        }

                        @Override
                        public boolean filter(JSONObject jsonObj) throws Exception {
                            String visitDt = DateFormatUtil.toDate(jsonObj.getLong("ts"));
                            String lastDt = lastVisitDt.value();
                            if (lastDt == null || !lastDt.equals(visitDt)) {
                                lastVisitDt.update(visitDt);
                                return true;
                            }
                            return false;
                        }
                    }
            );

            // TODO 8. Write the unique visitor records to the
            // Kafka dwd_traffic_unique_visitor_detail topic
            String targetTopic = "dwd_traffic_unique_visitor_detail";
            FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(targetTopic);
            filteredStream.map(JSONAware::toJSONString).addSink(kafkaProducer);

            // TODO 9. Start the job
            env.execute();
        }
    }

    Result

    {
        "common": {
            "ar": "310000",
            "uid": "201",
            "os": "Android 11.0",
            "ch": "vivo",
            "is_new": "1",
            "md": "Xiaomi Mix2 ",
            "mid": "mid_994205",
            "vc": "v2.0.1",
            "ba": "Xiaomi"
        },
        "page": {
            "page_id": "home",
            "during_time": 19868
        },
        "ts": 1592133292000
    }

    User jump (bounce) transaction fact table

    Prerequisites

    To treat the data as ordered in event time, watermarks must be assigned first; only then can windowed operations (including the CEP time window used below) decide when an event-time interval is complete and emit results in order.
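
    The sketch below is a minimal, self-contained example (not part of the original project; the class name WatermarkDemo, the sample elements and the two-second out-of-orderness bound are illustrative assumptions) showing how a watermark is assigned before any event-time window or CEP operation:

    public class WatermarkDemo {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // Elements are "key secondOfEventTime", e.g. "a 2"
            DataStreamSource<String> source = env.fromElements("a 2", "b 1", "a 5");
            SingleOutputStreamOperator<Tuple2<String, Long>> withWatermarks = source
                    .map(new MapFunction<String, Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> map(String value) throws Exception {
                            String[] parts = value.split(" ");
                            // Convert the second field to an epoch timestamp in milliseconds
                            return Tuple2.of(parts[0], Long.parseLong(parts[1]) * 1000L);
                        }
                    })
                    // Watermark = max timestamp seen so far minus 2 s:
                    // events arriving up to 2 s late are still considered in order
                    .assignTimestampsAndWatermarks(
                            WatermarkStrategy
                                    .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                    .withTimestampAssigner((tuple, recordTs) -> tuple.f1)
                    );
            withWatermarks.print();
            env.execute();
        }
    }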

    Using FlinkCEP

    public class CEPTest {

        public static void main(String[] args) throws Exception {
            // TODO 1 Get the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // TODO Note: with parallelism > 1 some subtasks receive no data, so the watermark
            //  does not advance and the computation is never triggered; keep parallelism at 1 here
            env.setParallelism(1);
            // TODO 2 Read from a socket source
            DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9998);
            // TODO 3 Input lines look like "a 1": the first token is the value, the second is the event time in seconds
            SingleOutputStreamOperator<Tuple2<String, Long>> mappedStream = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] s = value.split(" ");
                    return Tuple2.of(s[0], Integer.parseInt(s[1]) * 1000L);
                }
            });
            // TODO 4 Assign watermarks so the data can be treated as ordered in event time
            SingleOutputStreamOperator<Tuple2<String, Long>> withWatermarkStream = mappedStream.assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            // allow 0 seconds of out-of-orderness
                            .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                            .withTimestampAssigner(
                                    new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                        @Override
                                        public long extractTimestamp(Tuple2<String, Long> initData, long recordTimestamp) {
                                            // System.out.println(initData);
                                            return initData.f1;
                                        }
                                    }
                            )
            );
            // TODO 5 Key the stream so the pattern is evaluated per key
            KeyedStream<Tuple2<String, Long>, String> keyedStream = withWatermarkStream.keyBy(data -> {
                return data.f0;
            });
            // keyedStream.print("keyedStream");
            // TODO 6 Use CEP to match the data we want: two consecutive "a" events within 10 seconds;
            //  otherwise the first event times out
            Pattern<Tuple2<String, Long>, Tuple2<String, Long>> pattern = Pattern.<Tuple2<String, Long>>begin("first").where(
                    new SimpleCondition<Tuple2<String, Long>>() {
                        @Override
                        public boolean filter(Tuple2<String, Long> firstData) throws Exception {
                            return firstData.f0.equals("a");
                        }
                    }
            ).next("second").where(
                    new SimpleCondition<Tuple2<String, Long>>() {
                        @Override
                        public boolean filter(Tuple2<String, Long> secondData) throws Exception {
                            return secondData.f0.equals("a");
                        }
                    }
                    // A Time class with the same name is used above, so the fully qualified name is needed here
            ).within(org.apache.flink.streaming.api.windowing.time.Time.seconds(10L));
            PatternStream<Tuple2<String, Long>> patternStream = CEP.pattern(keyedStream, pattern);
            // Side output for timed-out matches
            OutputTag<Tuple2<String, Long>> timeoutTag = new OutputTag<Tuple2<String, Long>>("timeoutTag") {
            };
            SingleOutputStreamOperator<Tuple2<String, Long>> flatSelectStream = patternStream.flatSelect(
                    timeoutTag,
                    new PatternFlatTimeoutFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
                        @Override
                        public void timeout(Map<String, List<Tuple2<String, Long>>> pattern, long timeoutTimestamp, Collector<Tuple2<String, Long>> out) throws Exception {
                            // Collect the timed-out data
                            Tuple2<String, Long> first = pattern.get("first").get(0);
                            out.collect(first);
                        }
                    },
                    new PatternFlatSelectFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
                        @Override
                        public void flatSelect(Map<String, List<Tuple2<String, Long>>> pattern, Collector<Tuple2<String, Long>> out) throws Exception {
                            // Collect the fully matched data
                            Tuple2<String, Long> second = pattern.get("second").get(0);
                            out.collect(second);
                        }
                    }
            );
            DataStream<Tuple2<String, Long>> timeOutDStream = flatSelectStream.getSideOutput(timeoutTag);
            // Print the timed-out data
            timeOutDStream.print("timeOut");
            // Print the matched data
            flatSelectStream.print("flatSelectStream");
            env.execute();
        }
    }

    Test data

    [bigdata@master createdata]$ nc -lk 9998
    a 2
    a 3
    a 15

    Result

    flatSelectStream> (a,3000)
    timeOut> (a,3000)

    Conclusion: when "a 2" and "a 3" arrive, the 10-second match window has not yet closed. Entering "a 15" advances the watermark and closes it: "a 2" followed by "a 3" is a complete match within 10 seconds, so (a,3000) is emitted by the select function, while "a 3" itself has no follow-up event within 10 seconds and is therefore emitted as timed-out data.

    Union

    public class UnionTest {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<Integer> dataStream1 = env.fromElements(1, 2, 3);
            DataStreamSource<Integer> dataStream2 = env.fromElements(4, 5, 6);
            dataStream1.union(dataStream2)
                    .print();
            env.execute();
        }
    }

    Output

    2> 5
    3> 6
    1> 4
    11> 3
    9> 1
    10> 2

    Implementation code

    public class DwdTrafficUserJumpDetail {

        public static void main(String[] args) throws Exception {
            // TODO 1. Prepare the environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);

            // TODO 2. Checkpointing and state backend
            env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
            );
            env.setRestartStrategy(RestartStrategies.failureRateRestart(
                    3, Time.days(1), Time.minutes(1)
            ));
            env.setStateBackend(new HashMapStateBackend());
            env.getCheckpointConfig().setCheckpointStorage(
                    "hdfs://master:8020/ck"
            );
            System.setProperty("HADOOP_USER_NAME", "bigdata");

            // TODO 3. Read the page log from the Kafka dwd_traffic_page_log topic
            String topic = "dwd_traffic_page_log";
            String groupId = "dwd_traffic_user_jump_detail";
            FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(topic, groupId);
            DataStreamSource<String> pageLog = env.addSource(kafkaConsumer);

            // Test data
            /*DataStream<String> kafkaDS = env
                    .fromElements(
                            "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
                            "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
                            "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                                    "\"home\"},\"ts\":15000} ",
                            "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                                    "\"detail\"},\"ts\":30000} "
                    );*/

            // TODO 4. Convert the structure
            SingleOutputStreamOperator<JSONObject> mappedStream = pageLog.map(JSON::parseObject);

            // TODO 5. Assign watermarks for the user jump (bounce) statistics
            SingleOutputStreamOperator<JSONObject> withWatermarkStream = mappedStream.assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .<JSONObject>forMonotonousTimestamps()
                            .withTimestampAssigner(
                                    new SerializableTimestampAssigner<JSONObject>() {
                                        @Override
                                        public long extractTimestamp(JSONObject jsonObj, long recordTimestamp) {
                                            return jsonObj.getLong("ts");
                                        }
                                    }
                            )
            );

            // TODO 6. Key the stream by mid
            KeyedStream<JSONObject, String> keyedStream = withWatermarkStream.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));

            // TODO 7. Define the CEP match rule
            Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("first").where(
                    new SimpleCondition<JSONObject>() {
                        @Override
                        public boolean filter(JSONObject jsonObj) throws Exception {
                            String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
                            return lastPageId == null;
                        }
                    }
            ).next("second").where(
                    new SimpleCondition<JSONObject>() {
                        @Override
                        public boolean filter(JSONObject jsonObj) throws Exception {
                            String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
                            return lastPageId == null;
                        }
                    }
                    // A Time class with the same name is used above, so the fully qualified name is needed here
            ).within(org.apache.flink.streaming.api.windowing.time.Time.seconds(10L));

            // TODO 8. Apply the Pattern to the stream
            PatternStream<JSONObject> patternStream = CEP.pattern(keyedStream, pattern);

            // TODO 9. Extract matched events and timed-out events
            OutputTag<JSONObject> timeoutTag = new OutputTag<JSONObject>("timeoutTag") {
            };
            SingleOutputStreamOperator<JSONObject> flatSelectStream = patternStream.flatSelect(
                    timeoutTag,
                    new PatternFlatTimeoutFunction<JSONObject, JSONObject>() {
                        @Override
                        public void timeout(Map<String, List<JSONObject>> pattern, long timeoutTimestamp, Collector<JSONObject> out) throws Exception {
                            JSONObject element = pattern.get("first").get(0);
                            out.collect(element);
                        }
                    },
                    new PatternFlatSelectFunction<JSONObject, JSONObject>() {
                        @Override
                        public void flatSelect(Map<String, List<JSONObject>> pattern, Collector<JSONObject> out) throws Exception {
                            JSONObject element = pattern.get("first").get(0);
                            out.collect(element);
                        }
                    }
            );
            DataStream<JSONObject> timeOutDStream = flatSelectStream.getSideOutput(timeoutTag);

            // TODO 11. Union the two streams and write the result to Kafka
            DataStream<JSONObject> unionDStream = flatSelectStream.union(timeOutDStream);
            String targetTopic = "dwd_traffic_user_jump_detail";
            FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(targetTopic);
            unionDStream.map(JSONAware::toJSONString)
                    .addSink(kafkaProducer);

            env.execute();
        }
    }

  • Original post: https://blog.csdn.net/S1124654/article/details/125878285