1. 什么是Bloom Filter(布隆过滤器)
一个很长的二进制向量和一系列随机映射函数实现可以用于检索一个元素是否存在一个集合中
1.1 布隆过滤器优点
1.2 布隆过滤器缺点
- 元素添加到集合中后,不能被删除
- 存在一定的误判率,数据越多误判率越高
1.3 布隆过滤器使用场景
1.4 布隆过滤器检索过程
在线使用布隆过滤器
- 要查询一个元素(测试它是否在集合中),请将其提供给k个哈希函数中的每一个以获取k个数组位置。如果这些位置的任何位为 0,则该元素肯定不在集合中;如果是,那么在插入时所有位都将设置为 1。如果全部为 1,则要么元素在集合中,要么在插入其他元素期间这些位偶然设置为 1,从而导致误报。在简单的布隆过滤器中,无法区分这两种情况,但更高级的技术可以解决这个问题。
1.5 布隆过滤器的算法描述
一个空的布隆过滤器是一个由m位组成的位数组,全部设置为 0。还必须定义k个不同的散列函数,每个散列函数将某个集合元素映射或散列到m个数组位置之一,生成均匀随机分布。通常,k是一个小常数,它取决于所需的错误错误率 ε,而m与k和要添加的元素数量 成正比。
要添加一个元素,请将其提供给每个k个哈希函数以获得k个数组位置。将所有这些位置的位设置为 1。
设计k个不同的独立散列函数的要求对于较大的k来说是令人望而却步的。对于具有广泛输出的良好散列函数,这种散列的不同位域之间应该几乎没有相关性,因此这种类型的散列可用于通过将其输出切成多个位来生成多个“不同”散列函数字段。或者,可以将k个不同的初始值(例如 0、1、…、k - 1)传递给采用初始值的散列函数;或将这些值添加(或附加)到键中。对于较大的m和/或k,可以放宽散列函数之间的独立性,而误报率的增加可以忽略不计。
从这个简单的布隆过滤器中删除一个元素是不可能的,因为没有办法知道它映射到的k位中的哪一个应该被清除。尽管将这些k位中的任何一个设置为零就足以删除该元素,但它也会删除碰巧映射到该位上的任何其他元素。由于简单算法无法确定是否添加了影响要删除元素的位的任何其他元素,因此清除任何位将引入假阴性的可能性。
可以通过使用包含已删除项目的第二个 Bloom 过滤器来模拟从 Bloom 过滤器中一次性删除元素。然而,第二个过滤器中的误报在复合过滤器中变成了误报,这可能是不希望的。在这种方法中,重新添加以前删除的项目是不可能的,因为必须将其从“已删除”过滤器中删除。
从算法描述中可以得出 布隆过滤器存在一定的误判率同样布隆过滤器中的元素是不可以被删除的。
2. 布隆过滤器实际应用
2.1 在Java中使用布隆过滤器来进行邮件校验(简单实现)
import java.util.BitSet;
public class Bloom {
private static final int SIZE = 1 << 24;
BitSet bitSet = new BitSet(SIZE);
Hash[] blHash = new Hash[8];
private static final int seeds[] = new int[]{3, 5, 7, 9, 11, 13, 17, 19};
public static void main(String[] args) {
String email = "2633655104@qq.com";
Bloom bloomDemo = new Bloom();
System.out.println(email + "是否在列表中: " + bloomDemo.contains(email));
bloomDemo.add(email);
System.out.println(email + "是否在列表中: " + bloomDemo.contains(email));
email = "2633655104@qq.com";
System.out.println(email + "是否在列表中: " + bloomDemo.contains(email));
}
public Bloom() {
for (int i = 0; i < seeds.length; i++) {
blHash[i] = new Hash(seeds[i]);
}
}
public void add(String str) {
for (Hash hash : blHash) {
bitSet.set(hash.getHash(str), true);
}
}
public boolean contains(String str) {
boolean have = true;
for (Hash hash : blHash) {
have &= bitSet.get(hash.getHash(str));
}
return have;
}
class Hash {
private int seed = 0;
public Hash(int seed) {
this.seed = seed;
}
public int getHash(String string) {
int val = 0;
int len = string.length();
for (int i = 0; i < len; i++) {
val = val * seed + string.charAt(i);
}
return val & (SIZE - 1);
}
}
}
2.2 复杂布隆过滤器实现
package com.xccservice.utils;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.BitSet;
import java.util.concurrent.atomic.AtomicInteger;
public class BloomFilter implements Serializable {
private static final long serialVersionUID = -5221305273707291280L;
private final int[] seeds;
private final int size;
private final BitSet notebook;
private final MisjudgmentRate rate;
private final AtomicInteger useCount = new AtomicInteger(0);
private final Double autoClearRate;
public BloomFilter(int dataCount) {
this(MisjudgmentRate.MIDDLE, dataCount, null);
}
public BloomFilter(MisjudgmentRate rate, int dataCount, Double autoClearRate) {
long bitSize = rate.seeds.length * dataCount;
if (bitSize < 0 || bitSize > Integer.MAX_VALUE) {
throw new RuntimeException("位数太大溢出了,请降低误判率或者降低数据大小");
}
this.rate = rate;
seeds = rate.seeds;
size = (int) bitSize;
notebook = new BitSet(size);
this.autoClearRate = autoClearRate;
}
public void add(String data) {
checkNeedClear();
for (int i = 0; i < seeds.length; i++) {
int index = hash(data, seeds[i]);
setTrue(index);
}
}
public boolean check(String data) {
for (int i = 0; i < seeds.length; i++) {
int index = hash(data, seeds[i]);
if (!notebook.get(index)) {
return false;
}
}
return true;
}
public boolean addIfNotExist(String data) {
checkNeedClear();
int[] dataIndex = new int[seeds.length];
boolean exist = true;
int index;
for (int i = 0; i < seeds.length; i++) {
dataIndex[i] = index = hash(data, seeds[i]);
if (exist) {
if (!notebook.get(index)) {
exist = false;
for (int j = 0; j <= i; j++) {
setTrue(dataIndex[j]);
}
}
} else {
setTrue(index);
}
}
return exist;
}
private void checkNeedClear() {
if (autoClearRate != null) {
if (getUseRate() >= autoClearRate) {
synchronized (this) {
if (getUseRate() >= autoClearRate) {
notebook.clear();
useCount.set(0);
}
}
}
}
}
public void setTrue(int index) {
useCount.incrementAndGet();
notebook.set(index, true);
}
private int hash(String data, int seeds) {
char[] value = data.toCharArray();
int hash = 0;
if (value.length > 0) {
for (int i = 0; i < value.length; i++) {
hash = i * hash + value[i];
}
}
hash = hash * seeds % size;
return Math.abs(hash);
}
public double getUseRate() {
return (double) useCount.intValue() / (double) size;
}
public void saveFilterToFile(String path) {
try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(path))) {
oos.writeObject(this);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static BloomFilter readFilterFromFile(String path) {
try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(path))) {
return (BloomFilter) ois.readObject();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void clear() {
useCount.set(0);
notebook.clear();
}
public MisjudgmentRate getRate() {
return rate;
}
public enum MisjudgmentRate {
VERY_SMALL(new int[]{2, 3, 5, 7}),
SMALL(new int[]{2, 3, 5, 7, 11, 13, 17, 19}),
MIDDLE(new int[]{2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53}),
HIGH(new int[]{2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97,
101, 103, 107, 109, 113, 127, 131});
private int[] seeds;
private MisjudgmentRate(int[] seeds) {
this.seeds = seeds;
}
public int[] getSeeds() {
return seeds;
}
public void setSeeds(int[] seeds) {
this.seeds = seeds;
}
}
public static void main(String[] args) {
BloomFilter bloomFilter = new BloomFilter(7);
System.out.println(bloomFilter.addIfNotExist("1111111111111"));
System.out.println(bloomFilter.check("1111111111111"));
}
}
2.3 Redission 中的分布式布隆过滤器
不论是在 Guava 中,还是自己的简单实现,都只是本地的布隆过滤器,仅仅存在单个应用中,同步起来十分复杂,而且一旦应用重启,则之前添加的元素均丢失,对于分布式环境,可以利用 Redis 构建分布式布隆过滤器
Redisson 框架提供了布隆过滤器的实现
RBloomFilter<SomeObject> bloomFilter = redisson.getBloomFilter("sample");
bloomFilter.tryInit(55000000L, 0.03);
bloomFilter.add(new SomeObject("field1Value", "field2Value"));
bloomFilter.add(new SomeObject("field5Value", "field8Value"));
bloomFilter.contains(new SomeObject("field1Value", "field8Value"));
简单分析下 Redission 中源码
通用接口
public interface RBloomFilter<T> extends RExpirable {
boolean add(T object)
boolean contains(T object)
boolean tryInit(long expectedInsertions, double falseProbability);
}
接口实现
public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomFilter<T> {
public boolean add(T object) {
long[] hashes = hash(object);
while (true) {
if (size == 0) {
readConfig();
}
int hashIterations = this.hashIterations;
long size = this.size;
long[] indexes = hash(hashes[0], hashes[1], hashIterations, size);
for (int i = 0; i < indexes.length; i++) {
bs.setAsync(indexes[i]);
}
try {
List<Boolean> result = (List<Boolean>) executorService.execute();
for (Boolean val : result.subList(1, result.size()-1)) {
if (!val) {
return true;
}
}
return false;
} catch (RedisException e) {
if (!e.getMessage().contains("Bloom filter config has been changed")) {
throw e;
}
}
}
}
}
GET 函数这里就不再深入探讨,只是将 add 函数中的 SET 变成 GET 操作
Hash 函数
private long[] hash(Object object) {
ByteBuf state = encode(object);
try {
return Hash.hash128(state);
} finally {
state.release();
}
}
这个函数将 Object 编码后,返回一个 Byte 数组,然后调用 Hash.hash128 计算哈希值,这里的哈希算法是 HighwayHash
public static long[] hash128(ByteBuf objectState) {
HighwayHash h = calcHash(objectState);
return h.finalize128();
}
protected static HighwayHash calcHash(ByteBuf objectState) {
HighwayHash h = new HighwayHash(KEY);
int i;
int length = objectState.readableBytes();
int offset = objectState.readerIndex();
byte[] data = new byte[32];
for (i = 0; i + 32 <= length; i += 32) {
objectState.getBytes(offset + i, data);
h.updatePacket(data, 0);
}
if ((length & 31) != 0) {
data = new byte[length & 31];
objectState.getBytes(offset + i, data);
h.updateRemainder(data, 0, length & 31);
}
return h;
}
private long[] hash(long hash1, long hash2, int iterations, long size) {
long[] indexes = new long[iterations];
long hash = hash1;
for (int i = 0; i < iterations; i++) {
indexes[i] = (hash & Long.MAX_VALUE) % size;
if (i % 2 == 0) {
hash += hash2;
} else {
hash += hash1;
}
}
return indexes;
}
3. 使用布隆过滤器解决Redis缓存穿透
关于缓存穿透问题可以在之前写的博客如何应对缓存问题查看。解决缓存穿透问题可以使用缓存空对象和布隆过滤器两种方法,这里仅讨论布隆过滤器方法。
使用布隆过滤器逻辑如下:
- 根据 key 查询缓存,如果存在对应的值,直接返回;如果不存在则继续执行
- 根据 key 查询缓存在布隆过滤器的值,如果存在值,则说明该 key 不存在对应的值,直接返回空,如果不存在值,继续向下执行
- 查询 DB 对应的值,如果存在,则更新到缓存,并返回该值,如果不存在值,则更新到布隆过滤器中,并返回空
具体流程图如下所示:
public String getByKey(String key) {
String value = get(key);
if (StringUtils.isEmpty(value)) {
logger.info("Redis 没命中 {}", key);
if (bloomFilter.mightContain(key)) {
logger.info("BloomFilter 命中 {}", key);
return value;
} else {
if (mapDB.containsKey(key)) {
logger.info("更新 Key {} 到 Redis", key);
String valDB = mapDB.get(key);
set(key, valDB);
return valDB;
} else {
logger.info("更新 Key {} 到 BloomFilter", key);
bloomFilter.put(key);
return value;
}
}
} else {
logger.info("Redis 命中 {}", key);
return value;
}
}
|