第8章代码-林子雨编著-《Flink编程基础(Scala版)》

大数据学习路线图

林子雨、陶继平编著《Flink编程基础(Scala版)》(教材官网)教材中的代码,在纸质教材中的印刷效果,可能会影响读者对代码的理解,为了方便读者正确理解代码或者直接拷贝代码用于上机实验,这里提供全书配套的所有代码。
查看教材所有章节的代码

第8章 FlinkCEP

<!-- Maven dependency on the FlinkCEP Scala library (artifact flink-cep-scala_2.12, version 1.11.2).
     This element goes inside the <dependencies> section of the project's pom.xml. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.12</artifactId>
    <version>1.11.2</version>
</dependency>
// The snippets below are independent examples of conditions on a single pattern stage.
// `middle` refers to a pattern stage defined elsewhere in the book's text.

// First stage, named "start": accepts events whose name begins with "start".
// The explicit [Event] type parameter gives the lambda a concrete event type,
// otherwise `_.getName` has nothing to resolve against.
val start = Pattern.begin[Event]("start")
  .where(_.getName.startsWith("start"))

// Iterative condition: a SubEvent is accepted when its name starts with "foo" and the
// prices of the events already accepted for "middle", plus its own price, stay below 5.0.
middle.oneOrMore()
  .subtype(classOf[SubEvent])
  .where(
    (value, ctx) => {
      // lazy + && : the sum over earlier events is only computed if the name check passes.
      lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum
      value.getName.startsWith("foo") && sum + value.getPrice < 5.0
    }
  )

// Simple condition on a single event.
start.where(event => event.getName.startsWith("foo"))

// Combined condition: name starts with "foo" OR price is below 2.0.
middle.where(event => event.getName.startsWith("foo"))
  .or(event => event.getPrice < 2.0)

// Stop condition for a looping stage: stop once an event named "end" arrives.
// (Uses plain ASCII double quotes; typographic quotes are not valid string delimiters.)
middle.oneOrMore().until(event => event.getName == "end")

// Consecutive matching.
// NOTE(review): presumably `middle` already carries a looping quantifier
// (oneOrMore/times) when consecutive() is applied — confirm.
middle.where(event => event.getName.startsWith("foo"))
  .consecutive()
// Examples of combining stages with different contiguity operators.
// Every example extends the same `start` stage.

// Explicit [Event] type parameter so the `_.getName` lambda has a concrete event type.
val start = Pattern.begin[Event]("start")
  .where(_.getName.startsWith("start"))

// next: "middle" is appended with `next` and quantified with times(3).
val strict = start.next("middle")
  .where(event => event.getName.startsWith("foo"))
  .times(3)

// notNext: negative counterpart of `next`.
val strictNot = start.notNext("not")
  .where(event => event.getName.startsWith("foo"))

// followedBy: "middle" is appended with `followedBy` and quantified with times(3).
val relaxed = start.followedBy("middle")
  .where(event => event.getName.startsWith("foo"))
  .times(3)

// notFollowedBy: negative stage "not", itself followed by a "middle" stage with price < 2.0.
val relaxedNot = start.notFollowedBy("not")
  .where(event => event.getName.startsWith("foo"))
  .followedBy("middle").where(_.getPrice < 2.0)

// followedByAny: "middle" is appended with `followedByAny`.
val nonDetermin = start.followedByAny("middle")
  .where(event => event.getName.startsWith("foo"))
// Time-constrained pattern: "start" then "middle" (name starting with "foo"),
// with a within(...) bound of Time.seconds(10) on the whole pattern.
// The call to within(...) is now properly closed.
val strict = start.next("middle")
  .where(event => event.getName.startsWith("foo"))
  .within(Time.seconds(10))
// A three-stage pattern ("start" -> "middle" -> "end") with quantifiers and a time bound.
// Explicit [Event] type parameter so the `_.getName` / `_.getPrice` lambdas type-check.
val pattern = Pattern.begin[Event]("start")
  .where(_.getName.startsWith("start"))
  // "middle": name starts with "foo", quantified with times(3).consecutive()
  .followedBy("middle")
  .where(_.getName.startsWith("foo"))
  .times(3).consecutive()
  // "end": price below 2.0, quantified with timesOrMore(2)
  .followedBy("end").where(_.getPrice < 2.0)
  .timesOrMore(2)
  // time bound passed to within(...): Time.seconds(10)
  .within(Time.seconds(10))
// Group pattern: two sub-patterns are used as stages themselves.
// Each inner Pattern.begin carries an explicit [Event] type parameter so that the
// lambdas inside the sub-patterns have a concrete event type.
val gPattern = Pattern.begin(
    // First group: "start" followed by "start_middle".
    Pattern.begin[Event]("start")
      .where(_.getName.startsWith("start"))
      .followedBy("start_middle")
      .where(_.getName.startsWith("foo"))
  )
  .next(
    // Second group: "next_start" followed by "next_middle".
    Pattern.begin[Event]("next_start")
      .where(_.getName.startsWith("buy"))
      .followedBy("next_middle")
      .where(_.getPrice < 2.0)
  ).times(3).consecutive() // quantifier applied to the second group as a whole
// Alternative ways to obtain an after-match skip strategy. Each line is an independent
// example binding the same name, so only one of them would be kept in a real program.
val skipStrategy = AfterMatchSkipStrategy.noSkip()
val skipStrategy = AfterMatchSkipStrategy.skipToNext()
val skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent()
// patternName is not defined in this snippet; presumably the String name of a
// pattern stage (e.g. "start") — confirm against the surrounding text.
val skipStrategy = AfterMatchSkipStrategy.skipToFirst(patternName)
val skipStrategy = AfterMatchSkipStrategy.skipToLast(patternName)
// Choose a skip strategy and hand it to Pattern.begin together with the stage name.
val skipStrategy = AfterMatchSkipStrategy.skipToNext()
// Explicit [Event] type parameter so the `_.getName` lambda has a concrete event type.
val start = Pattern.begin[Event]("start", skipStrategy)
  .where(_.getName.startsWith("start"))
// Apply a pattern to an input stream. `input` and `pattern` are defined elsewhere;
// presumably `input` is a DataStream of events and `pattern` a complete pattern — confirm.
val patternStream = CEP.pattern(input, pattern)
/**
 * Builds one output record from a detected match.
 *
 * IN and OUT are placeholders for the application's input and output event types.
 *
 * @param pattern the matched events, grouped by the name of the pattern stage
 * @return an OUT built from the first event of the "start" stage and of the "end" stage
 * @throws NoSuchElementException if either stage is absent from the map or has no events
 */
def selectFn(pattern: Map[String, Iterable[IN]]): OUT = {
  // Iterable has no `next` method; `head` returns the first captured event of a stage.
  val startEvent = pattern("start").head
  val endEvent = pattern("end").head
  OUT(startEvent, endEvent)
}
/**
 * Emits zero or more output records for a detected match through a collector.
 *
 * IN and OUT are placeholders for the application's input and output event types.
 *
 * @param pattern   the matched events, grouped by the name of the pattern stage
 * @param collector receives every output record produced for this match
 * @throws NoSuchElementException if either stage is absent from the map or has no events
 */
def flatSelectFn(pattern: Map[String, Iterable[IN]], collector: Collector[OUT]): Unit = {
  // Iterable has no `next` method; `head` returns the first captured event of a stage.
  val startEvent = pattern("start").head
  val endEvent = pattern("end").head
  // `0 to n` is inclusive, so the same record is emitted getValue + 1 times.
  for (_ <- 0 to startEvent.getValue) {
    collector.collect(OUT(startEvent, endEvent))
  }
}
// Create the pattern stream from the input stream and the pattern.
val patternStream = CEP.pattern(input, pattern)
// Define an OutputTag named late-data. Its element type must be the type emitted by the
// timeout handler below (TimeoutEvent); the name no longer carries a stray leading space.
val outputTag = OutputTag[TimeoutEvent]("late-data")

val result = patternStream.flatSelect(outputTag) {
  // Timeout handler: called with the partial match and a timestamp;
  // whatever it collects goes to the side output identified by outputTag.
  (pattern: collection.Map[String, Iterable[Event]], timestamp: Long, out: Collector[TimeoutEvent]) =>
    out.collect(TimeoutEvent())
} {
  // Regular handler: called with the events of a match; its output forms `result`.
  // Both handlers take the same scala.collection.Map parameter type.
  (pattern: collection.Map[String, Iterable[Event]], out: Collector[ComplexEvent]) =>
    out.collect(ComplexEvent())
}
// Read the timed-out events back from the side output identified by outputTag.
val timeoutResult = result.getSideOutput(outputTag)
Adam,click,1558430815185
Adam,buy,1558430815865
Adam,order,1558430815985
Berry,buy,1558430815988
Adam,click,1558430816068
Berry,order,1558430816074
Carl,click,1558430816151
Carl,buy,1558430816641
Dennis,buy,1558430817128
Carl,click,1558430817165
Ella,click,1558430818652
import java.util

import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
/**
 * Input event: one action performed by a user.
 *
 * @param userName  name of the user who performed the action
 * @param eventType kind of action, e.g. "click", "buy" or "order"
 * @param eventTime numeric timestamp of the action
 */
case class UserAction(
  userName: String,
  eventType: String,
  eventTime: Long
)

/**
 * Output event: a click and a buy attributed to one user.
 *
 * @param userName  name of the user
 * @param clickTime timestamp of the click
 * @param buyTime   timestamp of the buy
 */
case class ClickAndBuyAction(
  userName: String,
  clickTime: Long,
  buyTime: Long
)

/**
 * Demo job: looks, per user, for a "click" action followed (via `next`) by a "buy"
 * action in a small in-memory list of user actions, and prints every match.
 */
object UserActionDetect {

  /**
   * Builds the pipeline and runs it.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Sample input, listed in ascending eventTime order.
    val sampleActions = List(
      UserAction("Adam", "click", 1558430815185L),
      UserAction("Adam", "buy", 1558430815865L),
      UserAction("Adam", "order", 1558430815985L),
      UserAction("Berry", "buy", 1558430815988L),
      UserAction("Adam", "click", 1558430816068L),
      UserAction("Berry", "order", 1558430816074L),
      UserAction("Carl", "click", 1558430816151L),
      UserAction("Carl", "buy", 1558430816641L),
      UserAction("Dennis", "buy", 1558430817128L),
      UserAction("Carl", "click", 1558430817165L),
      UserAction("Ella", "click", 1558430818652L)
    )

    // Step 1: input stream, timestamped by eventTime and keyed by user name.
    val actionsByUser = streamEnv
      .fromCollection(sampleActions)
      .assignAscendingTimestamps(action => action.eventTime)
      .keyBy(action => action.userName)

    // Step 2: the pattern — a "click" stage named "begin", then a "buy" stage named "next".
    val clickThenBuy = Pattern
      .begin[UserAction]("begin")
      .where(action => action.eventType == "click")
      .next("next")
      .where(action => action.eventType == "buy")

    // Step 3: apply the pattern to the keyed stream.
    val matches = CEP.pattern(actionsByUser, clickThenBuy)

    // Step 4: turn each match into a ClickAndBuyAction and print it.
    matches
      .select(new ClickAndBuyMatch())
      .print()

    streamEnv.execute()
  }
}
/**
 * Select function that turns one match of the click-then-buy pattern into a
 * [[ClickAndBuyAction]].
 */
class ClickAndBuyMatch() extends PatternSelectFunction[UserAction, ClickAndBuyAction] {

  /**
   * @param map matched events, keyed by pattern stage name ("begin" and "next")
   * @return the user name and time of the click, together with the time of the buy
   */
  override def select(map: util.Map[String, util.List[UserAction]]): ClickAndBuyAction = {
    // First event captured under the given stage name.
    def firstOf(stage: String): UserAction = map.get(stage).iterator().next()

    val clickEvent = firstOf("begin")
    val buyEvent = firstOf("next")
    ClickAndBuyAction(
      userName = clickEvent.userName,
      clickTime = clickEvent.eventTime,
      buyTime = buyEvent.eventTime
    )
  }
}
# Create the project directory FlinkCEP under the home directory and enter it.
cd ~
mkdir FlinkCEP
cd FlinkCEP
# Create the directory that will hold the Scala sources.
mkdir -p src/main/scala
# Open the program's source file in vim.
vim src/main/scala/UserActionDetect.scala
<!-- Maven build file for the example project. It builds a jar named from
     artifactId and version (simple-project, 1.0). -->
<project>
    <groupId>cn.edu.xmu.dblab</groupId>
    <artifactId>simple-project</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>Simple Project</name>
    <packaging>jar</packaging>
    <version>1.0</version>
    <!-- Flink libraries used by the program; all are the Scala 2.12 builds at version 1.11.2.
         NOTE(review): no shade/assembly plugin is configured below, so the packaged jar
         presumably does not contain these libraries; confirm that the CEP library is
         available on the cluster's classpath when the job is submitted. -->
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Scala compiler plugin; its compile goal is attached to the build. -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
# Build the project jar with Maven from the project directory.
cd ~/FlinkCEP
/usr/local/maven/bin/mvn package
# Submit the jar to Flink; -c names the entry class (UserActionDetect).
# The continuation line must not start with "> ": that is the shell's continuation
# prompt, and pasted literally it would redirect the command's output into a file
# named UserActionDetect instead of passing the class name to -c.
/usr/local/flink/bin/flink run -c \
  UserActionDetect ./target/simple-project-1.0.jar