林子雨、陶继平编著《Flink编程基础(Scala版)》(教材官网)教材中的代码,在纸质教材中的印刷效果,可能会影响读者对代码的理解,为了方便读者正确理解代码或者直接拷贝代码用于上机实验,这里提供全书配套的所有代码。
查看教材所有章节的代码
第8章 FlinkCEP
<!-- Maven dependency for the FlinkCEP Scala API (Scala 2.12 build, Flink 1.11.2). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.12</artifactId>
<version>1.11.2</version>
</dependency>
// Single pattern "start": accepts only events whose name starts with "start".
val start = Pattern.begin("start")
  .where(_.getName.startsWith("start"))
// Iterative condition on the looping pattern "middle": accept SubEvent instances whose
// name starts with "foo" while the summed price of the events already accepted into
// "middle" plus the current event's price stays below 5.0. Because `sum` is lazy and
// only appears on the right of `&&`, the getEventsForPattern scan runs only after the
// name check has passed.
middle.oneOrMore()
  .subtype(classOf[SubEvent])
  .where(
    (value, ctx) => {
      lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum
      value.getName.startsWith("foo") && sum + value.getPrice < 5.0
    }
  )
// Simple condition on a single event.
start.where(event => event.getName.startsWith("foo"))
// Combined condition: the where(...) predicate OR the or(...) predicate.
middle.where(event => event.getName.startsWith("foo"))
  .or(event => event.getPrice < 2.0)
// Stop condition for a looping pattern: until(...) with an event named "end".
// Plain ASCII quotes are required here; typographic quotes do not compile.
middle.oneOrMore().until(event => event.getName() == "end")
// consecutive() on the "middle" pattern (see the FlinkCEP docs on contiguity within loops).
middle.where(event => event.getName.startsWith("foo"))
  .consecutive()
// Base pattern shared by the contiguity examples below.
val start = Pattern.begin("start")
  .where(_.getName.startsWith("start"))
// next: "middle" must directly follow "start"; three such events.
val strict = start.next("middle")
  .where(event => event.getName.startsWith("foo"))
  .times(3)
// notNext: negated strict contiguity on the stage "not".
val strictNot = start.notNext("not")
  .where(event => event.getName.startsWith("foo"))
// followedBy: relaxed contiguity; three "middle" events.
val relaxed = start.followedBy("middle")
  .where(event => event.getName.startsWith("foo"))
  .times(3)
// notFollowedBy: negated relaxed contiguity, followed by a "middle" stage with price < 2.0.
val relaxedNot = start.notFollowedBy("not")
  .where(event => event.getName.startsWith("foo"))
  .followedBy("middle").where(_.getPrice < 2.0)
// followedByAny: non-deterministic relaxed contiguity.
val nonDetermin = start.followedByAny("middle")
  .where(event => event.getName.startsWith("foo"))
// within: attach a 10-second time constraint to the pattern.
// The closing parenthesis of within(...) was missing in the original listing.
val strict = start.next("middle")
  .where(event => event.getName.startsWith("foo"))
  .within(Time.seconds(10))
// Combined pattern: "start" (name starts with "start"), then via followedBy the stage
// "middle" (three events whose name starts with "foo", with consecutive() applied to
// that loop), then "end" (two or more events with price < 2.0).
// within(Time.seconds(10)) attaches a 10-second time constraint to the pattern.
val pattern = Pattern.begin("start")
.where(_.getName.startsWith("start"))
.followedBy("middle")
.where(_.getName.startsWith("foo"))
.times(3).consecutive()
.followedBy("end").where(_.getPrice < 2.0)
.timesOrMore(2)
.within(Time.seconds(10))
// Group pattern: begin(...) and next(...) receive whole Pattern objects instead of stage
// names. The trailing times(3).consecutive() is called on the result of next(...), so it
// applies to the "next_start" -> "next_middle" group.
val gPattern = Pattern.begin(Pattern.begin("start")
.where(_.getName.startsWith("start"))
.followedBy("start_middle")
.where(_.getName.startsWith("foo"))
)
.next(Pattern.begin("next_start")
.where(_.getName.startsWith("buy"))
.followedBy("next_middle")
.where(_.getPrice < 2.0)
).times(3).consecutive()
// The available after-match skip strategies. These lines are alternatives (each one
// redefines skipStrategy), so keep only the one you need.
// NOTE(review): `patternName` is not defined in this listing; substitute the name of one
// of the pattern's stages.
val skipStrategy = AfterMatchSkipStrategy.noSkip()
val skipStrategy = AfterMatchSkipStrategy.skipToNext()
val skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent()
val skipStrategy = AfterMatchSkipStrategy.skipToFirst(patternName)
val skipStrategy = AfterMatchSkipStrategy.skipToLast(patternName)
// Usage: pass the chosen strategy as the second argument of Pattern.begin.
val skipStrategy = AfterMatchSkipStrategy.skipToNext()
val start = Pattern.begin("start", skipStrategy)
.where(_.getName.startsWith("start"))
// Apply `pattern` to the `input` stream.
// Presumably `input` is a DataStream and `pattern` a Pattern defined earlier; confirm.
val patternStream = CEP.pattern(input, pattern)
/** Select function for a complete match.
  *
  * Builds one `OUT` from the first event bound to the "start" stage and the first event
  * bound to the "end" stage. `IN` and `OUT` are placeholder types from the book;
  * substitute your own event and result types.
  *
  * @param pattern matched events, keyed by stage name
  * @return the result built from the first "start" and first "end" event
  */
def selectFn(pattern: Map[String, Iterable[IN]]): OUT = {
  // The values are Iterables, which have no `next`; take the first element with `head`.
  val startEvent = pattern("start").head
  val endEvent = pattern("end").head
  OUT(startEvent, endEvent)
}
/** Flat-select function for a complete match.
  *
  * Emits `OUT(startEvent, endEvent)` into `collector` repeatedly, where `startEvent` and
  * `endEvent` are the first events of the "start" and "end" stages. `IN` and `OUT` are
  * placeholder types from the book; `IN` is assumed to expose an integer `getValue`.
  *
  * @param pattern   matched events, keyed by stage name
  * @param collector sink for the produced results (Flink's `Collector`)
  */
def flatSelectFn(pattern: Map[String, Iterable[IN]], collector: Collector[OUT]): Unit = {
  // The values are Iterables, which have no `next`; take the first element with `head`.
  val startEvent = pattern("start").head
  val endEvent = pattern("end").head
  // `0 to n` is inclusive, so this emits startEvent.getValue + 1 results.
  for (_ <- 0 to startEvent.getValue) {
    collector.collect(OUT(startEvent, endEvent))
  }
}
// Create the pattern stream from the input stream and the pattern.
val patternStream = CEP.pattern(input, pattern)
// Side-output tag named "late-data" for timed-out partial matches. Its element type must
// match what the timeout handler collects (TimeoutEvent). The name must not start with a
// space, otherwise it differs from "late-data".
val outputTag = OutputTag[TimeoutEvent]("late-data")
val result = patternStream.flatSelect(outputTag) {
  // Timeout handler: emits a TimeoutEvent into the side output.
  (pattern: Map[String, Iterable[Event]], timestamp: Long, out: Collector[TimeoutEvent]) =>
    out.collect(TimeoutEvent())
} {
  // Match handler: emits a ComplexEvent into the main output.
  // NOTE(review): the two handlers declare different Map types (Map vs mutable.Map);
  // confirm against the flatSelect signature of your Flink version.
  (pattern: mutable.Map[String, Iterable[Event]], out: Collector[ComplexEvent]) =>
    out.collect(ComplexEvent())
}
// Read the timed-out events back from the side output using the same tag.
val timeoutResult = result.getSideOutput(outputTag)
Adam,click,1558430815185
Adam,buy,1558430815865
Adam,order,1558430815985
Berry,buy,1558430815988
Adam,click,1558430816068
Berry,order,1558430816074
Carl,click,1558430816151
Carl,buy,1558430816641
Dennis,buy,1558430817128
Carl,click,1558430817165
Ella,click,1558430818652
import java.util
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
/** Input event: one user action (e.g. "click", "buy", "order") with its event timestamp. */
final case class UserAction(userName: String, eventType: String, eventTime: Long)
/** Output event: a user's click followed by a buy, with both timestamps. */
final case class ClickAndBuyAction(userName: String, clickTime: Long, buyTime: Long)
/** Entry point of the FlinkCEP example.
  *
  * Builds a small hard-coded stream of user actions and, per user, looks for a "click"
  * event immediately followed by a "buy" event. Each match is printed.
  */
object UserActionDetect {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use the timestamps carried by the events (UserAction.eventTime).
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Sample input; timestamps are in ascending order.
    val sampleActions = List(
      UserAction("Adam", "click", 1558430815185L),
      UserAction("Adam", "buy", 1558430815865L),
      UserAction("Adam", "order", 1558430815985L),
      UserAction("Berry", "buy", 1558430815988L),
      UserAction("Adam", "click", 1558430816068L),
      UserAction("Berry", "order", 1558430816074L),
      UserAction("Carl", "click", 1558430816151L),
      UserAction("Carl", "buy", 1558430816641L),
      UserAction("Dennis", "buy", 1558430817128L),
      UserAction("Carl", "click", 1558430817165L),
      UserAction("Ella", "click", 1558430818652L)
    )

    // Step 1: source stream with ascending event-time timestamps, partitioned by user.
    val actionsByUser = env
      .fromCollection(sampleActions)
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.userName)

    // Step 2: a "click" (stage "begin") immediately followed by a "buy" (stage "next").
    val clickThenBuy = Pattern
      .begin[UserAction]("begin")
      .where(_.eventType == "click")
      .next("next")
      .where(_.eventType == "buy")

    // Steps 3 and 4: match the pattern on the keyed stream, convert and print each match.
    CEP
      .pattern(actionsByUser, clickThenBuy)
      .select(new ClickAndBuyMatch())
      .print()

    env.execute()
  }
}
/** Converts one click-then-buy match into a [[ClickAndBuyAction]].
  *
  * Overrides `select`: takes the first event bound to the "begin" stage (the click) and the
  * first event bound to the "next" stage (the buy). The user name comes from the click.
  */
class ClickAndBuyMatch() extends PatternSelectFunction[UserAction, ClickAndBuyAction] {
  override def select(map: util.Map[String, util.List[UserAction]]): ClickAndBuyAction = {
    // First event captured under the given stage name.
    def firstEventOf(stage: String): UserAction = map.get(stage).iterator().next()

    val click = firstEventOf("begin")
    val buy = firstEventOf("next")
    ClickAndBuyAction(
      userName = click.userName,
      clickTime = click.eventTime,
      buyTime = buy.eventTime
    )
  }
}
# Create the project directory FlinkCEP in the home directory.
cd ~
mkdir FlinkCEP
cd FlinkCEP
# Standard Maven/Scala source layout.
mkdir -p src/main/scala
# Write the UserActionDetect program into the source tree.
vim src/main/scala/UserActionDetect.scala
<!-- Maven build for the FlinkCEP example; packaging jar with artifactId simple-project and
     version 1.0, so the build produces target/simple-project-1.0.jar. -->
<project>
<groupId>cn.edu.xmu.dblab</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<!-- Flink 1.11.2 artifacts built for Scala 2.12: CEP, Scala API, streaming Scala API, clients. -->
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.11.2</version>
</dependency>
</dependencies>
<!-- scala-maven-plugin compiles the Scala sources during the build.
     NOTE(review): no shade/assembly plugin is configured, so presumably the jar holds only
     this project's classes. The CEP library must then be on the Flink cluster classpath
     when the job is submitted; confirm. -->
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
# Build the jar with Maven, then submit it to the Flink cluster.
cd ~/FlinkCEP
/usr/local/maven/bin/mvn package
# -c names the main class. The original listing began the continuation line with "> ",
# which was the shell's continuation prompt. Pasted literally, it redirects output to a
# file named UserActionDetect and hands the jar path to -c as the class name.
/usr/local/flink/bin/flink run -c \
  UserActionDetect ./target/simple-project-1.0.jar