告警聚合-无向图的动态连通性与union-find算法
[Data Science for Cyber Security]
背景与问题
一次入侵事件分为多个阶段,过程中攻击者会执行很多行为,其中一部分行为会被IDS产品捕获,形成告警,我们需要对同一次入侵事件产生的告警进行聚合,还原入侵事件的原貌,从而便于后续自动化防御工作的部署。
在入侵溯源功能中,我们已经将攻击者的行为(白色节点)和触发的告警(橙色节点)进行关联分析并构成了无环图。在图中所有联通的橙色节点需要划分到同一入侵事件group中。
目前需要对这份数据做一个接口:
- 数据:点(A)和边(A->B)的数据异步输入
- API input: 某个告警id
- API output: 与其联通的其他告警id
起初方案是在图数据库实现了图查询,算法同学给出了更优方案:
动态连通性(Dynamic Connectivit)
动态连通性是计算机图论中的一种数据结构,动态维护图结构中相连接的组信息。 简单的说就是,图中各个点之间是否相连、连接后组成了多少个组等信息。我们称连接在一起就像形成了一个圈子似的,成为一个组(Component),每个组有其自己的一些特征,比如组内所有成员都有同一个标记等。
提到圈子,大家比较好理解,我们在社交网络中,彼此熟悉的人之间组成自己的圈子,"熟悉"用计算机中的语言来表示就是“Connected 连通的。圈子是会变化的,今天你又新认识了某人,明天你跟某人友尽了,这种变化是动态的,所以有了动态连通性这种数据结构和问题。
比较常见的应用有,社交网络中比如LinkedIn, 判断某个用户与其它用户是否熟悉:如果你与用户A熟悉,用户A与用户B熟悉,则认为你与用户B也是连接的,你可以看到用户B的信息。在计算机网络中,也存在类似的情况,判断网络中某两个节点是否相连。
Union-Find 算法API
实现有三种
quick-find
Python实现
# !/usr/bin/env python
# -*- coding: utf-8 -*-
class QuickFind(object):
id = []
count = 0
def __init__(self, n):
self.count = n # 点初始化 group=id
i = 0
while i < n:
self.id.append(i)
i += 1
def connected(self, p, q):
return self.find(p) == self.find(q)
def find(self, p):
return self.id[p]
def union(self, p, q):
idp = self.find(p)
if not self.connected(p, q):
for i in range(len(self.id)):
if self.id[i] == idp: # 将p所在组内的所有节点的id都设为q的当前id
self.id[i] = self.id[q]
self.count -= 1
# 点初始化
q = QuickFind(10)
# 边信息
edges = [
(1, 2),
(1, 3)
]
# 边输入
for edge in edges:
q.union(edge[0], edge[1])
print(q.id)
# [0, 3, 3, 3, 4, 5, 6, 7, 8, 9]
quick-find 顾名思议,该算法将每个节点的分组结果存储为一列,因此find()函数可以直接按id索引查出所在分组,find()函数只访问数组一次,union()函数在(N+3)到(2N+1)之间,而算法整体复杂度是(N+3)*(N-1)~N^2
平方级别的(N个点之中最多有N-1次union操作)。
quick-union
对于大规模的数据而言,平方阶的算法是存在问题的,这种情况下,每次添加新路径就是"牵一发而动全身,想要解决这个问题,关键就是要提高union方法的效率,让它不再需要遍历整个数组。
一个思路是将id数据中存储的group信息改为链接,链接构成一个树结构,树的根节点(唯一)即为分组结果,而在find操作时候再根据链接指向做树的递归遍历。
Python实现
class QuickUnion(object):
id = []
count = 0
def __init__(self, n): # 初始化节点id列,group=id自身
self.count = n
i = 0
while i < n:
self.id.append(i)
i += 1
def connected(self, p, q):
return bool(self.find(p) == self.find(q))
def find(self, p): # 递归查树
while (p != self.id[p]):
p = self.id[p]
return p
def union(self, p, q): # 将p和q的根节点统一
idq = self.find(q)
idp = self.find(p)
if not self.connected(p, q):
self.id[idp] = idq # 没有定义方向
self.count -= 1
在此算法中,考虑find()性能在最坏情况下需要2N-1次数组访问,因此在find()接口被频繁调用时仍然是N^2平方阶的复杂度。与之前的quick-find比较需要看数据集的特征。
weighted quick-union
在quick-union中存在一个问题:self.id[idp] = idq
没有定义方向,这直接影响了树的深度,改进的方法是在union里面判断两个树的大小(节点数量),将小的树附加到大的树上,这样,合并后的树的深度不会变得非常大。
class WeightedQuickUnion():
id = [] # 节点的父链接数组
count = 0 # 最终分组数量
sz = [] # 树的size,以根节点作为索引,用于比对大树还是小树
def __init__(self, n):
self.count = n
i = 0
while i < n:
self.id.append(i)
self.sz.append(1) # 树的size初始化为1
i += 1
def connected(self, p, q):
return bool(self.find(p) == self.find(q))
def find(self, p):
while (p != self.id[p]):
p = self.id[p]
return p
def union(self, p, q):
idp = self.find(p)
idq = self.find(q)
if not self.connected(p, q): # 已有相同根节点,忽略
if (self.sz[idp] < self.sz[idq]):
# 小树挂在大树下面
self.id[idp] = idq
# 更新树的size
self.sz[idq] += self.sz[idp]
else:
self.id[idq] = idp
self.sz[idp] += self.sz[idq]
self.count -= 1
另外一种方法是记录树的高度(rank)按照低的树向高的树合并,两者性能差异看具体树结构。
路径压缩
根据上面weighted quick union的结构图来看,还是存在深度>1的树结构,这个从叶子节点到根节点的"溯源"过程可以优化。一个最直接的办法是,在find()执行时将这条路径上的所有中间节点记录下来,使其全部挂在根节点下面。但是这样一来会增加算法的空间复杂度(估计在某些acm题目里会爆栈)。一个备选的思路是每遍历到一个节点,就将这个节点变成他的爷爷节点的孩子(和其父节点在同一层)。相当于是压缩了查询的路径,如此,频繁的查询会导致树逐渐扁平化,使后续find()的复杂度降低。
def find(self, p):
while (p != self.id[p]):
self.id[p] = self.id[self.id[p]] # p节点的父节点置为它的爷爷节点
p = self.id[p]
return p
对比
存在N个节点时计算复杂度(最坏情况下)
algorithm | union() | find() |
---|---|---|
graph-based query | N | 1 |
quick-find | N | 1 |
quick-union | tree hight | tree hight |
weighted quick-union | tree hight | tree hight |
weighted quick-union with path compression | lg*N ~1 | lg*N ~1 |
log*N = 0 if N <= 1 else 1+log*(logn)
近似为O(1)
Ref
- @相旬 @刈刀
- https://www.jianshu.com/p/a99d05d2ce2e
- https://blog.csdn.net/johnny901114/article/details/80721436
- 《算法 第四版》1.5 案例研究:union-find算法