The following utility classes keep the DWD-layer jobs concise.
- public class DateFormatUtil {
- private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
- private static final DateTimeFormatter dtfFull = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-
- public static Long toTs(String dtStr, boolean isFull) {
-
- LocalDateTime localDateTime = null;
- if (!isFull) {
- dtStr = dtStr + " 00:00:00";
- }
- localDateTime = LocalDateTime.parse(dtStr, dtfFull);
-
- return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
- }
-
- public static Long toTs(String dtStr) {
- return toTs(dtStr, false);
- }
-
- public static String toDate(Long ts) {
- Date dt = new Date(ts);
- LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
- return dtf.format(localDateTime);
- }
-
- public static String toYmdHms(Long ts) {
- Date dt = new Date(ts);
- LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
- return dtfFull.format(localDateTime);
- }
-
- public static void main(String[] args) {
- System.out.println(toYmdHms(System.currentTimeMillis()));
- }
- }
- public class KafkaUtil {
- static String BOOTSTRAP_SERVERS = "master:9092, node1:9092, node2:9092";
- static String DEFAULT_TOPIC = "default_topic";
-
- /**
- * Build a consumer from the given topic and consumer group
- * @param topic
- * @param groupId
- * @return
- */
- public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
- Properties prop = new Properties();
- prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
- prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-
- FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic,
- // The default deserializer cannot handle null message values, so we supply a custom one
- new KafkaDeserializationSchema<String>() {
- @Override
- public boolean isEndOfStream(String nextElement) {
- return false;
- }
-
- @Override
- public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
- if(record == null || record.value() == null) {
- return "";
- }
- return new String(record.value());
- }
-
- @Override
- public TypeInformation<String> getProducedType() {
- return BasicTypeInfo.STRING_TYPE_INFO;
- }
- }, prop);
- return consumer;
- }
-
- /**
- * Build a producer for the given topic
- * @param topic
- * @return
- */
- public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
-
- Properties prop = new Properties();
- prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
- prop.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60 * 15 * 1000 + "");
- FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {
-
- @Override
- public ProducerRecord<byte[], byte[]> serialize(String jsonStr, @Nullable Long timestamp) {
- return new ProducerRecord<byte[], byte[]>(topic, jsonStr.getBytes());
- }
- }, prop,
- FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
- return producer;
- }
- }
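A minimal usage sketch of the two helpers inside a Flink job; the topic, group id, and date string below are placeholders, not names from the project:
- // Sketch only: wiring KafkaUtil and DateFormatUtil together (placeholder topic/group names)
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<String> logStream = env.addSource(KafkaUtil.getKafkaConsumer("some_source_topic", "some_group"));
- logStream.addSink(KafkaUtil.getKafkaProducer("some_target_topic"));
- // Date helpers: epoch millis <-> "yyyy-MM-dd" / "yyyy-MM-dd HH:mm:ss"
- Long startOfDay = DateFormatUtil.toTs("2020-06-14");
- String today = DateFormatUtil.toDate(System.currentTimeMillis());
- env.execute();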
- public class Status {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<String> initData = env.fromElements("a", "a", "b", "c");
-
- // Map to (key, count) tuples first; keyed state can only be used on a keyed stream
- SingleOutputStreamOperator<Tuple2<String, Integer>> mapKeyData = initData.map(new MapFunction<String, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> map(String dataItem) throws Exception {
- return Tuple2.of(dataItem, 1);
- }
- });
-
- SingleOutputStreamOperator<Tuple2<String, Integer>> firstViewDtState = mapKeyData.keyBy(data -> data.f0)
- .process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
- ValueState<String> firstViewDtState;
-
- @Override
- public void open(Configuration param) throws Exception {
- super.open(param);
- firstViewDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>(
- "firstViewDtState", String.class
- ));
- }
-
-
- @Override
- public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
- String keyStatus = firstViewDtState.value();
- if (keyStatus == null) {
- // store the key as the state value
- firstViewDtState.update(value.f0);
- } else {
- System.out.println(value + " duplicate key");
- }
- }
- });
-
- firstViewDtState.print();
-
- env.execute();
- }
- }
Result
(a,1) duplicate key
- public class OutputTagTest {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<String> initData = env.fromElements("a", "a", "b", "c");
-
- // 1. Define the side-output tags
- OutputTag<String> a = new OutputTag<String>("a") {
- };
- OutputTag<String> b = new OutputTag<String>("b") {
- };
-
- SingleOutputStreamOperator<String> processData = initData.process(new ProcessFunction<String, String>() {
- @Override
- public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
- if (value.equals("a")) {
- // route to side output a
- ctx.output(a, value);
- } else if (value.equals("b")) {
- // route to side output b
- ctx.output(b, value);
- } else {
- // emit to the main stream
- out.collect(value);
- }
- }
- });
-
- // retrieve side output a
- processData.getSideOutput(a).print("a>>");
- // retrieve side output b
- processData.getSideOutput(b).print("b>>");
- // print the main stream
- processData.print("main>>");
-
- env.execute();
- }
- }
Result
- b>>:4> b
- a>>:3> a
- a>>:2> a
- main>>:5> c
Why split the start-up log stream in the first place? Because each log type can then be analyzed separately, which reduces the amount of data every analysis has to read.
- public class BaseLogApp {
- public static void main(String[] args) throws Exception {
-
- // TODO 1. Initialize the execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
-
- // TODO 2. Configure checkpointing and the state backend
- env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
- env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
- env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
- env.getCheckpointConfig().enableExternalizedCheckpoints(
- CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
- );
- env.setRestartStrategy(RestartStrategies
- .failureRateRestart(10,
- Time.of(3L, TimeUnit.DAYS),
- Time.of(1L, TimeUnit.MINUTES)));
- env.setStateBackend(new HashMapStateBackend());
- env.getCheckpointConfig().setCheckpointStorage("hdfs://master:8020/gmall/ck");
- System.setProperty("HADOOP_USER_NAME", "bigdata");
-
- // TODO 3. Read the main stream from Kafka
- String topic = "topic_logg";
- String groupId = "base_log_consumer";
- DataStreamSource<String> source = env.addSource(KafkaUtil.getKafkaConsumer(topic, groupId));
-
- // TODO 4. Clean the data and convert the structure
- // 4.1 Define the dirty-data side output
- OutputTag<String> dirtyStreamTag = new OutputTag<String>("dirtyStream") {
- };
- SingleOutputStreamOperator<String> cleanedStream = source.process(
- new ProcessFunction<String, String>() {
- @Override
- public void processElement(String jsonStr, Context ctx, Collector<String> out) throws Exception {
- try {
- JSONObject jsonObj = JSON.parseObject(jsonStr);
- out.collect(jsonStr);
- } catch (Exception e) {
- ctx.output(dirtyStreamTag, jsonStr);
- }
- }
- }
- );
-
- // 4.2 Write the dirty data to a dedicated Kafka topic
- DataStream<String> dirtyStream = cleanedStream.getSideOutput(dirtyStreamTag);
- String dirtyTopic = "dirty_data";
- dirtyStream.addSink(KafkaUtil.getKafkaProducer(dirtyTopic));
-
- // 4.3 Convert the main stream: jsonStr -> jsonObj
- SingleOutputStreamOperator<JSONObject> mappedStream = cleanedStream.map(JSON::parseObject);
-
- // TODO 5. Fix the new/returning visitor flag (is_new)
- // 5.1 Key the stream by mid
- KeyedStream<JSONObject, String> keyedStream = mappedStream.keyBy(r -> r.getJSONObject("common").getString("mid"));
-
- // 5.2 Repair the is_new flag with keyed state
- SingleOutputStreamOperator<JSONObject> fixedStream = keyedStream.process(
- new KeyedProcessFunction<String, JSONObject, JSONObject>() {
-
- ValueState<String> firstViewDtState;
-
- @Override
- public void open(Configuration param) throws Exception {
- super.open(param);
- firstViewDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>(
- "lastLoginDt", String.class
- ));
- }
-
- @Override
- public void processElement(JSONObject jsonObj, Context ctx, Collector<JSONObject> out) throws Exception {
- String isNew = jsonObj.getJSONObject("common").getString("is_new");
- String firstViewDt = firstViewDtState.value();
- Long ts = jsonObj.getLong("ts");
- String dt = DateFormatUtil.toDate(ts);
-
- if ("1".equals(isNew)) {
- if (firstViewDt == null) {
- firstViewDtState.update(dt);
- } else {
- if (!firstViewDt.equals(dt)) {
- isNew = "0";
- jsonObj.getJSONObject("common").put("is_new", isNew);
- }
- }
- } else {
- if (firstViewDt == null) {
- // Set the first-visit date to yesterday
- String yesterday = DateFormatUtil.toDate(ts - 1000 * 60 * 60 * 24);
- firstViewDtState.update(yesterday);
- }
- }
-
- out.collect(jsonObj);
- }
- }
- );
-
- // TODO 6. Split the stream
- // 6.1 Define side outputs for start, display, action, and error logs
- OutputTag<String> startTag = new OutputTag<String>("startTag") {
- };
- OutputTag<String> displayTag = new OutputTag<String>("displayTag") {
- };
- OutputTag<String> actionTag = new OutputTag<String>("actionTag") {
- };
- OutputTag<String> errorTag = new OutputTag<String>("errorTag") {
- };
-
- // 6.2 Split
- SingleOutputStreamOperator<String> separatedStream = fixedStream.process(
- new ProcessFunction<JSONObject, String>() {
- @Override
- public void processElement(JSONObject jsonObj, Context context, Collector<String> out) throws Exception {
-
- // 6.2.1 Collect error data
- JSONObject error = jsonObj.getJSONObject("err");
- if (error != null) {
- context.output(errorTag, jsonObj.toJSONString());
- }
-
- // Remove the "err" field
- jsonObj.remove("err");
-
- // 6.2.2 Collect start (launch) data
- JSONObject start = jsonObj.getJSONObject("start");
- if (start != null) {
- context.output(startTag, jsonObj.toJSONString());
- } else {
- // Get the "page" field
- JSONObject page = jsonObj.getJSONObject("page");
- // Get the "common" field
- JSONObject common = jsonObj.getJSONObject("common");
- // Get "ts"
- Long ts = jsonObj.getLong("ts");
-
- // 6.2.3 Collect display (exposure) data
- JSONArray displays = jsonObj.getJSONArray("displays");
- if (displays != null) {
- for (int i = 0; i < displays.size(); i++) {
- JSONObject display = displays.getJSONObject(i);
- JSONObject displayObj = new JSONObject();
- displayObj.put("display", display);
- displayObj.put("common", common);
- displayObj.put("page", page);
- displayObj.put("ts", ts);
- context.output(displayTag, displayObj.toJSONString());
- }
- }
-
- // 6.2.4 Collect action data
- JSONArray actions = jsonObj.getJSONArray("actions");
- if (actions != null) {
- for (int i = 0; i < actions.size(); i++) {
- JSONObject action = actions.getJSONObject(i);
- JSONObject actionObj = new JSONObject();
- actionObj.put("action", action);
- actionObj.put("common", common);
- actionObj.put("page", page);
- actionObj.put("ts", ts);
- context.output(actionTag, actionObj.toJSONString());
- }
- }
-
- // 6.2.5 Collect page data
- jsonObj.remove("displays");
- jsonObj.remove("actions");
- out.collect(jsonObj.toJSONString());
- }
-
- }
- }
- );
-
- // Print the main stream and each side output to check the split
- separatedStream.print("page>>>");
- separatedStream.getSideOutput(startTag).print("start!!!");
- separatedStream.getSideOutput(displayTag).print("display@@@");
- separatedStream.getSideOutput(actionTag).print("action###");
- separatedStream.getSideOutput(errorTag).print("error$$$");
-
- // TODO 7. Write the data to different Kafka topics
- // // 7.1 Extract the side-output streams
- // DataStream<String> startDS = separatedStream.getSideOutput(startTag);
- // DataStream<String> displayDS = separatedStream.getSideOutput(displayTag);
- // DataStream<String> actionDS = separatedStream.getSideOutput(actionTag);
- // DataStream<String> errorDS = separatedStream.getSideOutput(errorTag);
- //
- // // 7.2 Define the target topic for each log type
- // String page_topic = "dwd_traffic_page_log";
- // String start_topic = "dwd_traffic_start_log";
- // String display_topic = "dwd_traffic_display_log";
- // String action_topic = "dwd_traffic_action_log";
- // String error_topic = "dwd_traffic_error_log";
- //
- // separatedStream.addSink(KafkaUtil.getKafkaProducer(page_topic));
- // startDS.addSink(KafkaUtil.getKafkaProducer(start_topic));
- // displayDS.addSink(KafkaUtil.getKafkaProducer(display_topic));
- // actionDS.addSink(KafkaUtil.getKafkaProducer(action_topic));
- // errorDS.addSink(KafkaUtil.getKafkaProducer(error_topic));
-
- env.execute();
- }
- }
- {
- "actions": [
- {
- "action_id": "get_coupon",
- "item": "3",
- "item_type": "coupon_id",
- "ts": 1592134620882
- }
- ],
- "common": {
- "ar": "110000",
- "ba": "Oneplus",
- "ch": "oppo",
- "is_new": "0",
- "md": "Oneplus 7",
- "mid": "mid_232163",
- "os": "Android 10.0",
- "uid": "898",
- "vc": "v2.1.134"
- },
- "displays": [
- {
- "display_type": "query",
- "item": "18",
- "item_type": "sku_id",
- "order": 1,
- "pos_id": 2
- },
- {
- "display_type": "promotion",
- "item": "13",
- "item_type": "sku_id",
- "order": 2,
- "pos_id": 4
- },
- {
- "display_type": "query",
- "item": "29",
- "item_type": "sku_id",
- "order": 3,
- "pos_id": 2
- },
- {
- "display_type": "query",
- "item": "7",
- "item_type": "sku_id",
- "order": 4,
- "pos_id": 4
- },
- {
- "display_type": "query",
- "item": "19",
- "item_type": "sku_id",
- "order": 5,
- "pos_id": 4
- },
- {
- "display_type": "promotion",
- "item": "22",
- "item_type": "sku_id",
- "order": 6,
- "pos_id": 4
- },
- {
- "display_type": "query",
- "item": "25",
- "item_type": "sku_id",
- "order": 7,
- "pos_id": 5
- }
- ],
- "page": {
- "during_time": 11764,
- "item": "31",
- "item_type": "sku_id",
- "last_page_id": "good_list",
- "page_id": "good_detail",
- "source_type": "query"
- },
- "ts": 1592134615000
- }
Sample records: the record above is a raw input log; the records below are, in order, the split outputs — a page record (main stream), a display record, an action record, and an error record.
- {
- "common": {
- "ar": "310000",
- "uid": "780",
- "os": "Android 11.0",
- "ch": "oppo",
- "is_new": "1",
- "md": "Huawei P30",
- "mid": "mid_619118",
- "vc": "v2.1.134",
- "ba": "Huawei"
- },
- "page": {
- "page_id": "payment",
- "item": "33,25",
- "during_time": 5439,
- "item_type": "sku_ids",
- "last_page_id": "trade"
- },
- "ts": 1592134614000
- }
- {
- "common": {
- "ar": "110000",
- "uid": "914",
- "os": "iOS 13.3.1",
- "ch": "Appstore",
- "is_new": "0",
- "md": "iPhone Xs Max",
- "mid": "mid_319090",
- "vc": "v2.1.134",
- "ba": "iPhone"
- },
- "display": {
- "display_type": "query",
- "item": "8",
- "item_type": "sku_id",
- "pos_id": 1,
- "order": 7
- },
- "page": {
- "page_id": "home",
- "during_time": 4328
- },
- "ts": 1592134610000
- }
- {
- "common": {
- "ar": "230000",
- "uid": "257",
- "os": "Android 11.0",
- "ch": "vivo",
- "is_new": "0",
- "md": "Xiaomi 9",
- "mid": "mid_227516",
- "vc": "v2.0.1",
- "ba": "Xiaomi"
- },
- "action": {
- "item": "35",
- "action_id": "cart_minus_num",
- "item_type": "sku_id",
- "ts": 1592134612791
- },
- "page": {
- "page_id": "cart",
- "during_time": 3583,
- "last_page_id": "good_detail"
- },
- "ts": 1592134611000
- }
- {
- "common": {
- "ar": "110000",
- "uid": "780",
- "os": "Android 11.0",
- "ch": "huawei",
- "is_new": "1",
- "md": "Xiaomi 9",
- "mid": "mid_503805",
- "vc": "v2.1.134",
- "ba": "Xiaomi"
- },
- "err": {
- "msg": " Exception in thread \\ java.net.SocketTimeoutException\\n \\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)",
- "error_code": 1245
- },
- "page": {
- "page_id": "home",
- "during_time": 17642
- },
- "displays": [
- {
- "display_type": "activity",
- "item": "1",
- "item_type": "activity_id",
- "pos_id": 2,
- "order": 1
- },
- {
- "display_type": "query",
- "item": "2",
- "item_type": "sku_id",
- "pos_id": 5,
- "order": 2
- },
- {
- "display_type": "query",
- "item": "6",
- "item_type": "sku_id",
- "pos_id": 2,
- "order": 3
- },
- {
- "display_type": "query",
- "item": "8",
- "item_type": "sku_id",
- "pos_id": 4,
- "order": 4
- },
- {
- "display_type": "query",
- "item": "6",
- "item_type": "sku_id",
- "pos_id": 3,
- "order": 5
- },
- {
- "display_type": "promotion",
- "item": "29",
- "item_type": "sku_id",
- "pos_id": 3,
- "order": 6
- }
- ],
- "ts": 1592134611000
- }

- public class StateTTl {
- public static void main(String[] args) throws Exception {
- // Prepare the environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- // Initialize the test data
- DataStreamSource<String> initData = env.fromElements("a", "a", "b", "c", "b");
-
- SingleOutputStreamOperator<Tuple2<String, Integer>> targetData = initData.map(new MapFunction<String, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> map(String value) throws Exception {
- if (value.equals("a")) {
- Thread.sleep(2000);
- }
- return Tuple2.of(value,1);
- }
- });
-
- // Exercise state expiry (TTL)
- SingleOutputStreamOperator<Tuple2<String, Integer>> testTTL = targetData.keyBy(data -> data.f0)
- .process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
- private ValueState<String> lastVisitDt;
-
- @Override
- public void open(Configuration paramenters) throws Exception {
- super.open(paramenters);
- ValueStateDescriptor<String> valueStateDescriptor =
- new ValueStateDescriptor<>("testTTL", String.class);
- valueStateDescriptor.enableTimeToLive(
- StateTtlConfig
- .newBuilder(Time.seconds(1L))
- // Refresh the TTL when the state is created or written
- .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
- .build()
- );
- // Obtain the keyed state handle with the TTL applied
- lastVisitDt = getRuntimeContext().getState(valueStateDescriptor);
- }
-
- @Override
- public void processElement(Tuple2<String, Integer> value, KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
- String valueState = lastVisitDt.value();
- if (valueState == null) {
- System.out.println(value);
- lastVisitDt.update(value.f0);
- } else {
- System.out.println(value.f0 + " already has state");
- }
- }
- });
-
- testTTL.print();
-
-
- env.execute();
- }
- }
Output
- (a,1)
- (a,1)
- (b,1)
- (c,1)
- b already has state
From the output we can see that the state for the first "a" had expired after one second: the 2-second sleep in the map step exceeds the 1-second TTL, so the second "a" reads null and is treated as unseen, while the second "b" arrives within the TTL and finds existing state.
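One assumption behind this reading, taken from Flink's documented defaults rather than anything set in the code above: TTL state defaults to StateVisibility.NeverReturnExpired, so once the TTL has elapsed a read returns null even if the value has not been physically cleaned up yet. A sketch of the same descriptor with that default spelled out:
- // Sketch only: the 1-second TTL from above, with the default visibility made explicit
- StateTtlConfig ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1L))
- .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
- // default behaviour: expired values are never returned, which is why the second "a" reads null
- .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
- .build();
- ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("testTTL", String.class);
- descriptor.enableTimeToLive(ttlConfig);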
- public class DwdTrafficUniqueVisitorDetail {
- public static void main(String[] args) throws Exception {
-
- // TODO 1. Prepare the environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
-
- // TODO 2. Configure checkpointing and the state backend
- env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
- env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);
- env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
- env.getCheckpointConfig().enableExternalizedCheckpoints(
- CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
- );
- env.setRestartStrategy(RestartStrategies.failureRateRestart(
- 3, Time.days(1), Time.minutes(1)
- ));
- env.setStateBackend(new HashMapStateBackend());
- env.getCheckpointConfig().setCheckpointStorage(
- "hdfs://master:8020/ck"
- );
- System.setProperty("HADOOP_USER_NAME", "bigdata");
-
- // TODO 3. Read the page log from the Kafka dwd_traffic_page_log topic as a stream
- String topic = "dwd_traffic_page_log";
- String groupId = "dwd_traffic_user_jump_detail";
- FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(topic, groupId);
- DataStreamSource<String> pageLog = env.addSource(kafkaConsumer);
-
- // TODO 4. Convert the structure
- SingleOutputStreamOperator<JSONObject> mappedStream = pageLog.map(JSON::parseObject);
-
- // TODO 5. Keep only records whose last_page_id is null (session entry pages)
- SingleOutputStreamOperator<JSONObject> firstPageStream = mappedStream.filter(
- jsonObj -> jsonObj
- .getJSONObject("page")
- .getString("last_page_id") == null
- );
-
- // TODO 6. Key the stream by mid
- KeyedStream<JSONObject, String> keyedStream = firstPageStream
- .keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));
-
- // TODO 7. Filter the unique-visitor records with Flink keyed state
- SingleOutputStreamOperator<JSONObject> filteredStream = keyedStream.filter(
- new RichFilterFunction<JSONObject>() {
-
- private ValueState<String> lastVisitDt;
-
- @Override
- public void open(Configuration paramenters) throws Exception {
- super.open(paramenters);
- ValueStateDescriptor<String> valueStateDescriptor =
- new ValueStateDescriptor<>("last_visit_dt", String.class);
- valueStateDescriptor.enableTimeToLive(
- StateTtlConfig
- .newBuilder(Time.days(1L))
- // Refresh the TTL when the state is created or written
- .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
- .build()
- );
- lastVisitDt = getRuntimeContext().getState(valueStateDescriptor);
- }
-
- @Override
- public boolean filter(JSONObject jsonObj) throws Exception {
- String visitDt = DateFormatUtil.toDate(jsonObj.getLong("ts"));
- String lastDt = lastVisitDt.value();
- if (lastDt == null || !lastDt.equals(visitDt)) {
- lastVisitDt.update(visitDt);
- return true;
- }
- return false;
- }
- }
- );
-
- // TODO 8. Write the unique-visitor records to the
- // Kafka topic dwd_traffic_unique_visitor_detail
- String targetTopic = "dwd_traffic_unique_visitor_detail";
- FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(targetTopic);
- filteredStream.map(JSONAware::toJSONString).addSink(kafkaProducer);
-
- // TODO 9. Start the job
- env.execute();
- }
- }
Result
- {
- "common": {
- "ar": "310000",
- "uid": "201",
- "os": "Android 11.0",
- "ch": "vivo",
- "is_new": "1",
- "md": "Xiaomi Mix2 ",
- "mid": "mid_994205",
- "vc": "v2.0.1",
- "ba": "Xiaomi"
- },
- "page": {
- "page_id": "home",
- "during_time": 19868
- },
- "ts": 1592133292000
- }
To reason about event order, we first assign watermarks; only on top of watermarks can window-style operations (here, the CEP within window) treat the data as ordered in event time.
- public class CEPTest {
- public static void main(String[] args) throws Exception {
- //TODO 1 Get the execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- //TODO Note: with parallelism > 1 the watermark is held back by idle parallel subtasks and never advances, so the computation would not fire; hence parallelism 1 here
- env.setParallelism(1);
- //TODO 2 Read from a socket source
- DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9998);
- //TODO 3 Assume the input looks like "a 1": the first field is the value, the second the event time in seconds
- SingleOutputStreamOperator<Tuple2<String, Long>> mappedStream = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
- @Override
- public Tuple2<String, Long> map(String value) throws Exception {
- String[] s = value.split(" ");
- return Tuple2.of(s[0], Integer.parseInt(s[1])*1000L);
- }
- });
-
- //TODO 4 Assign watermarks so events can be treated as ordered in event time
- SingleOutputStreamOperator<Tuple2<String, Long>> withWatermarkStream = mappedStream.assignTimestampsAndWatermarks(
- WatermarkStrategy
- // allow 0 seconds of out-of-orderness
- .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
- .withTimestampAssigner(
- new SerializableTimestampAssigner<Tuple2<String, Long>>() {
- @Override
- public long extractTimestamp(Tuple2<String, Long> initData, long recordTimestamp) {
- // System.out.println(initData);
- return initData.f1;
- }
- }
- )
- );
-
- //TODO 5 Key the stream so the pattern is evaluated per key
- KeyedStream<Tuple2<String, Long>, String> keyedStream = withWatermarkStream.keyBy(data -> {
- return data.f0;
- });
-
- // keyedStream.print("keyedStream");
-
- //TODO 6 Use CEP to match the events we want: two consecutive "a" events within 10 seconds, otherwise the partial match times out
- Pattern<Tuple2<String, Long>, Tuple2<String, Long>> pattern = Pattern.<Tuple2<String, Long>>begin("first").where(
- new SimpleCondition<Tuple2<String, Long>>() {
-
- @Override
- public boolean filter(Tuple2<String, Long> firstData) throws Exception {
- return firstData.f0.equals("a");
- }
- }
- ).next("second").where(
- new SimpleCondition<Tuple2<String, Long>>() {
- @Override
- public boolean filter(Tuple2<String, Long> secondData) throws Exception {
- return secondData.f0.equals("a");
- }
- }
- // A different Time class with the same name is imported above, so the fully-qualified name is used here
- ).within(org.apache.flink.streaming.api.windowing.time.Time.seconds(10L));
-
- PatternStream<Tuple2<String, Long>> patternStream = CEP.pattern(keyedStream, pattern);
-
- // side output for timed-out partial matches
- OutputTag<Tuple2<String, Long>> timeoutTag = new OutputTag<Tuple2<String, Long>>("timeoutTag") {
- };
- SingleOutputStreamOperator<Tuple2<String, Long>> flatSelectStream = patternStream.flatSelect(
- timeoutTag,
- new PatternFlatTimeoutFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
- @Override
- public void timeout(Map<String, List<Tuple2<String, Long>>> pattern, long timeoutTimestamp, Collector<Tuple2<String, Long>> out) throws Exception {
- // collect the timed-out data
- Tuple2<String, Long> first = pattern.get("first").get(0);
- out.collect(first);
- }
- },
- new PatternFlatSelectFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
- @Override
- public void flatSelect(Map<String, List<Tuple2<String, Long>>> pattern, Collector<Tuple2<String, Long>> out) throws Exception {
- // collect the matched data
- Tuple2<String, Long> second = pattern.get("second").get(0);
- out.collect(second);
- }
- }
- );
-
- DataStream<Tuple2<String, Long>> timeOutDStream = flatSelectStream.getSideOutput(timeoutTag);
- // print the timed-out data
- timeOutDStream.print("timeOut");
- // print the matched data
- flatSelectStream.print("flatSelectStream");
-
- env.execute();
- }
- }
Test input
- [bigdata@master createdata]$ nc -lk 9998
- a 2
- a 3
- a 15
Result
- flatSelectStream> (a,3000)
- timeOut> (a,3000)
Conclusion: when "a 2" and "a 3" arrive, the 10-second window has not closed yet; entering "a 15" pushes the watermark past it. "a 2" followed by "a 3" lies within 10 seconds, so (a,3000) is emitted as a match; the partial match that then starts with "a 3" finds no successor within 10 seconds, so (a,3000) is also emitted on the timeout side output.
- public class UnionTest {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<Integer> dataStream1 = env.fromElements(1, 2, 3);
- DataStreamSource<Integer> dataStream2 = env.fromElements(4, 5, 6);
-
- dataStream1.union(dataStream2)
- .print();
- env.execute();
- }
- }
Output
- 2> 5
- 3> 6
- 1> 4
- 11> 3
- 9> 1
- 10> 2
- public class DwdTrafficUserJumpDetail {
- public static void main(String[] args) throws Exception {
-
- // TODO 1. Prepare the environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
-
- // TODO 2. Configure checkpointing and the state backend
- env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
- env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);
- env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
- env.getCheckpointConfig().enableExternalizedCheckpoints(
- CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
- );
- env.setRestartStrategy(RestartStrategies.failureRateRestart(
- 3, Time.days(1), Time.minutes(1)
- ));
- env.setStateBackend(new HashMapStateBackend());
- env.getCheckpointConfig().setCheckpointStorage(
- "hdfs://master:8020/ck"
- );
- System.setProperty("HADOOP_USER_NAME", "bigdata");
-
- // TODO 3. Read the page log from the Kafka dwd_traffic_page_log topic as a stream
- String topic = "dwd_traffic_page_log";
- String groupId = "dwd_traffic_user_jump_detail";
- FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(topic, groupId);
- DataStreamSource<String> pageLog = env.addSource(kafkaConsumer);
-
- // Test data
- /*DataStream<String> kafkaDS = env
- .fromElements(
- "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
- "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
- "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
- "\"home\"},\"ts\":15000} ",
- "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
- "\"detail\"},\"ts\":30000} "
- );*/
-
- // TODO 4. Convert the structure
- SingleOutputStreamOperator<JSONObject> mappedStream = pageLog.map(JSON::parseObject);
-
- // TODO 5. Assign watermarks for the user-jump (bounce) analysis
- SingleOutputStreamOperator<JSONObject> withWatermarkStream = mappedStream.assignTimestampsAndWatermarks(
- WatermarkStrategy
- .<JSONObject>forMonotonousTimestamps()
- .withTimestampAssigner(
- new SerializableTimestampAssigner<JSONObject>() {
- @Override
- public long extractTimestamp(JSONObject jsonObj, long recordTimestamp) {
- return jsonObj.getLong("ts");
- }
- }
- )
- );
-
- // TODO 6. Key the stream by mid
- KeyedStream<JSONObject, String> keyedStream = withWatermarkStream.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));
-
- // TODO 7. Define the CEP match rule
- Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("first").where(
- new SimpleCondition<JSONObject>() {
-
- @Override
- public boolean filter(JSONObject jsonObj) throws Exception {
- String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
- return lastPageId == null;
- }
- }
- ).next("second").where(
- new SimpleCondition<JSONObject>() {
- @Override
- public boolean filter(JSONObject jsonObj) throws Exception {
- String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
- return lastPageId == null;
- }
- }
- // A different Time class with the same name is imported above, so the fully-qualified name is used here
- ).within(org.apache.flink.streaming.api.windowing.time.Time.seconds(10L));
-
- // TODO 8. Apply the Pattern to the stream
- PatternStream<JSONObject> patternStream = CEP.pattern(keyedStream, pattern);
-
- // TODO 9. Extract the matched events and the timed-out events
- OutputTag<JSONObject> timeoutTag = new OutputTag<JSONObject>("timeoutTag") {
- };
- SingleOutputStreamOperator<JSONObject> flatSelectStream = patternStream.flatSelect(
- timeoutTag,
- new PatternFlatTimeoutFunction<JSONObject, JSONObject>() {
- @Override
- public void timeout(Map<String, List<JSONObject>> pattern, long timeoutTimestamp, Collector<JSONObject> out) throws Exception {
- JSONObject element = pattern.get("first").get(0);
- out.collect(element);
-
- }
- },
- new PatternFlatSelectFunction<JSONObject, JSONObject>() {
- @Override
- public void flatSelect(Map<String, List<JSONObject>> pattern, Collector<JSONObject> out) throws Exception {
- JSONObject element = pattern.get("first").get(0);
- out.collect(element);
- }
- }
- );
-
- DataStream<JSONObject> timeOutDStream = flatSelectStream.getSideOutput(timeoutTag);
-
- // TODO 10. Union the two streams and write the result to Kafka
- DataStream<JSONObject> unionDStream = flatSelectStream.union(timeOutDStream);
- String targetTopic = "dwd_traffic_user_jump_detail";
- FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(targetTopic);
- unionDStream.map(JSONAware::toJSONString)
- .addSink(kafkaProducer);
-
- env.execute();
- }
- }