大数据全系列 教程
1869个小节阅读:467.7k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
FlinkCEP中提供了Pattern API用于对输入流数据的复杂事件规则定义,并从事件流中抽取事件结果。包含四个步骤:
输入事件流的创建
xxxxxxxxxx
val input: DataStream[EventLog] = ...
Pattern的定义
Pattern(模式)定义,每个Pattern都是通过begin方法定义的:
xxxxxxxxxx
val pattern = Pattern.begin[EventLog]("first")
通过Pattern.where()方法在Pattern上指定Condition(条件),只有当条件满足之后,当前的Pattern才会接受事件。
xxxxxxxxxx
pattern.where(_.eventType.equals("fail"))
where()
方法来实现的,具体的过滤逻辑则通过传入的SimpleCondition内的filter()方法来定义。
xxxxxxxxxx
.where(new SimpleCondition[EventLog](){
override def filter(value: EventLog): Boolean = {
value.eventType.equals("fail")
}
})
每种模式能够将多个条件组合应用到同一事件之上,条件组合可以通过where方法进行叠加。
xxxxxxxxxx
pattern.where(_.eventType.equals("fail"))
.where(_.ip.startsWith("192.168.20."))
next("xxx")表示必须同时符合条件。
xxxxxxxxxx
pattern.next("second") //第二个登录失败事件
.where(_.eventType.equals("fail"))
Pattern应用在事件流上检测
xxxxxxxxxx
val patternStream = CEP.pattern(input, pattern)
选取结果
xxxxxxxxxx
val result: DataStream[Alert] = patternStream.process(
new PatternProcessFunction[Event, Alert]() {
override def processMatch(
`match`: util.Map[String, util.List[Event]],
ctx: PatternProcessFunction.Context,
out: Collector[Alert]): Unit = {
out.collect(createAlertFrom(pattern))
}
})
需求:实现登录告警系统,从一堆的登录日志中,匹配一个恶意登录的模式(如果一个用户连续失败三次,则是恶意登录),从而找到哪些用户名是恶意登录。
代码实现:
xxxxxxxxxx
package com.itbaizhan.cep
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import java.util
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
case class EventLog(userId: String, ip: String, eventType: String,timestamp: Long)
object CepDemo1 {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//1.获取登录事件流,时间语义指定日志登录时间
val stream = env
.fromElements(//1893427200000-1893513599000 2030-01-01 00:00:00-2030-01-01 23:59:59
EventLog("user1", "192.168.20.101", "fail", 1893427201000L),
EventLog("user1", "192.168.20.102", "fail", 1893427202000L),
EventLog("user2", "192.168.21.123", "fail", 1893427203000L),
EventLog("user1", "171.56.23.110", "fail", 1893427205000L),
EventLog("user2", "192.168.21.115", "success", 1893427207000L),
EventLog("user2", "192.168.21.117", "fail", 1893427208000L),
EventLog("user2", "192.168.21.118", "fail", 1893427209000L),
EventLog("user3", "192.168.20.101", "fail", 1893427281000L),
EventLog("user3", "192.168.20.102", "fail", 1893427282000L),
EventLog("user3", "192.168.21.123", "fail", 1893513699000L))
.assignAscendingTimestamps(_.timestamp)
.keyBy(_.userId)
//2.定义 Pattern,获取同一个用户连续三次登录失败
val pattern = Pattern
.begin[EventLog]("first") //从第一个登录失败事件开始
.where(_.eventType.equals("fail"))
.next("second") //第二个登录失败事件
.where(_.eventType.equals("fail"))
.next("third") //第三个登录失败事件
.where(_.eventType.equals("fail"))
//3. 将Pattern应用到流上,检测匹配的复杂事件,得到一个 PatternStream
val patternStream = CEP.pattern(stream, pattern)
//4. 将匹配到的复杂事件选择出来,然后包装成字符串报警信息输出
patternStream
.select(new PatternSelectFunction[EventLog, String] {
override def select(map: util.Map[String, util.List[EventLog]]): String
= {
val first = map.get("first").get(0)
val second = map.get("second").get(0)
val third = map.get("third").get(0)
first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " +
second.timestamp + ", " + third.timestamp
}
})
.print("账户锁定")
env.execute()
}
}