cdxy.me
Cyber Security / Data Science / Trading

背景与问题

一次入侵事件分为多个阶段,过程中攻击者会执行很多行为,其中一部分行为会被IDS产品捕获,形成告警,我们需要对同一次入侵事件产生的告警进行聚合,还原入侵事件的原貌,从而便于后续自动化防御工作的部署。

在入侵溯源功能中,我们已经将攻击者的行为(白色节点)和触发的告警(橙色节点)进行关联分析并构成了无环图。在图中所有联通的橙色节点需要划分到同一入侵事件group中。

目前需要对这份数据做一个接口:

  • 数据:点(A)和边(A->B)的数据异步输入
  • API input: 某个告警id
  • API output: 与其联通的其他告警id

起初方案是在图数据库实现了图查询,算法同学给出了更优方案:

动态连通性(Dynamic Connectivit)

动态连通性是计算机图论中的一种数据结构,动态维护图结构中相连接的组信息。 简单的说就是,图中各个点之间是否相连、连接后组成了多少个组等信息。我们称连接在一起就像形成了一个圈子似的,成为一个组(Component),每个组有其自己的一些特征,比如组内所有成员都有同一个标记等。

提到圈子,大家比较好理解,我们在社交网络中,彼此熟悉的人之间组成自己的圈子,"熟悉"用计算机中的语言来表示就是“Connected 连通的。圈子是会变化的,今天你又新认识了某人,明天你跟某人友尽了,这种变化是动态的,所以有了动态连通性这种数据结构和问题。

比较常见的应用有,社交网络中比如LinkedIn, 判断某个用户与其它用户是否熟悉:如果你与用户A熟悉,用户A与用户B熟悉,则认为你与用户B也是连接的,你可以看到用户B的信息。在计算机网络中,也存在类似的情况,判断网络中某两个节点是否相连。

Union-Find 算法API

实现有三种

quick-find

Python实现

# !/usr/bin/env python
#  -*- coding: utf-8 -*-

class QuickFind(object):
    id = []
    count = 0

    def __init__(self, n):
        self.count = n # 点初始化 group=id
        i = 0
        while i < n:
            self.id.append(i)
            i += 1

    def connected(self, p, q):
        return self.find(p) == self.find(q)

    def find(self, p):
        return self.id[p]

    def union(self, p, q):
        idp = self.find(p)
        if not self.connected(p, q):
            for i in range(len(self.id)):
                if self.id[i] == idp:  # 将p所在组内的所有节点的id都设为q的当前id
                    self.id[i] = self.id[q]
            self.count -= 1

# 点初始化
q = QuickFind(10)

# 边信息
edges = [
    (1, 2),
    (1, 3)
]

# 边输入
for edge in edges:
    q.union(edge[0], edge[1])

print(q.id)
# [0, 3, 3, 3, 4, 5, 6, 7, 8, 9]

quick-find 顾名思议,该算法将每个节点的分组结果存储为一列,因此find()函数可以直接按id索引查出所在分组,find()函数只访问数组一次,union()函数在(N+3)到(2N+1)之间,而算法整体复杂度是(N+3)*(N-1)~N^2平方级别的(N个点之中最多有N-1次union操作)。

quick-union

对于大规模的数据而言,平方阶的算法是存在问题的,这种情况下,每次添加新路径就是"牵一发而动全身,想要解决这个问题,关键就是要提高union方法的效率,让它不再需要遍历整个数组。

一个思路是将id数据中存储的group信息改为链接,链接构成一个树结构,树的根节点(唯一)即为分组结果,而在find操作时候再根据链接指向做树的递归遍历。

Python实现

class QuickUnion(object):
    id = []
    count = 0

    def __init__(self, n):  # 初始化节点id列,group=id自身
        self.count = n
        i = 0
        while i < n:
            self.id.append(i)
            i += 1

    def connected(self, p, q):
        return bool(self.find(p) == self.find(q))

    def find(self, p): # 递归查树
        while (p != self.id[p]):
            p = self.id[p]
        return p

    def union(self, p, q): # 将p和q的根节点统一
        idq = self.find(q)
        idp = self.find(p)
        if not self.connected(p, q):
            self.id[idp] = idq # 没有定义方向
            self.count -= 1

在此算法中,考虑find()性能在最坏情况下需要2N-1次数组访问,因此在find()接口被频繁调用时仍然是N^2平方阶的复杂度。与之前的quick-find比较需要看数据集的特征。

weighted quick-union

在quick-union中存在一个问题:self.id[idp] = idq没有定义方向,这直接影响了树的深度,改进的方法是在union里面判断两个树的大小(节点数量),将小的树附加到大的树上,这样,合并后的树的深度不会变得非常大。

class WeightedQuickUnion():
    id = []  # 节点的父链接数组
    count = 0  # 最终分组数量
    sz = []  # 树的size,以根节点作为索引,用于比对大树还是小树

    def __init__(self, n):
        self.count = n
        i = 0
        while i < n:
            self.id.append(i)
            self.sz.append(1)  # 树的size初始化为1
            i += 1

    def connected(self, p, q):
        return bool(self.find(p) == self.find(q))

    def find(self, p):
        while (p != self.id[p]):
            p = self.id[p]
        return p

    def union(self, p, q):
        idp = self.find(p)
        idq = self.find(q)

        if not self.connected(p, q):  # 已有相同根节点,忽略

            if (self.sz[idp] < self.sz[idq]):
                # 小树挂在大树下面
                self.id[idp] = idq
                # 更新树的size
                self.sz[idq] += self.sz[idp]
            else:
                self.id[idq] = idp
                self.sz[idp] += self.sz[idq]

            self.count -= 1

另外一种方法是记录树的高度(rank)按照低的树向高的树合并,两者性能差异看具体树结构。

路径压缩

根据上面weighted quick union的结构图来看,还是存在深度>1的树结构,这个从叶子节点到根节点的"溯源"过程可以优化。一个最直接的办法是,在find()执行时将这条路径上的所有中间节点记录下来,使其全部挂在根节点下面。但是这样一来会增加算法的空间复杂度(估计在某些acm题目里会爆栈)。一个备选的思路是每遍历到一个节点,就将这个节点变成他的爷爷节点的孩子(和其父节点在同一层)。相当于是压缩了查询的路径,如此,频繁的查询会导致树逐渐扁平化,使后续find()的复杂度降低。

    def find(self, p):
        while (p != self.id[p]):
            self.id[p] = self.id[self.id[p]] # p节点的父节点置为它的爷爷节点
            p = self.id[p]
        return p

对比

存在N个节点时计算复杂度(最坏情况下)

algorithm union() find()
graph-based query N 1
quick-find N 1
quick-union tree hight tree hight
weighted quick-union tree hight tree hight
weighted quick-union with path compression lg*N ~1 lg*N ~1

log*N = 0 if N <= 1 else 1+log*(logn) 近似为O(1)

Ref

  • @相旬 @刈刀
  • https://www.jianshu.com/p/a99d05d2ce2e
  • https://blog.csdn.net/johnny901114/article/details/80721436
  • 《算法 第四版》1.5 案例研究:union-find算法