简介
Aho–Corasick算法是由Alfred V. Aho和Margaret J.Corasick 发明的字符串搜索算法,用于在输入的一串字符串中匹配有限组“字典”中的子串。该算法主要依靠构造一个有限状态机(类似于在一个trie树中添加失配指针)来实现。这些额外的失配指针允许在查找字符串失败时进行回退(例如设Trie树的单词cat匹配失败,但是在Trie树中存在另一个单词cart,失配指针就会指向前缀ca),转向某前缀的其他分支,免于重复匹配前缀,提高算法效率。 以规则串[‘he’,‘she’,‘his’,‘hers’]为例,构建的trie树如下,转移函数需要表示状态和对应字符的转移过程(黑色箭头)。 加上失败函数和输出函数的trie树如下图,失败函数(黄色箭头)表示在该状态下匹配字符失败时应该转移的过程(存在部分的相同前缀),输出函数(蓝色字体)表示该状态对应的前缀存在规则(可能不止一个)。 因此,Aho–Corasick算法主要包括构造trie树,转向函数、失效函数和输出函数四个部分, 使用python实现Aho–Corasic算法。
数据结构
首先定义数据结构,结点包括状态、字符和子结点列表,AC算法包括一个trie树,转向函数、失效函数和输出函数(使用字典实现)。
class Node:
def __init__(self, state_num, char=None):
self.state_num = state_num
self.char = char
self.children_node = []
class AC:
def __init__(self, patterns):
self.goto_dic = defaultdict(lambda: -1)
self.failure_dic = defaultdict(int)
self.output_dic = defaultdict(list)
self._state_num_max = 0
self.trie = Node(0)
self.build_trie(patterns)
构造trie树
将规则逐个添加到trie树中,判断每个字符是否存在(进入字典的下一层),存在则转到该状态,不存在则添加一个新节点同时修改转向函数。一个规则的所有字符添加到tire树后,修改输出函数。
def build_pattern(self, pattern):
"""
将规则添加到trie树中,判断每个字符是否存在(进入字典的下一层),存在则转到该状态,不存在则添加一个新节点同时修改转向函数。
"""
current = self.trie
for ch in pattern:
index = self.exist_children(current, ch)
if index == -1:
current = self.add_node(current, ch)
else:
current = current.children_node[index]
self.output_dic[current.state_num] = [pattern]
失败函数
全部规则构成trie树后,构造失败函数,过程为一个层序遍历,根据上一层的失败函数构造下一层的失败函数,如果构造成功且转移的失败状态不为根节点,证明该模式串存在子串,因此需要修改输出函数。
def build_fail(self):
node_at_level = self.trie.children_node
while node_at_level:
node_at_next_level = []
for parent in node_at_level:
node_at_next_level.extend(parent.children_node)
for child in parent.children_node:
v = self.failure_dic[parent.state_num]
while self.goto_dic[(v, child.char)] == -1 and v != 0:
v = self.failure_dic[v]
fail_value = self.goto_dic[(v, child.char)]
self.failure_dic[child.state_num] = fail_value
if self.failure_dic[child.state_num] != 0:
self.output_dic[child.state_num].extend(self.output_dic[fail_value])
node_at_level = node_at_next_level
搜索过程
对于主串的每个字符,如果与trie树匹配成功,使用转移函数跳转,否则使用失败函数跳转,通过输出函数,对子串进行保存。
def search(self, text):
current_state = 0
ch_index = 0
key_list = []
while ch_index < len(text):
ch = text[ch_index]
if self.goto(current_state, ch) == -1:
current_state = self.failure_dic[current_state]
current_state = self.goto(current_state, ch)
patterns = self.output_dic[current_state]
if patterns:
key_list.extend(patterns)
ch_index += 1
if len(key_list) != 0:
print("主串为: {}, 包含关键词为: {}".format(text, key_list))
实验结果
规则数据为精确串规则url_2w.txt,数据为url_data.txt, 19956条规则构建trie树花费时间:1.23s 999947条字符串匹配字符串, 总耗时: 27.01s, 平均匹配一条模式串耗时: 2.70e-05s
全部代码如下:
from collections import defaultdict
import time
class Node:
def __init__(self, state_num, char=None):
self.state_num = state_num
self.char = char
self.children_node = []
class AC:
def __init__(self, patterns):
self.goto_dic = defaultdict(lambda: -1)
self.failure_dic = defaultdict(int)
self.output_dic = defaultdict(list)
self._state_num_max = 0
self.trie = Node(0)
self.build_trie(patterns)
def build_trie(self, patterns):
for pattern in patterns:
self.build_pattern(pattern)
self.build_fail()
def build_pattern(self, pattern):
"""
将规则添加到trie树中,判断每个字符是否存在(进入字典的下一层),存在则转到该状态,不存在则添加一个新节点同时修改转向函数。
"""
current = self.trie
for ch in pattern:
index = self.exist_children(current, ch)
if index == -1:
current = self.add_node(current, ch)
else:
current = current.children_node[index]
self.output_dic[current.state_num] = [pattern]
def exist_children(self, current, char):
for index in range(len(current.children_node)):
child = current.children_node[index]
if child.char == char:
return index
return -1
def add_node(self, current, ch):
self._state_num_max += 1
next_node = Node(self._state_num_max, ch)
current.children_node.append(next_node)
self.goto_dic[(current.state_num, ch)] = self._state_num_max
return next_node
def build_fail(self):
node_at_level = self.trie.children_node
while node_at_level:
node_at_next_level = []
for parent in node_at_level:
node_at_next_level.extend(parent.children_node)
for child in parent.children_node:
v = self.failure_dic[parent.state_num]
while self.goto_dic[(v, child.char)] == -1 and v != 0:
v = self.failure_dic[v]
fail_value = self.goto_dic[(v, child.char)]
self.failure_dic[child.state_num] = fail_value
if self.failure_dic[child.state_num] != 0:
self.output_dic[child.state_num].extend(self.output_dic[fail_value])
node_at_level = node_at_next_level
def goto(self, s, char):
if s == 0:
if (s, char) not in self.goto_dic:
return 0
return self.goto_dic[(s, char)]
def search(self, text):
current_state = 0
ch_index = 0
key_list = []
while ch_index < len(text):
ch = text[ch_index]
if self.goto(current_state, ch) == -1:
current_state = self.failure_dic[current_state]
current_state = self.goto(current_state, ch)
patterns = self.output_dic[current_state]
if patterns:
key_list.extend(patterns)
ch_index += 1
if len(key_list) != 0:
print("主串为: {}, 包含关键词为: {}".format(text, key_list))
if __name__ == "__main__":
rule =[]
with open("../测试规则与测试数据/规则/精确串规则/url_2w.txt", "r") as f:
for line in f:
rule.append(line.replace("\n", "").strip())
start_time = time.time()
ac = AC(rule)
print("{}条规则构建trie树花费时间为:{}s".format(len(rule),time.time()-start_time))
start_search = time.time()
data_num = 0
with open("../测试规则与测试数据/数据/url_data.txt", "rb") as f:
for line in f:
try:
line = line.decode("utf-8")
ac.search(line.replace("\n","").strip())
data_num += 1
except Exception as e:
print(e)
print("{}条字符串匹配字符串, 总耗时为: {}s, 平均匹配一条模式串耗时为: {}s".format(data_num, time.time() - start_search,(time.time() - start_search)/data_num))
参考链接:
- https://zh.m.wikipedia.org/zh-hans/AC%E8%87%AA%E5%8A%A8%E6%9C%BA%E7%AE%97%E6%B3%95
- Aho, Alfred V.; Corasick, Margaret J. Efficient string matching: An aid to bibliographic search. Communications of the ACM. June 1975, 18 (6): 333–340. MR?0371172.doi:10.1145/360825.360855
- https://www.cnblogs.com/super-zhang-828/p/6193684.html
|