• 《Flink学习笔记》——第十二章 Flink CEP


    12.1 基本概念

    12.1.1 CEP是什么

    1.什么是CEP?

    答:所谓 CEP,其实就是“复杂事件处理(Complex Event Processing)”的缩写;而 Flink CEP,就是 Flink 实现的一个用于复杂事件处理的库(library)。

    2.那到底什么是“复杂事件处理”呢?

    答:就是可以在事件流里,检测到特定的事件组合并进行处理,比如说“连续登录失败”,或者“订单支付超时”等等。具体的处理过程是,把事件流中的一个个简单事件,通过一定的规则匹配组合起来,这就是“复杂事件”;然后基于这些满足规则的一组组复杂事件进行转换处理,得到想要的结果进行输出。

    3.CEP的目的是什么?

    答:就是在无界流中检测出特定的数据组合,让我们有机会掌握数据中重要的高阶特征

    CEP的流程可以分成三个步骤:

    (1)定义一个匹配规则

    (2)将匹配规则应用到事件流上,检测满足规则的复杂事件

    (3)对检测到的复杂事件进行处理,得到结果进行输出

    示例:

    ​ 输入是不同形状的事件流,我们可以定义一个匹配规则:在圆形后面紧跟着三角形。那么将这个规则应用到输入流上,就可以检测到三组匹配的复杂事件。它们构成了一个新的“复杂事件流”,流中的数据就变成了一组一组的复杂事件,每个数据都包含了一个圆形和一个三角形。接下来,我们就可以针对检测到的复杂事件,处理之后输出一个提示或报警信息了。

    image-20230408182527254

    12.1.2 模式(Pattern)

    CEP定义的匹配规则,我们把它叫做模式。

    模式的定义主要有两部分:

    • 每个简单事件的特征
    • 简单事件之间的组合关系

    当然,我们也可以进一步扩展模式的功能。比如,匹配检测的时间限制;每个简单事件是否可以重复出现;对于事件可重复出现的模式,遇到一个匹配后是否跳过后面的匹配;等等。

    所谓“事件之间的组合关系”,一般就是定义“谁后面接着是谁”,也就是事件发生的顺序。我们把它叫作“近邻关系”。可以定义严格的近邻关系,也就是两个事件之前不能有任何其他事件;也可以定义宽松的近邻关系,即只要前后顺序正确即可,中间可以有其他事件。另外, 还可以反向定义,也就是“谁后面不能跟着谁”。

    CEP 做的事其实就是在流上进行模式匹配。根据模式的近邻关系条件不同,可以检测连续的事件或不连续但先后发生的事件;模式还可能有时间的限制,如果在设定时间范围内没有满足匹配条件,就会导致模式匹配超时(timeout)。

    12.1.3 应用场景

    CEP主要用于实时流数据的分析处理

    • 风险控制

      设定一些行为模式,可以对用户的异常行为实时检测

    • 用户画像

      精准营销,如客户买了什么那大概率还会买什么,和精准推荐相似

    • 运维监控

      对于企业服务的运维管理,可以利用 CEP 灵活配置多指标、多依赖来实现更复杂的监控模式。

    12.2 快速上手

    12.2.1 引入依赖
    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-cepartifactId>
        <version>${flink.version}version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    12.2.2 一个简单实例

    需求:检测用户行为,如果连续三次登录失败,就输出报警信息。很显然,这是一个复杂事件的检测处理,我们可以使用 Flink CEP 来实现

    定义一个登录事件POJO类

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public class LoginEvent {
        private String userId;
        private String ipAddress;
        private String eventType;
        private Long timestamp;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    主函数

    public class LoginFailDetect {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            KeyedStream<LoginEvent, String> keyedStream = env.fromElements(
                    new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
                    new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
                    new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
                    new LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
                    new LoginEvent("user_2", "192.168.1.29", "success", 6000L),
                    new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
                    new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
            ).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forMonotonousTimestamps().withTimestampAssigner(
                    (SerializableTimestampAssigner<LoginEvent>) (loginEvent, l) -> loginEvent.getTimestamp()
            )).keyBy(LoginEvent::getUserId);
    
            // 1. 定义一个模式,连续三次登录失败
            Pattern<LoginEvent, LoginEvent> pattern = Pattern.<LoginEvent>begin("first").where(new SimpleCondition<LoginEvent>() {
                @Override
                public boolean filter(LoginEvent loginEvent) {
                    return "fail".equals(loginEvent.getEventType());
                }
            }).next("second").where(new SimpleCondition<LoginEvent>() {
                @Override
                public boolean filter(LoginEvent loginEvent) {
                    return "fail".equals(loginEvent.getEventType());
                }
            }).next("third").where(new SimpleCondition<LoginEvent>() {
                @Override
                public boolean filter(LoginEvent loginEvent) {
                    return "fail".equals(loginEvent.getEventType());
                }
            });
    
            // 2. 将 Pattern 应用到流上,检测匹配的复杂事件,得到一个 PatternStream
            PatternStream<LoginEvent> patternStream = CEP.pattern(keyedStream, pattern);
    
            // 3. 将匹配到的复杂事件选择出来,然后包装成字符串报警信息输出
            patternStream.select(
                    (PatternSelectFunction<LoginEvent, String>) map -> {
                        LoginEvent first = map.get("first").get(0);
                        LoginEvent second = map.get("second").get(0);
                        LoginEvent third = map.get("third").get(0);
                        return first.getUserId() + " 连续三次登录失败!登录时间:" +
                                first.getTimestamp() + ", " + second.getTimestamp() + ", " + third.getTimestamp();
                    }
            ).print();
    
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    输出结果:

    user_1 连续三次登录失败!登录时间:2000, 3000, 5000
    
    • 1

    12.3 模式API

    Flink CEP 的核心是复杂事件的模式匹配。Flink CEP 库中提供了 Pattern 类,基于它可以调用一系列方法来定义匹配模式,这就是所谓的模式 API(Pattern API)。Pattern API 可以让我们定义各种复杂的事件组合规则,用于从事件流中提取复杂事件

    12.3.1 个体模式

    模式就是由一组简单的事件的匹配规则组成,单个事件的匹配规则叫做个体模式。如上面的每一个登录失败事件都是个体模式。

    1、基本形式

    一般由一个连接词begin、next开始,然后where定义事件特征/匹配规则,并且个体模式通过量词和条件也能接收多个事件。

    .begin
    .where
    .next
    .where
    
    • 1
    • 2
    • 3
    • 4
    2、量词

    个体模式后面可以跟一个“量词”,用来指定循环的次数。从这个角度分类,个体模式可以包括“单例(singleton)模式”和“循环(looping)模式”。默认情况下,个体模式是单例模式,匹配接收一个事件;当定义了量词之后,就变成了循环模式,可以匹配接收多个事件。

    在循环模式中,对同样特征的事件可以匹配多次。比如我们定义个体模式为“匹配形状为三角形的事件”,再让它循环多次,就变成了“匹配连续多个三角形的事件”。注意这里的“连续”,只要保证前后顺序即可,中间可以有其他事件,所以是“宽松近邻”关系。

    在 Flink CEP 中,可以使用不同的方法指定循环模式,主要有:

    • .oneOrMore()
    • .times(times)
    • .times(fromTimes,toTimes)
    • .greedy()——贪心模式,尽可能多个匹配
    • .optional()——使当前模式成为可选的,也就是说可以满足这个匹配条件,也可以不满足
    // 匹配事件出现 4 次
    pattern.times(4);
    
    // 匹配事件出现 4 次,或者不出现
    pattern.times(4).optional();
    
    // 匹配事件出现 2, 3 或者 4 次
    pattern.times(2, 4);
    
    // 匹配事件出现 2, 3 或者 4 次,并且尽可能多地匹配
    pattern.times(2, 4).greedy();
    
    // 匹配事件出现 2, 3, 4 次,或者不出现
    pattern.times(2, 4).optional();
    
    // 匹配事件出现 2, 3, 4 次,或者不出现;并且尽可能多地匹配
    pattern.times(2, 4).optional().greedy();
    
    // 匹配事件出现 1 次或多次
    pattern.oneOrMore();
    
    // 匹配事件出现 1 次或多次,并且尽可能多地匹配
    pattern.oneOrMore().greedy();
    
    // 匹配事件出现 1 次或多次,或者不出现
    pattern.oneOrMore().optional();
    
    // 匹配事件出现 1 次或多次,或者不出现;并且尽可能多地匹配
    pattern.oneOrMore().optional().greedy();
    
    // 匹配事件出现 2 次或多次
    pattern.timesOrMore(2);
    
    // 匹配事件出现 2 次或多次,并且尽可能多地匹配
    pattern.timesOrMore(2).greedy();
    
    // 匹配事件出现 2 次或多次,或者不出现
    pattern.timesOrMore(2).optional()
    
    // 匹配事件出现 2 次或多次,或者不出现;并且尽可能多地匹配
    pattern.timesOrMore(2).optional().greedy();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    3、条件

    对于每个个体模式,匹配事件的核心在于定义匹配条件,也就是选取事件的规则

    有以下几种条件类型:

    • 限定子类型:当流中的数据类型为事件类型的子类型时满足条件
    • 简单条件:只根据当前事件的特征来决定是否满足条件
    • 迭代条件:简单条件只能依靠当前事件做判断,能够处理的逻辑有限。迭代条件可以依靠之前的事件做判断。
    • 组合条件:可以通过where后面接一个where或者where后面接or等等,实现多个条件的组合。(其实在一个where里面也可以使用多个if-else来判断,但是代码臃肿可读性差)
    • 终止条件:终止循环模式。注意:一般只与 oneOrMore() 或者 oneOrMore().optional()结合使用。因为只有这两个是可以匹配无穷尽的,所以要有终止条件。
    12.3.2 组合模式

    就是多个个体模式组成

    1、初始模式

    所有的组合模式都以begin开始

    Pattern.begin()
    
    • 1
    2、近邻条件

    在初始模式之后,我们就可以按照复杂事件的顺序追加模式,组合成模式序列了

    Flink CEP 中提供了三种近邻关系:

    • 严格近邻(next)——两个事件紧挨着

    • 宽松近邻(followedBy)——先后顺序正确就行,无序紧挨着出现

    • 非确定性宽松近邻(followedByAny)——可以重复使用之前已经匹配过的事件

      image-20230409014105141

    3、其它条件

    (1)notNext()

    (2)notFollowedBy()

    (3)within()

    ​ 这是模式序列中第一个事件到最后一个事件之间的最大时间间隔,只有在这期间成功匹配的复杂事件才是有效的

    4、循环模式中近邻关系

    循环模式——个体模式加了量词

    在循环模式中,近邻关系同样有三种:严格近邻、宽松近邻以及非确定性宽松近邻

    对于定义了量词(如 oneOrMore()、times())的循环模式,默认内部采用的是宽松近邻,那么可以通过以下方法可以更改近邻关系

    (1)consecutive()

    ​ 如果要为循环模式中的匹配事件增加严格的近邻条件,保证所有匹配事件是严格连续的

    (2)allowCombinations()

    ​ 除严格近邻外,也可以为循环模式中的事件指定非确定性宽松近邻条件,表示可以重复使用已经匹配的事件。

    12.3.3 模式组

    多个模式的组合、嵌套,返回的类型为GroupPattern,为Pattern的子类型

    12.3.4 匹配后跳过策略

    如果我们想要精确控制事件的匹配应该跳过哪些情况,就需要制定另外的策略

    使用:

    // begin的第二个参数传入,默认跳过处理
    Pattern.begin("start", AfterMatchSkipStrategy.noSkip())
    
    • 1
    • 2

    不同的跳过策略:

    (1)不跳过(默认)

    AfterMatchSkipStrategy.noSkip()
    
    • 1

    (2)跳至下一个

    AfterMatchSkipStrategy.skipToNext()
    
    • 1

    (3)跳至所有子匹配

    AfterMatchSkipStrategy.skipPastLastEvent()
    
    • 1

    (4)跳至第一个

    AfterMatchSkipStrategy.skipToFirst(“a”)
    
    • 1

    (5)跳至最后一个

    AfterMatchSkipStrategy.skipToLast(“a”)
    
    • 1
    public class PatternTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            KeyedStream<LoginEvent, String> keyedStream = env.fromElements(
                    new LoginEvent("user1", "192.168.0.1", "a", 2000L),
                    new LoginEvent("user1", "192.168.0.2", "a", 3000L),
                    new LoginEvent("user1", "192.168.1.29", "a", 4000L),
                    new LoginEvent("user1", "171.56.23.10", "b", 5000L)
            ).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forMonotonousTimestamps().withTimestampAssigner(
                    (SerializableTimestampAssigner<LoginEvent>) (loginEvent, l) -> loginEvent.getTimestamp()
            )).keyBy(LoginEvent::getUserId);
    
            // 1. 定义一个模式,连续三次登录失败
            Pattern<LoginEvent, LoginEvent> pattern = Pattern.<LoginEvent>begin("first", AfterMatchSkipStrategy.noSkip()).where(new SimpleCondition<LoginEvent>() {
                @Override
                public boolean filter(LoginEvent loginEvent) {
                    return "a".equals(loginEvent.getEventType());
                }
            }).oneOrMore()
                    .followedBy("second").where(new SimpleCondition<LoginEvent>() {
                @Override
                public boolean filter(LoginEvent loginEvent) {
                    return "b".equals(loginEvent.getEventType());
                }
            });
    
            // 2. 将 Pattern 应用到流上,检测匹配的复杂事件,得到一个 PatternStream
            PatternStream<LoginEvent> patternStream = CEP.pattern(keyedStream, pattern);
    
            // 3. 将匹配到的复杂事件选择出来,然后包装成字符串报警信息输出
            patternStream.select(new PatternSelectFunction<LoginEvent, String>() {
                @Override
                public String select(Map<String, List<LoginEvent>> map) throws Exception {
                    return String.format("--------%n first: %s%n second: %s%n--------%n", map.get("first"), map.get("second"));
                }
            }).print();
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    12.4 模式的检测处理

    将模式应用到事件流上、检测提取匹配的复杂事件并定义处理转换的方法,最终得到想要的输出信息

    12.4.1 将模式应用到流上
    PatternStream<Event> patternStream = CEP.pattern(DataStream/KeyedStream, Pattern);
    
    • 1

    ​ 模式中定义的复杂事件发生是有先后顺序的,这取决于使用哪种时间语义。对于时间戳相同(事件时间)或是同时到达(处理时间)的事件,我们还可以通过比较器,来进行更精确的排序

    // 可选的事件比较器
    EventComparator<Event> comparator = ...
    PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);
    
    • 1
    • 2
    • 3
    12.4.2 处理匹配事件
    1、匹配事件的选择提取

    (1)PatternSelectFunction

    DataStream<String> result = patternStream.select(new PatternSelectFunction());
    
    • 1

    (2)PatternFlatSelectFunction

    将匹配到的元素“扁平化”,通过收集器输出

    DataStream<String> result = patternStream.select(new PatternFlatSelectFunction());
    
    • 1
    2、匹配事件的通用处理
    patternStream.process(new PatternProcessFunction())
    
    • 1

    PatternProcessFunction 功能更加丰富、调用更加灵活,可以完全覆盖其他接口,也就成为了目前官方推荐的处理方式。

    PatternProcessFunction 中必须实现一个 processMatch()方法;这个方法与之前的 flatSelect()类似,只是多了一个上下文 Context 参数。利用这个上下文可以获取当前的时间信息,比如事件的时间戳(timestamp)或者处理时间(processing time);还可以调用.output()方法将数据输出到侧输出流。

    12.4.3 处理超时事件

    比如我们用.within()指定了模式检测的时间间隔,超出这个时间当前这组检测就应该失败了.

    在 Flink CEP中,提供了一个专门捕捉超时的部分匹配事件的接口,叫作TimedOutPartialMatchHandler。这个接口需要实现一个 processTimedOutMatch()方法,可以将超时的、已检测到的部分匹配事件放在一个 Map 中,作为方法的第一个参数;方法的第二个参数则是 PatternProcessFunction 的上下文 Context。所以这个接口必须与 PatternProcessFunction 结合使用,对处理结果的输出则需要利用侧输出流来进行

    1、使用 PatternProcessFunction 的侧输出流
    class MyPatternProcessFunction extends PatternProcessFunction<Event, String>
    implements TimedOutPartialMatchHandler<Event>
    
    • 1
    • 2
    2、使用 PatternTimeoutFunction

    上文提到的PatternProcessFunction 通过实现TimedOutPartialMatchHandler 接口扩展出了处理超时事件的能力,这是官方推荐的做法

    3、应用实例
    代码略
    
    • 1
    12.4.4 处理迟到的数据
    patternStream
        .sideOutputLateData(lateDataOutputTag)	// 将迟到数据输出到侧输出流
        .select(
            // 处理正常匹配数据
            new PatternSelectFunction<Event, ComplexEvent>() {...}
        );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    12.5 CEP的状态机实现

    image-20230409120754206

    举例:

    代码略
    
    • 1
  • 相关阅读:
    亚商投资顾问 早餐FM/12022023年开展第五次全国经济普查
    条条大路通罗马-面试题(一)
    vivo 网络端口安全建设技术实践
    「网页开发|前端开发|Vue」07 前后端分离:如何在Vue中请求外部数据
    vue实现聊天栏定位到最底部(超简单、可直接复制使用)
    FPGA语法相关知识合集
    CVE-2022-22954-VMware Workspace ONE Access SSTI远程代码执行流量特征
    Ajax跨域请求的两种实现方式
    git时出现的若干问题以及解决方案
    使用 Semantic Kernel 实现 Microsoft 365 Copilot 架构
  • 原文地址:https://blog.csdn.net/u012344939/article/details/132609191