概念

cep-concept

如上图所示,假设我们对模式 A、B、B、C 感兴趣,它代表我们想要找到这样的事件序列:

  • A 类事件发生
  • B 类事件发生两次
  • C 类事件发生

从事件流中匹配出 a1、b1、b2、c1 的具体事件。注意,这里我们并不要求事件之间是严格连续的

flink-cep 分为 3 个核心概念:

  • 模式(Pattern
  • 事件流(DataStream<Event>
  • 匹配结果(Match

flink-cep 的应用场景有:

  • 实时风控。
    • 风险用户检测。5 分钟内转账次数超过 10 次且金额大于 10000
  • 实时营销。滴滴的一些实时营销场景:
    • 乘客线上冒泡 1 分钟没罚单
    • 乘客下单后 2 分钟内没司机接单
    • 乘客在不同业务线之间比价
  • 实时规则检测。
    • 直播实时检测。检测到 10 分钟内观看人数持续下跌,实时推送直播达人,调整直播策略
    • 用户在 30 分钟内创建多笔订单,没有支付,疑似刷单,进行账号封禁
    • 爆品发现。某款商品 5 分钟内成交超过 1000 单,实时推送商家,提醒及时补货、直播间挂链接
    • 在线奖励。当完成在线任务后,及时发放积分和奖励

1.模式

模式有 3 种:

  • 单个模式
  • 组合模式
  • 模式组

1.1 单个模式

1.1.1 条件(Conditions)

可以指定一个条件来决定事件是否进入这个模式:

  • 条件可以很简单,比如 value > 5name contains "SB"
  • 条件可以是很复杂的,由多个条件组合、具有层级的筛选规则组成,形成组合条件树

组合条件树需包含 2 个概念:

  • 可组合。可通过且或条件进行组合
  • 可分层。条件不只是简单的 value > 5 还可以是复合条件如 18 < age < 60 and sex == "man"

比如想捞取身体健康、无犯罪史、女性年龄在 18 ~ 60 或男性在 18 ~ 65 岁之间的用户:

可用 JSON 表达如下:

{
    "type": "and",
    "expressions": [
        {
            "type": "single",
            "detail": {
                "fieldName": "health",
                "fieldType": "enum",
                "operation": "==",
                "values": [
                    "good"
                ]
            }
        },
        {
            "type": "single",
            "detail": {
                "fieldName": "criminal",
                "fieldType": "enum",
                "operation": "==",
                "values": [
                    "none"
                ]
            }
        },
        {
            "type": "composite",
            "detail": {
                "type": "or",
                "expressions": [
                    {
                        "type": "composite",
                        "detail": {
                            "type": "and",
                            "expressions": [
                                {
                                    "type": "single",
                                    "detail": {
                                        "fieldName": "age",
                                        "fieldType": "number",
                                        "operation": "range",
                                        "values": [
                                            18,
                                            60
                                        ]
                                    }
                                },
                                {
                                    "type": "single",
                                    "detail": {
                                        "fieldName": "sex",
                                        "fieldType": "enum",
                                        "operation": "==",
                                        "values": [
                                            "woman"
                                        ]
                                    }
                                }
                            ]
                        }
                    },
                    {
                        "type": "composite",
                        "detail": {
                            "type": "and",
                            "expressions": [
                                {
                                    "type": "single",
                                    "detail": {
                                        "fieldName": "age",
                                        "fieldType": "number",
                                        "operation": "range",
                                        "values": [
                                            18,
                                            65
                                        ]
                                    }
                                },
                                {
                                    "type": "single",
                                    "detail": {
                                        "fieldName": "sex",
                                        "fieldType": "enum",
                                        "operation": "==",
                                        "values": [
                                            "man"
                                        ]
                                    }
                                }
                            ]
                        }
                    }
                ]
            }
        }
    ]
}

1.1.2 量词

如果只能检测到单个事件(虽然单个事件的匹配规则可能很复杂,但也只能匹配一个事件),会使用受限。flink-cep 也可以支持对模式增加量词,表达事件的重复次数。

模式类型:

  • 单例模式。只能匹配一次
  • 循环模式。可重复匹配

cep

比如在金融风控场景,检测用户连续转账 5 次且每次转账金额大于 10000 的行为,暂时封禁用户账号,防止被诈骗。转账事件定义如下,金额单位:分

{
    "timestamp": 1764315873504,
    "userId": "1",
    "amount": 1000000,
    "action": "transfer"
}

想检测到转账金额大于 10000,条件为 action == "transfer" and amount >= 1000000,接下来需要检测事件重复次数超过 5 次。5 次即为量词

  • 单例模式。action == "transfer" and amount >= 1000000
  • 循环模式。设置 action == "transfer" and amount >= 1000000 重复次数超过 5 次

在 flink-cep 中实现如下:

Pattern<Event, Event> pattern = Pattern.<Event>begin("转账事件")
        .where(new AviatorCondition<>("action == transfer and amount >= 1000000"))
        .timesOrMore(5); // 5 次或更多

flink-cep 支持的量词如下:

  • times(5)。期望出现 5 次
  • times(1, 3)。期望出现 1 ~ 3 次
  • timesOrMore(5)。期望出现 5 次或更多
  • oneOrMore()。等价于 timesOrMore(1)
  • opitonal()。可将量词变成可选,加上optional() 表示可以不发生
    • times(5).optional()。表示可以出现 5 次,也可出现 0 次
    • times(1, 3).optional()。表示可以出现 1 ~ 3 次,也可以出现 0 次
    • timesOrMore(5).optional()。表示可以出现 5 次或更多,也可出现 0 次
  • greedy()。期望尽可能多的重复,比如一个期望一个事件重复 3 次,实际上出现 10 次,匹配的时候期望把 10 次全拿出来作为一个结果,而不是把 10 次作为 3 + 3 + 3,给了 3 个结果。
    • times(5).greedy()。表示可以出现 5 次,并尽可能的重复次数多
    • times(1, 3).greedy()。表示可以出现 1 ~ 3 次,并尽可能的重复次数多
    • timesOrMore(5).greedy()。表示可以出现 5 次或更多,并尽可能的重复次数多
  • optinonalgreedy() 也可以一起使用
    • times(5).optional().greedy()。表示可以出现 5 次,也可以出现 0 次,并尽可能的重复次数多

1.2 组合模式

group_pattern

比如在网站流量场景,通过埋点不断获取到用户在网页的曝光、点击、加购、下单、支付等事件,事件结构定义如下:

{
    "timestamp": 1764315873504,
    "userId": "1",
    "productId": "1",
    "action": "view | click | addCart | order | pay"
}

想检测连续 3 次下单,但都没有支付的数据,向用户发送促销消息。需要匹配 2 个单个模式成为一个组合模式

  • 连续 3 次下单。单个模式,模式因为有量词单例变成循环
  • 支付。单个模式,无量词单例
// 单个模式。连续 3 次下单
Pattern<Event, Event> pattern = Pattern.<Event>begin("下单事件")
        .where(new AviatorCondition<>("action == order"))
        .timesOrMore(3);

// 单个模式。支付
Pattern<Event, Event> pattern = Pattern.<Event>begin("支付事件")
        .where(new AviatorCondition<>("action == pay"));

// 组合模式。连续 3 次下单 --没有---> 支付,连接条件:未发生
Pattern<Event, Event> pattern = Pattern.<Event>begin("下单事件")
        .where(new AviatorCondition<>("action == order"))
        .timesOrMore(3)
        .notFollowedBy("支付事件")
        .where(new AviatorCondition<>("action == pay"));

1.3 模式组

对于模式组是对单个模式组合模式的更进一步结合,比如对检测连续 3 次下单,但都没有支付的数据,发生一次可能是用户没有购买意愿可向用户发送促销消息促成交,发生 10 次判断用户可能是刷单用户。因此需要检测模式整体发生了 10 次:

// 组合模式
Pattern<Event, Event> notPayPattern = Pattern.<Event>begin("下单事件")
        .where(new AviatorCondition<>("action == order"))
        .timesOrMore(3)
        .notFollowedBy("支付事件")
        .where(new AviatorCondition<>("action == pay"));

// 模式组
Pattern<Event, Event> riskPattern = Pattern.begin(notPayPattern)
        .timesOrMore(10);

1.4 连续性

连续性指的是不同事件之间的是否连续,它有 2 个场景:

  • 单个模式中,如果对单例模式增加量词即可变成循环模式。比如检测用户连续转账 5 次且每次转账金额大于 10000 的行为,对于连续转账中的连续的定义:是真正的连续事件,还是转账事件中间可以夹杂着登陆登出事件。
  • 将多个单个模式组合成组合模式。比如连续 3 次下单,但没有支付的行为中,在连续 3 次下单和支付行为中间,是连续的,还是可以夹杂着曝光点击加购等事件。

在模式序列中,把多个单个模式组合成组合模式的条件为连续条件

  • 严格连续: 期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件。
  • 松散连续: 两个匹配事件的相对顺序,忽略匹配的事件之间的不匹配的事件。
  • 不确定的松散连续: 可以重复使用之前已经匹配过的事件,以同一个事件作为开始匹配,更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。

contiguity

strict_relaxed_contiguity

non-deterministic_relaxed_contiguity

配置方式:

  • 组合模式
    • next()。严格连续
    • followedBy()。松散连续
    • followedByAny()。不确定的松散连续
  • 模式组
    • start.next( Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...) ).times(3)
    • start.followedBy( Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...) ).oneOrMore();
    • start.followedByAny( Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...) ).optional();

同时还可以配置Not语义的连接:

  • notNext(),如果不想后面直接连着一个特定事件
  • notFollowedBy(),如果不想一个特定事件发生在两个事件之间的任何地方。

注意,Not语义有使用限制:

  • 如果模式序列没有定义时间约束,则不能以 notFollowedBy() 结尾。
  • 一个Not模式前面不能是可选的模式(有optional()量词)。

对于单例模式,通过增加量词变成循环模式时,重复次数的连续性配置如下:

  • 严格连续。consecutive() oneOrMore()times() 一起使用, 在匹配的事件之间施加严格的连续性, 也就是说,任何不匹配的事件都会终止匹配(和 next() 一样)。
  • 松散连续。默认行为,即followedBy()
  • 不确定性连续。allowCombinations()oneOrMore()times() 一起使用, 在匹配的事件中间施加不确定松散连续性(和 followedByAny() 一样)

1.5 时间约束

除了指定事件的重复次数和不同事件之间的组合,还可以添加时间约束:

  • 循环模式。比如 1 小时内检测用户连续转账 5 次且每次转账金额大于 10000 的行为
  • 组合模式。比如连续 3 次下单,但 10 分钟内没有支付的行为

1.6 复杂条件

单个事件的检测规则可以很简单,也可以很复杂,但是都是针对单个事件,条件也可以跨多个事件。

1.6.1 案例1

比如某日销售额超过前 3 天的平均值 20%,数据流为每日销售额对象。

[
    {
        "date": "20251011",
        "gmv": 100
    },
    {
        "date": "20251012",
        "gmv": 200
    },
    {
        "date": "20251013",
        "gmv": 150
    },
    {
        "date": "20251014",
        "gmv": 600
    }
]

针对上面数据,判断 20251014 销售额是否超过 20251011 + 20251012 + 20251013 平均值 20%。

这种场景下,需获取当日销售额的前 3 天销售额,计算前 3 天销售额的平均值,继而和当天销售额进行比较。

实现方式如下:

Pattern<Event, Event> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.noSkip())
        .where(BooleanConditions.trueFunction())
        .times(3)
        .next("avg")
        .where(new IterativeCondition<Event>() {
            @Override
            public boolean filter(Event event, Context<Event> context) throws Exception {
                Iterable<Event> start = context.getEventsForPattern("start");
                Long sum = 0L;
                Long size = 0L;
                for (Event startEvent : start) {
                    sum += startEvent.getGmv();
                    size++;
                }
                Double avg = sum * 1.0 / size;
                double increase = (event.getGmv() - avg) * 1.0 / avg;
                System.out.println("start: " + JacksonUtil.toJsonString(start) + ", event: " + JacksonUtil.toJsonString(event));
                return increase > 0.2;
            }
        })
        .times(1);

因为是计算每日销售额是否大于前 3 天销售额,因此前 3 天的数据是一个滑动窗口效果。

todo:判断 context 是否可以获取正在匹配的模式的数据

Pattern<Event, Event> pattern = Pattern.<Event>begin("gmv")
        .oneOrMore()
        .where(new IterativeCondition<Event>() {
            @Override
            public boolean filter(Event event, Context<Event> context) throws Exception {
                Iterable<Event> events = context.getEventsForPattern("gmv");
                List<Event> list = IteratorUtils.toList(events.iterator());
                if (CollectionUtils.size(list) < 3) {
                    return true;
                }
                if (CollectionUtils.size(list) == 3) {
                    Long avgAmount = calAvg(list);
                    return (event.getAmount() - avgAmount) / avgAmount >= 0.2;
                }
                return false;
            }
        })
        ;

todo: 如何实现滚动

Pattern<Event, Event> pattern = Pattern.<Event>begin("gmv")
        .where(BooleanConditions.trueFunction())
        .times(3)
        .next("higherGmv")
        .where(new IterativeCondition<Event>() {
            @Override
            public boolean filter(Event event, Context<Event> context) throws Exception {
                Iterable<Event> events = context.getEventsForPattern("gmv");
                Long sumAmount = 0L;
                for (Event gmvEvent : events) {
                    sumAmount += gmvEvent.getAmount();
                }
                Long avgAmount = sumAmount / 3;
                return (event.getAmount() - avgAmount) / avgAmount >= 0.2;
            }
        })
        ;

1.6.2 案例2

以监控告警为例:第一次告警发生后,30s 内发生第二次告警,进行告警升级,第一次告警邮件通知,第二次升级为电话通知。

需要配置 2 条规则:

// 只能匹配到 第一次发生报警,在 30s 内发生第二次报警的组合事件。如果发生一次告警,没有在 30s 内发生第二次,告警就会被吞掉,无法进行邮件通知
Pattern<Event, Event> pattern = Pattern.<Event>begin("first alert") // 首次告警
        .where(new AviatorCondition<>("type == alert"))
        .times(1)
        .followedBy("second alert")                                 // 30s再次告警。如果增加 optional() 修饰,会导致告警到达不能第一时间处理,需要等待 30s 查看是否有第二条
        .where(new AviatorCondition<>("type == alert"))
        .within(Time.of(30, TimeUnit.SECONDS))
        ;
// 如果在增加一个模式,则先发生一次告警,30s内发生第二次告警,则两次告警都会被这个检测到,会发 2 次邮件通知,加上 1 次电话通知
Pattern<Event, Event> pattern = Pattern.<Event>begin("alert") // 告警
        .where(new AviatorCondition<>("type == alert"))
        ;

也可以配置 1 条规则:

  • 通过把第二次告警事件的发生次数设置为 optional() 可保证只有一次告警的时候也能发出
  • 因为设置了时间约束,30s 内,可通过超时函数获取到只发生一次告警的事件

缺点是在第一次告警发生后,需等待 30s 查看是否有第二条告警事件,导致第一次告警后不能及时发送告警通知

// 只能匹配到 第一次发生报警,在 30s 内发生第二次报警的组合事件
Pattern<Event, Event> pattern = Pattern.<Event>begin("first alert") // 首次告警
        .where(new AviatorCondition<>("type == alert"))
        .times(1)
        .followedBy("second alert")                                 // 30s再次告警。如果增加 optional() 修饰,会导致告警到达不能第一时间处理,需要等待 30s 查看是否有第二条
        .where(new AviatorCondition<>("type == alert"))
        .optional()
        .within(Time.of(30, TimeUnit.SECONDS))
        ;

比较好的实现是把 30s 的时间间隔放置到发送告警消息时,如果只有 1 条消息,发送邮件通知,如果有 2 条消息,且 2 条消息的时间间隔低于 30 s,发送电话通知,否则发送邮件通知。

// 只能匹配到 第一次发生报警,在 30s 内发生第二次报警的组合事件
Pattern<Event, Event> pattern = Pattern.<Event>begin("first alert") // 首次告警
        .where(new AviatorCondition<>("type == alert"))
        .times(1)
        .followedBy("second alert")                                 // 30s再次告警。如果增加 optional() 修饰,会导致告警到达不能第一时间处理,需要等待 30s 查看是否有第二条
        .where(new AviatorCondition<>("type == alert ")) // todo
        .optional()
        ;

todo

  • 告警静默。1 小时内只发送一次

1.6.3 案例3

多模式的匹配条件

2.匹配后跳过策略

对于一个给定模式 b+ c 和数据流 b1 b2 b3 c,模式 b+ 是 SQL 的语法,表示 1 到多个 b。数据流中有 3 个 b 事件:b1 b2 b3 都会命中 b+ 的规则,那么同一个事件可能会分配到多个成功的匹配上,用户可以指定匹配后跳过策略,控制一个事件会分配到多少个匹配上。

总共有 5 种策略:

  • NO_SKIP。每个成功的匹配都会被输出
  • SKIP_TO_NEXT。丢弃以相同事件开始的所有部分匹配
  • SKIP_PAST_LAST_EVENT。丢弃起始在这个匹配的开始和结束之间的所有部分匹配
  • SKIP_TO_FIRST。丢弃起始在这个匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配
  • SKIP_TO_LAST。丢弃起始在这个匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配

详细说明如下

策略结果描述
NO_SKIPb1 b2 b3 cb2 b3 cb3 c找到匹配b1 b2 b3 c之后,不会丢弃任何结果。
SKIP_TO_NEXTb1 b2 b3 cb2 b3 cb3 c找到匹配b1 b2 b3 c之后,不会丢弃任何结果,因为没有以b1开始的其他匹配。
SKIP_PAST_LAST_EVENTb1 b2 b3 c找到匹配b1 b2 b3 c之后,会丢弃其他所有的部分匹配。
SKIP_TO_FIRST(b)b1 b2 b3 cb2 b3 cb3 c找到匹配b1 b2 b3 c之后,会尝试丢弃所有在b1之前开始的部分匹配,但没有这样的匹配,所以没有任何匹配被丢弃。
SKIP_TO_LAST(b)b1 b2 b3 cb3 c找到匹配b1 b2 b3 c之后,会尝试丢弃所有在b3之前开始的部分匹配,有一个这样的b2 b3 c被丢弃。

详情参考:匹配后跳过策略

3.限制条件

  • 不可逆。
  • 有序。A -> B -> C,A -> C -> B,B -> A -> C,B -> C -> A,C -> A -> B,C -> B -> A