一、基本概念
1.1 CEP是什么
哈喽各位!这个是Flink1.13最后一篇了,接下来会给各位小伙伴们分享一些关于数据治理以及数仓方面的内容了!敬请期待!!!!好了,进入正题了哈!!!!
所谓CEP,其实就是“复杂事件处理(Complex Event Processing)”的缩写;而Flink CEP,就是Flink实现的一个用于复杂事件处理的库(library)。复杂事件处理具体的过程是,把事件流中的一个个简单事件,通过一定的规则匹配组合起来,这就是“复杂事件”;然后基于这些满足规则的一组组复杂事件进行转换处理,得到想要的结果进行输出。总结起来,复杂事件处理(CEP)的流程可以分成三个步骤:
(1)定义一个匹配规则
(2)将匹配规则应用到事件流上,检测满足规则的复杂事件
(3)对检测到的复杂事件进行处理,得到结果进行输出
所以,CEP是针对流处理而言的,分析的是低延迟、频繁产生的事件流。它的主要目的,就是在无界流中检测出特定的数据组合,让我们有机会掌握数据中重要的高阶特征。
1.2模式(Pattern)
CEP的第一步所定义的匹配规则,我们可以把它叫作“模式”(Pattern)。模式的定义主要就是两部分内容:
每个简单事件的特征
简单事件之间的组合关系
事件的组合关系,可以定义严格的近邻关系,也就是两个事件之前不能有任何其他事件;也可以定义宽松的近邻关系,即只要前后顺序正确即可,中间可以有其他事件。另外,还可以反向定义,也就是“谁后面不能跟着谁”。
CEP做的事其实就是在流上进行模式匹配。根据模式的近邻关系条件不同,可以检测连续的事件或不连续但先后发生的事件;模式还可能有时间的限制,如果在设定时间范围内没有满足匹配条件,就会导致模式匹配超时(timeout)。
Flink CEP为我们提供了丰富的API,可以实现上面关于模式的所有功能,这套API就叫作“模式API”(Pattern API)。
1.3 应用场景
CEP的应用场景非常丰富。很多大数据框架,如Spark、Samza、Beam等都提供了不同的CEP解决方案,但没有专门的库(library)。而Flink提供了专门的CEP库用于复杂事件处理,可以说是目前CEP的最佳解决方案。
风险控制
设定一些行为模式,可以对用户的异常行为进行实时检测。当一个用户行为符合了异常行为模式,比如短时间内频繁登录并失败,就可以向用户发送通知信息,或是进行报警提示。
用户画像
利用CEP可以用预先定义好的规则,对用户的行为轨迹进行实时跟踪,从而检测出具有特定行为习惯的一些用户,做出相应的用户画像。
运维监控
对于企业服务的运维管理,可以利用CEP灵活配置多指标、多依赖来实现更复杂的监控模式。
二、快速上手
2.1 需要引入的依赖
想要在代码中使用Flink CEP,需要在项目的pom文件中添加相关依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
如果想要在Flink集群中提交运行CEP作业,应该将依赖的jar包放在/lib目录下。
2.2 一个简单实例
接下来我们考虑一个具体的需求:检测用户行为,如果连续三次登录失败,就输出报警信息。很显然,这是一个复杂事件的检测处理,我们可以使用Flink CEP来实现。我们首先定义数据的类型。这里的用户行为不再是之前的访问事件Event了,所以应该单独定义一个登录事件POJO类。具体实现如下:
public class LoginEvent {
public String userId;
public String ipAddress;
public String eventType;
public Long timestamp;
public LoginEvent(String userId, String ipAddress, String eventType, Long timestamp) {
this.userId = userId;
this.ipAddress = ipAddress;
this.eventType = eventType;
this.timestamp = timestamp;
}
public LoginEvent() {}
@Override
public String toString() {
return "LoginEvent{" +
"userId='" + userId + '\'' +
", ipAddress='" + ipAddress + '\'' +
", eventType='" + eventType + '\'' +
", timestamp=" + timestamp +
'}';
}
}
具体代码实现如下:
public class LoginFailDetect {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 获取登录事件流,并提取时间戳、生成水位线
KeyedStream<LoginEvent, String> stream = env
.fromElements(
new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
new LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
new LoginEvent("user_2", "192.168.1.29", "success", 6000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<LoginEvent>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<LoginEvent>() {
@Override
public long extractTimestamp(LoginEvent loginEvent, long l) {
return loginEvent.timestamp;
}
}
)
)
.keyBy(r -> r.userId);
// 1. 定义Pattern,连续的三个登录失败事件
Pattern<LoginEvent, LoginEvent> pattern = Pattern
.<LoginEvent>begin("first") // 以第一个登录失败事件开始
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
})
.next("second") // 接着是第二个登录失败事件
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
})
.next("third") // 接着是第三个登录失败事件
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
});
// 2. 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
PatternStream<LoginEvent> patternStream = CEP.pattern(stream, pattern);
// 3. 将匹配到的复杂事件选择出来,然后包装成字符串报警信息输出
patternStream
.select(new PatternSelectFunction<LoginEvent, String>() {
@Override
public String select(Map<String, List<LoginEvent>> map) throws Exception {
LoginEvent first = map.get("first").get(0);
LoginEvent second = map.get("second").get(0);
LoginEvent third = map.get("third").get(0);
return first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " + second.timestamp + ", " + third.timestamp;
}
})
.print("warning");
env.execute();
}
}
三、模式API(Pattern API)
Flink CEP的核心是复杂事件的模式匹配。Flink CEP库中提供了Pattern类,基于它可以调用一系列方法来定义匹配模式,这就是所谓的模式API(Pattern API)。
3.1 个体模式
模式(Pattern)其实就是将一组简单事件组合成复杂事件的“匹配规则”。由于流中事件的匹配是有先后顺序的,因此一个匹配规则就可以表达成先后发生的一个个简单事件,按顺序串联组合在一起。这里的每一个简单事件并不是任意选取的,也需要有一定的条件规则;所以我们就把每个简单事件的匹配规则,叫作“个体模式”(Individual Pattern)。
1. 基本形式
每一个登录失败事件的选取规则,就都是一个个体模式。比如:
.<LoginEvent>begin("first") // 以第一个登录失败事件开始
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
})
或者:
.next("second") // 接着是第二个登录失败事件
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
})
这些都是个体模式。个体模式一般都会匹配接收一个事件。每个个体模式都以一个“连接词”开始定义的,比如begin、next等等,这是Pattern对象的一个方法(begin是Pattern类的静态方法),返回的还是一个Pattern。这些“连接词”方法有一个String类型参数,这就是当前个体模式唯一的名字,比如这里的“first”、“second”。在之后检测到匹配事件时,就会以这个名字来指代匹配事件。
2. 量词(Quantifiers)
个体模式后面可以跟一个“量词”,用来指定循环的次数。从这个角度分类,个体模式可以包括“单例(singleton)模式”和“循环(looping)模式”。默认情况下,个体模式是单例模式,匹配接收一个事件;当定义了量词之后,就变成了循环模式,可以匹配接收多个事件。在Flink CEP中,可以使用不同的方法指定循环模式,主要有:
.oneOrMore()
匹配事件出现一次或多次,假设a是一个个体模式,a.oneOrMore()表示可以匹配1个或多个a的事件组合。我们有时会用a+来简单表示。
.times(times)
匹配事件发生特定次数(times),例如a.times(3)表示aaa;
.times(fromTimes,toTimes)
指定匹配事件出现的次数范围,最小次数为fromTimes,最大次数为toTimes。例如a.times(2, 4)可以匹配aa,aaa和aaaa。
.greedy()
只能用在循环模式后,使当前循环模式变得“贪心”(greedy),也就是总是尽可能多地去匹配。例如a.times(2, 4).greedy(),如果出现了连续4个a,那么会直接把aaaa检测出来进行处理,其他任意2个a是不算匹配事件的。
.optional()
使当前模式成为可选的,也就是说可以满足这个匹配条件,也可以不满足。
对于一个个体模式pattern来说,后面所有可以添加的量词如下:
// 匹配事件出现4次
pattern.times(4);
// 匹配事件出现4次,或者不出现
pattern.times(4).optional();
// 匹配事件出现2, 3 或者4次
pattern.times(2, 4);
// 匹配事件出现2, 3 或者4次,并且尽可能多地匹配
pattern.times(2, 4).greedy();
// 匹配事件出现2, 3, 4次,或者不出现
pattern.times(2, 4).optional();
// 匹配事件出现2, 3, 4次,或者不出现;并且尽可能多地匹配
pattern.times(2, 4).optional().greedy();
// 匹配事件出现1次或多次
pattern.oneOrMore();
// 匹配事件出现1次或多次,并且尽可能多地匹配
pattern.oneOrMore().greedy();
// 匹配事件出现1次或多次,或者不出现
pattern.oneOrMore().optional();
// 匹配事件出现1次或多次,或者不出现;并且尽可能多地匹配
pattern.oneOrMore().optional().greedy();
// 匹配事件出现2次或多次
pattern.timesOrMore(2);
// 匹配事件出现2次或多次,并且尽可能多地匹配
pattern.timesOrMore(2).greedy();
// 匹配事件出现2次或多次,或者不出现
pattern.timesOrMore(2).optional()
// 匹配事件出现2次或多次,或者不出现;并且尽可能多地匹配
pattern.timesOrMore(2).optional().greedy();
3. 条件(Conditions)
对于每个个体模式,匹配事件的核心在于定义匹配条件,也就是选取事件的规则。Flink CEP会按照这个规则对流中的事件进行筛选,判断是否接受当前的事件。对于条件的定义,主要是通过调用Pattern对象的.where()方法来实现的,主要可以分为简单条件、迭代条件、复合条件、终止条件几种类型。此外,也可以调用Pattern对象的.subtype()方法来限定匹配事件的子类型。
限定子类型
调用.subtype()方法可以为当前模式增加子类型限制条件。例如:
pattern.subtype(SubEvent.class);
这里SubEvent是流中数据类型Event的子类型。这时,只有当事件是SubEvent类型时,才可以满足当前模式pattern的匹配条件。
简单条件(Simple Conditions)
简单条件是最简单的匹配规则,只根据当前事件的特征来决定是否接受它。这在本质上其实就是一个filter操作。代码中我们为.where()方法传入一个SimpleCondition的实例作为参数。SimpleCondition是表示“简单条件”的抽象类,内部有一个.filter()方法,唯一的参数就是当前事件。所以它可以当作FilterFunction来使用。下面是一个具体示例:
pattern.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return value.user.startsWith("A");
}
});
迭代条件(Iterative Conditions)
简单条件只能基于当前事件做判断,能够处理的逻辑比较有限。在实际应用中,我们可能需要将当前事件跟之前的事件做对比,才能判断出要不要接受当前事件。这种需要依靠之前事件来做判断的条件,就叫作“迭代条件”(Iterative Condition)。
在Flink CEP中,提供了IterativeCondition抽象类。这其实是更加通用的条件表达,查看源码可以发现,.where()方法本身要求的参数类型就是IterativeCondition;而之前的SimpleCondition是它的一个子类。在IterativeCondition中同样需要实现一个filter()方法,不过与SimpleCondition中不同的是,这个方法有两个参数:除了当前事件之外,还有一个上下文Context。调用这个上下文的.getEventsForPattern()方法,传入一个模式名称,就可以拿到这个模式中已匹配到的所有数据了。下面是一个具体示例:
middle.oneOrMore()
.where(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context<Event> ctx) throws Exception {
// 事件中的user必须以A开头
if (!value.user.startsWith("A")) {
return false;
}
int sum = value.amount;
// 获取当前模式之前已经匹配的事件,求所有事件amount之和
for (Event event : ctx.getEventsForPattern("middle")) {
sum += event.amount;
}
// 在总数量小于100时,当前事件满足匹配规则,可以匹配成功
return sum < 100;
}
});
组合条件(Combining Conditions)
独立定义多个条件,然后在外部把它们连接起来,就可以构成一个“组合条件”(Combining Condition)。最简单的组合条件,就是.where()后面再接一个.where()。因为前面提到过,一个条件就像是一个filter操作,所以每次调用.where()方法都相当于做了一次过滤,连续多次调用就表示多重过滤,最终匹配的事件自然就会同时满足所有条件。这相当于就是多个条件的“逻辑与”(AND)。而多个条件的逻辑或(OR),则可以通过.where()后加一个.or()来实现。
终止条件(Stop Conditions)
对于循环模式而言,还可以指定一个“终止条件”(Stop Condition),表示遇到某个特定事件时当前模式就不再继续循环匹配了。终止条件的定义是通过调用模式对象的.until()方法来实现的,同样传入一个IterativeCondition作为参数。需要注意的是,终止条件只与oneOrMore()或者oneOrMore().optional()结合使用。
3.2 组合模式
有了定义好的个体模式,就可以尝试按一定的顺序把它们连接起来,定义一个完整的复杂事件匹配规则了。这种将多个个体模式组合起来的完整模式,就叫作“组合模式”(Combining Pattern),为了跟个体模式区分有时也叫作“模式序列”(Pattern Sequence)。一个组合模式有以下形式:
Pattern<Event, ?> pattern = Pattern
.<Event>begin("start").where(...)
.next("next").where(...)
.followedBy("follow").where(...)
...
可以看到,组合模式确实就是一个“模式序列”,是用诸如begin、next、followedBy等表示先后顺序的“连接词”将个体模式串连起来得到的。
1. 初始模式(Initial Pattern)
所有的组合模式,都必须以一个“初始模式”开头;而初始模式必须通过调用Pattern的静态方法.begin()来创建。如下所示:Pattern<Event, ?> start = Pattern.begin("start"); 这里我们调用Pattern的.begin()方法创建了一个初始模式。传入的String类型的参数就是模式的名称;而begin方法需要传入一个类型参数,这就是模式要检测流中事件的基本类型,这里我们定义为Event。调用的结果返回一个Pattern的对象实例。
2. 近邻条件(Contiguity Conditions)
模式之间的组合是通过一些“连接词”方法实现的,这些连接词指明了先后事件之间有着怎样的近邻关系,这就是所谓的“近邻条件”(Contiguity Conditions,也叫“连续性条件”)。Flink CEP中提供了三种近邻关系:
严格近邻(Strict Contiguity)
匹配的事件严格地按顺序一个接一个出现,中间不会有任何其他事件。代码中对应的就是Pattern的.next()方法,名称上就能看出来,“下一个”自然就是紧挨着的。
宽松近邻(Relaxed Contiguity)
宽松近邻只关心事件发生的顺序,而放宽了对匹配事件的“距离”要求,也就是说两个匹配的事件之间可以有其他不匹配的事件出现。代码中对应.followedBy()方法,很明显这表示“跟在后面”就可以,不需要紧紧相邻。
非确定性宽松近邻(Non-Deterministic Relaxed Contiguity)
这种近邻关系更加宽松。所谓“非确定性”是指可以重复使用之前已经匹配过的事件;这种近邻条件下匹配到的不同复杂事件,可以以同一个事件作为开始,所以匹配结果一般会比宽松近邻更多。代码中对应.followedByAny()方法。
3. 其他限制条件
除了上面提到的next()、followedBy()、followedByAny()可以分别表示三种近邻条件,我们还可以用否定的“连接词”来组合个体模式。主要包括:
.notNext()
表示前一个模式匹配到的事件后面,不能紧跟着某种事件。
.notFollowedBy()
表示前一个模式匹配到的事件后面,不会出现某种事件。这里需要注意,由于notFollowedBy()是没有严格限定的;流数据不停地到来,我们永远不能保证之后“不会出现某种事件”。所以一个模式序列不能以notFollowedBy()结尾,这个限定条件主要用来表示“两个事件中间不会出现某种事件”。
另外,Flink CEP中还可以为模式指定一个时间限制,这是通过调用.within()方法实现的。方法传入一个时间参数,这是模式序列中第一个事件到最后一个事件之间的最大时间间隔,只有在这期间成功匹配的复杂事件才是有效的。下面是模式序列中所有限制条件在代码中的定义:
// 严格近邻条件
Pattern<Event, ?> strict = start.next("middle").where(...);
// 宽松近邻条件
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
// 非确定性宽松近邻条件
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
// 不能严格近邻条件
Pattern<Event, ?> strictNot = start.notNext("not").where(...);
// 不能宽松近邻条件
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
// 时间限制条件
middle.within(Time.seconds(10));
4. 循环模式中的近邻条件
在循环模式中,近邻关系同样有三种:严格近邻、宽松近邻以及非确定性宽松近邻。对于定义了量词(如oneOrMore()、times())的循环模式,默认内部采用的是宽松近邻。也就是说,当循环匹配多个事件时,它们中间是可以有其他不匹配事件的;相当于用单例模式分别定义、再用followedBy()连接起来。
.consecutive()
为循环模式中的匹配事件增加严格的近邻条件,保证所有匹配事件是严格连续的。也就是说,一旦中间出现了不匹配的事件,当前循环检测就会终止。这起到的效果跟模式序列中的next()一样,需要与循环量词times()、oneOrMore()配合使用。于是,检测连续三次登录失败的代码可以改成:
// 1. 定义Pattern,登录失败事件,循环检测3次
Pattern<LoginEvent, LoginEvent> pattern = Pattern
.<LoginEvent>begin("fails")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
}).times(3).consecutive();
这样显得更加简洁;而且即使要扩展到连续100次登录失败,也只需要改动一个参数而已。
.allowCombinations()
除严格近邻外,也可以为循环模式中的事件指定非确定性宽松近邻条件,表示可以重复使用已经匹配的事件。这需要调用.allowCombinations()方法来实现,实现的效果与.followedByAny()相同。
3.3 模式组
一般来说,代码中定义的模式序列,就是我们在业务逻辑中匹配复杂事件的规则。不过在有些非常复杂的场景中,可能需要划分多个“阶段”,每个“阶段”又有一连串的匹配规则。为了应对这样的需求,Flink CEP允许我们以“嵌套”的方式来定义模式。之前在模式序列中,我们用begin()、next()、followedBy()、followedByAny()这样的“连接词”来组合个体模式,这些方法的参数就是一个个体模式的名称;而现在它们可以直接以一个模式序列作为参数,就将模式序列又一次连接组合起来了。这样得到的就是一个“模式组”(Groups of Patterns)。
四、模式的检测处理
利用Pattern API定义好模式还只是整个复杂事件处理的第一步,接下来还需要将模式应用到事件流上、检测提取匹配的复杂事件并定义处理转换的方法,最终得到想要的输出信息。
4.1 将模式应用到流上
将模式应用到事件流上的代码非常简单,只要调用CEP类的静态方法.pattern(),将数据流(DataStream)和模式(Pattern)作为两个参数传入就可以了。最终得到的是一个PatternStream:
DataStream<Event> inputStream = ...
Pattern<Event, ?> pattern = ...
PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);
这里的DataStream,也可以通过keyBy进行按键分区得到KeyedStream,接下来对复杂事件的检测就会针对不同的key单独进行了。模式中定义的复杂事件,发生是有先后顺序的,这里“先后”的判断标准取决于具体的时间语义。默认情况下采用事件时间语义,那么事件会以各自的时间戳进行排序;如果是处理时间语义,那么所谓先后就是数据到达的顺序。
4.2 处理匹配事件
基于PatternStream可以调用一些转换方法,对匹配的复杂事件进行检测和处理,并最终得到一个正常的DataStream。PatternStream的转换操作主要可以分成两种:简单的选择提取(select)操作,和更加通用的处理(process)操作。与DataStream的转换类似,具体实现也是在调用API时传入一个函数类:选择操作传入的是一个PatternSelectFunction,处理操作传入的则是一个PatternProcessFunction。
1. 匹配事件的选择提取(select)
处理匹配事件最简单的方式,就是从PatternStream中直接把匹配的复杂事件提取出来,包装成想要的信息输出,这个操作就是“选择”(select)。
PatternSelectFunction
代码中基于PatternStream直接调用.select()方法,传入一个PatternSelectFunction作为参数。
PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);
DataStream<String> result = patternStream.select(new MyPatternSelectFunction());
这里的MyPatternSelectFunction是PatternSelectFunction的一个具体实现。PatternSelectFunction是Flink CEP提供的一个函数类接口,它会将检测到的匹配事件保存在一个Map里,对应的key就是这些事件的名称。这里的“事件名称”就对应着在模式中定义的每个个体模式的名称;而个体模式可以是循环模式,一个名称会对应多个事件,所以最终保存在Map里的value就是一个事件的列表(List)。下面是MyPatternSelectFunction的一个具体实现:
class MyPatternSelectFunction implements PatternSelectFunction<Event, String>{
@Override
public String select(Map<String, List<Event>> pattern) throws Exception {
Event startEvent = pattern.get("start").get(0);
Event middleEvent = pattern.get("middle").get(0);
return startEvent.toString() + " " + middleEvent.toString();
}
}
可以通过名称从Map中选择提取出对应的事件。注意调用Map的.get(key)方法后得到的是一个事件的List;如果个体模式是单例的,那么List中只有一个元素,直接调用.get(0)就可以把它取出。当然,如果个体模式是循环的,List中就有可能有多个元素了。例如我们对连续登录失败检测的改进,可以将匹配到的事件包装成String类型的报警信息输出,代码如下:
// 1. 定义Pattern,登录失败事件,循环检测3次
Pattern<LoginEvent, LoginEvent> pattern = Pattern
.<LoginEvent>begin("fails")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
}).times(3).consecutive();
// 2. 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
PatternStream<LoginEvent> patternStream = CEP.pattern(stream, pattern);
// 3. 将匹配到的复杂事件选择出来,然后包装成报警信息输出
patternStream
.select(new PatternSelectFunction<LoginEvent, String>() {
@Override
public String select(Map<String, List<LoginEvent>> map) throws Exception {
// 只有一个模式,匹配到了3个事件,放在List中
LoginEvent first = map.get("fails").get(0);
LoginEvent second = map.get("fails").get(1);
LoginEvent third = map.get("fails").get(2);
return first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " + second.timestamp + ", " + third.timestamp;
}
})
.print("warning");
PatternFlatSelectFunction
除此之外,PatternStream还有一个类似的方法是.flatSelect(),传入的参数是一个PatternFlatSelectFunction。从名字上就能看出,这是PatternSelectFunction的“扁平化”版本;内部需要实现一个flatSelect()方法,它与之前select()的不同就在于没有返回值,而是多了一个收集器(Collector)参数out,通过调用out.collet()方法就可以实现多次发送输出数据了。
2. 匹配事件的通用处理(process)
自1.8版本之后,Flink CEP引入了对于匹配事件的通用检测处理方式,那就是直接调用PatternStream的.process()方法,传入一个PatternProcessFunction。这看起来就像是我们熟悉的处理函数(process function),它也可以访问一个上下文(Context),进行更多的操作。所以PatternProcessFunction功能更加丰富、调用更加灵活,可以完全覆盖其他接口,也就成为了目前官方推荐的处理方式。事实上,PatternSelectFunction和PatternFlatSelectFunction在CEP内部执行时也会被转换成PatternProcessFunction。我们可以使用PatternProcessFunction将之前的代码重写如下:
// 3. 将匹配到的复杂事件选择出来,然后包装成报警信息输出
patternStream.process(new PatternProcessFunction<LoginEvent, String>() {
@Override
public void processMatch(Map<String, List<LoginEvent>> map, Context ctx,
Collector<String> out) throws Exception {
LoginEvent first = map.get("fails").get(0);
LoginEvent second = map.get("fails").get(1);
LoginEvent third = map.get("fails").get(2);
out.collect(first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " + second.timestamp + ", " + third.timestamp);
}
}).print("warning");
可以看到,PatternProcessFunction中必须实现一个processMatch()方法;这个方法与之前的flatSelect()类似,只是多了一个上下文Context参数。利用这个上下文可以获取当前的时间信息,比如事件的时间戳(timestamp)或者处理时间(processing time);还可以调用.output()方法将数据输出到侧输出流。
4.3 处理超时事件
复杂事件的检测结果一般只有两种:要么匹配,要么不匹配。检测处理的过程具体如下:
(1)如果当前事件符合模式匹配的条件,就接受该事件,保存到对应的Map中;
(2)如果在模式序列定义中,当前事件后面还应该有其他事件,就继续读取事件流进行检测;如果模式序列的定义已经全部满足,那么就成功检测到了一组匹配的复杂事件,调用PatternProcessFunction的processMatch()方法进行处理;
(3)如果当前事件不符合模式匹配的条件,就丢弃该事件;
(4)如果当前事件破坏了模式序列中定义的限制条件,比如不满足严格近邻要求,那么当前已检测的一组部分匹配事件都被丢弃,重新开始检测。
不过在有时间限制的情况下,有时我们会希望捕获并处理超时事件。CEP提供了处理超时事件的方法。
1. 使用PatternProcessFunction的侧输出流
在Flink CEP中,提供了一个专门捕捉超时的部分匹配事件的接口,叫作TimedOutPartialMatchHandler。这个接口需要实现一个processTimedOutMatch()方法,可以将超时的、已检测到的部分匹配事件放在一个Map中,作为方法的第一个参数;方法的第二个参数则是PatternProcessFunction的上下文Context。所以这个接口必须与PatternProcessFunction结合使用,对处理结果的输出则需要利用侧输出流来进行。代码中的调用方式如下:
class MyPatternProcessFunction extends PatternProcessFunction<Event, String>
implements TimedOutPartialMatchHandler<Event> {
// 正常匹配事件的处理
@Override
public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<String> out) throws Exception{
...
}
// 超时部分匹配事件的处理
@Override
public void processTimedOutMatch(Map<String, List<Event>> match, Context ctx) throws Exception{
Event startEvent = match.get("start").get(0);
OutputTag<Event> outputTag = new OutputTag<Event>("time-out"){};
ctx.output(outputTag, startEvent);
}
}
我们在processTimedOutMatch()方法中定义了一个输出标签(OutputTag)。调用ctx.output()方法,就可以将超时的部分匹配事件输出到标签所标识的侧输出流了。
2. 使用PatternTimeoutFunction
上文提到的PatternProcessFunction通过实现TimedOutPartialMatchHandler接口扩展出了处理超时事件的能力,这是官方推荐的做法。此外,Flink CEP中也保留了早期简化版的PatternSelectFunction,它无法直接处理超时事件,不过我们可以通过调用PatternStream的.select()方法时多传入一个PatternTimeoutFunction参数来实现这一点。由于调用.select()方法后会得到唯一的DataStream,所以正常匹配事件和超时事件的处理结果不应该放在同一条流中。正常匹配事件的处理结果会进入转换后得到的DataStream,而超时事件的处理结果则会进入侧输出流;这个侧输出流需要另外传入一个侧输出标签(OutputTag)来指定。所以最终我们在调用PatternStream的.select()方法时需要传入三个参数:侧输出流标签(OutputTag),超时事件处理函数PatternTimeoutFunction,匹配事件提取函数PatternSelectFunction。下面是一个代码中的调用方式:
// 定义一个侧输出流标签,用于标识超时侧输出流
OutputTag<String> timeoutTag = new OutputTag<String>("timeout"){};
// 将匹配到的,和超时部分匹配的复杂事件提取出来,然后包装成提示信息输出
SingleOutputStreamOperator<String> resultStream = patternStream
.select(timeoutTag,
// 超时部分匹配事件的处理
new PatternTimeoutFunction<Event, String>() {
@Override
public String timeout(Map<String, List<Event>> pattern, long timeoutTimestamp) throws Exception {
Event event = pattern.get("start").get(0);
return "超时:" + event.toString();
}
},
// 正常匹配事件的处理
new PatternSelectFunction<Event, String>() {
@Override
public String select(Map<String, List<Event>> pattern) throws Exception {
...
}
}
);
// 将正常匹配和超时部分匹配的处理结果流打印输出
resultStream.print("matched");
resultStream.getSideOutput(timeoutTag).print("timeout");
3. 应用实例
接下来我们看一个具体的应用场景。在电商平台中,最终创造收入和利润的是用户下单购买的环节。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如15分钟),如果下单后一段时间仍未支付,订单就会被取消。首先定义出要处理的数据类型。我们面对的是订单事件,主要包括用户对订单的创建(下单)和支付两种行为。因此可以定义POJO类OrderEvent如下,其中属性字段包括用户ID、订单ID、事件类型(操作类型)以及时间戳。
public class OrderEvent {
public String userId;
public String orderId;
public String eventType;
public Long timestamp;
public OrderEvent() {
}
public OrderEvent(String userId, String orderId, String eventType, Long timestamp) {
this.userId = userId;
this.orderId = orderId;
this.eventType = eventType;
this.timestamp = timestamp;
}
@Override
public String toString() {
return "OrderEvent{" +
"userId='" + userId + '\'' +
"orderId='" + orderId + '\'' +
", eventType='" + eventType + '\'' +
", timestamp=" + timestamp +
'}';
}
}
当前需求的重点在于对超时未支付的用户进行监控提醒,也就是需要检测有下单行为、但15分钟内没有支付行为的复杂事件。在下单和支付之间,可以有其他操作(比如对订单的修改),所以两者之间是宽松近邻关系。我们重点要处理的是超时的部分匹配事件。对原始的订单事件流按照订单ID进行分组,然后检测每个订单的“下单-支付”复杂事件,如果出现超时事件需要输出报警提示信息。整体代码实现如下:
public class OrderTimeoutDetect {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 获取订单事件流,并提取时间戳、生成水位线
KeyedStream<OrderEvent, String> stream = env
.fromElements(
new OrderEvent("user_1", "order_1", "create", 1000L),
new OrderEvent("user_2", "order_2", "create", 2000L),
new OrderEvent("user_1", "order_1", "modify", 10 * 1000L),
new OrderEvent("user_1", "order_1", "pay", 60 * 1000L),
new OrderEvent("user_2", "order_3", "create", 10 * 60 * 1000L),
new OrderEvent("user_2", "order_3", "pay", 20 * 60 * 1000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<OrderEvent>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<OrderEvent>() {
@Override
public long extractTimestamp(OrderEvent event, long l) {
return event.timestamp;
}
}
)
)
.keyBy(order -> order.orderId); // 按照订单ID分组
// 1. 定义Pattern
Pattern<OrderEvent, ?> pattern = Pattern
.<OrderEvent>begin("create") // 首先是下单事件
.where(new SimpleCondition<OrderEvent>() {
@Override
public boolean filter(OrderEvent value) throws Exception {
return value.eventType.equals("create");
}
})
.followedBy("pay") // 之后是支付事件;中间可以修改订单,宽松近邻
.where(new SimpleCondition<OrderEvent>() {
@Override
public boolean filter(OrderEvent value) throws Exception {
return value.eventType.equals("pay");
}
})
.within(Time.minutes(15)); // 限制在15分钟之内
// 2. 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
PatternStream<OrderEvent> patternStream = CEP.pattern(stream, pattern);
// 3. 将匹配到的,和超时部分匹配的复杂事件提取出来,然后包装成提示信息输出
SingleOutputStreamOperator<String> payedOrderStream = patternStream.process(new OrderPayPatternProcessFunction());
// 将正常匹配和超时部分匹配的处理结果流打印输出
payedOrderStream.print("payed");
payedOrderStream.getSideOutput(timeoutTag).print("timeout");
env.execute();
}
// 实现自定义的PatternProcessFunction,需实现TimedOutPartialMatchHandler接口
public static class OrderPayPatternProcessFunction extends PatternProcessFunction<OrderEvent, String> implements TimedOutPartialMatchHandler<OrderEvent> {
// 处理正常匹配事件
@Override
public void processMatch(Map<String, List<OrderEvent>> match, Context ctx, Collector<String> out) throws Exception {
OrderEvent payEvent = match.get("pay").get(0);
out.collect("订单 " + payEvent.orderId + " 已支付!");
}
// 处理超时未支付事件
@Override
public void processTimedOutMatch(Map<String, List<OrderEvent>> match, Context ctx) throws Exception {
OrderEvent createEvent = match.get("create").get(0);
ctx.output(new OutputTag<String>("timeout"){}, "订单 " + createEvent.orderId + " 超时未支付!用户为:" + createEvent.userId);
}
}
}
4.4 处理迟到数据
CEP主要处理的是先后发生的一组复杂事件,所以事件的顺序非常关键。在事件时间语义下,需要按照事件自身的时间戳来排序。这就有可能出现时间戳大的事件先到、时间戳小的事件后到的现象,也就是所谓的“乱序数据”或“迟到数据”。在Flink CEP中沿用了通过设置水位线(watermark)延迟来处理乱序数据的做法。不过水位线的延迟不可能完美处理所有迟到数据;如果不希望迟到数据丢掉,可以借鉴窗口的做法。Flink CEP同样提供了将迟到事件输出到侧输出流的方式:我们可以基于PatternStream直接调用.sideOutputLateData()方法,传入一个OutputTag,将迟到数据放入侧输出流另行处理。代码中调用方式如下:
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
// 定义一个侧输出流的标签
OutputTag<String> lateDataOutputTag = new OutputTag<String>("late-data"){};
SingleOutputStreamOperator<ComplexEvent> result = patternStream
.sideOutputLateData(lateDataOutputTag) // 将迟到数据输出到侧输出流
.select(
// 处理正常匹配数据
new PatternSelectFunction<Event, ComplexEvent>() {...}
);
// 从结果中提取侧输出流
DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);
可以看到,整个处理流程与窗口非常相似。经处理匹配数据得到结果数据流之后,可以调用.getSideOutput()方法来提取侧输出流,捕获迟到数据进行额外处理。