大数据全系列 教程
1869个小节阅读:467.6k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
first
和second
的两个个体模式,于是可以通过这个名称从 Map 中选择提取出对应的事件。注意调用 Map 的 get(key)方法后得到的是一个事件的 List;如果个体模式是单例的,那么 List 中只有一个元素,直接调用get(0)就可以把它取出。当然,如果个体模式是循环的,List 中就有可能有多个元素了,例:xxxxxxxxxx
select(new PatternSelectFunction[EventLog, String] {
override def select(map: util.Map[String, util.List[EventLog]]): String
= {
val first = map.get("first").get(0)
val second = map.get("first").get(1)
val third = map.get("first").get(2)
first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " +
second.timestamp + ", " + third.timestamp
}
})
PatternStream
还有一个类似的方法是 flatSelect(),传入的参数是一个
PatternFlatSelectFunction
。从名字上就能看出,这是PatternSelectFunction
的“扁平化”版本;
内部需要实现一个flatSelect()
方法,它与之前 select()
的不同就在于没有返回值,而是多了一个收集器(Collector)参数 out,通过调用 out.collet()方法就可以实现多次发送输出数据了。
xxxxxxxxxx
patternStream.flatSelect(new PatternFlatSelectFunction[EventLog,String] {
override def flatSelect(map: util.Map[String, util.List[EventLog]],
collector: Collector[String]): Unit = {
val first = map.get("first").get(0)
val second = map.get("first").get(1)
val third = map.get("first").get(2)
collector.collect(first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " +
second.timestamp + ", " + third.timestamp)
}
}).print("账户锁定")
process()
方法,传入一个PatternProcessFunction
。这看起来就像是我们熟悉的处理函数(process function),它也可以访问一个上下文(Context),进行更多的操作。所以PatternProcessFunction
功能更加丰富、调用更加灵活,可以完全覆盖其他接口,也就成为了目前官方推荐的处理方式。事实上,PatternSelectFunction
和PatternFlatSelectFunction
在 CEP 内部执行时也会被转换成PatternProcessFunction
。xxxxxxxxxx
patternStream.process(new PatternProcessFunction[EventLog,String] {
override def processMatch(
map: util.Map[String, util.List[EventLog]],
context: PatternProcessFunction.Context,
collector: Collector[String]): Unit = {
val first = map.get("first").get(0)
val second = map.get("first").get(1)
val third = map.get("first").get(2)
collector.collect(first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " +
second.timestamp + ", " + third.timestamp)
}
}).print("账户锁定")