基于StackExchange.Redis的分布式锁
直入主题
以IDatabase扩展方法的形式实现分布式锁方法,将代码拷到任意静态类里即可使用。
/// <summary>
/// 使用Redis分布式锁执行某些操作
/// </summary>
/// <param name="lockName">锁名</param>
/// <param name="act">操作</param>
/// <param name="expiry">锁过期时间,若超出时间自动解锁 单位:sec</param>
/// <param name="retry">获取锁的重复次数</param>
/// <param name="tryDelay">获取锁的重试间隔 单位:ms</param>
public static void LockAction(this IDatabase db, string lockName, Action act, int expiry = 10, int retry = 3, int tryDelay = 200)
{
if (act.Method.IsDefined(typeof(AsyncStateMachineAttribute), false))
{
throw new ArgumentException("使用异步Action请调用LockActionAsync");
}
TimeSpan exp = TimeSpan.FromSeconds(expiry);
string token = Guid.NewGuid().ToString("N");
try
{
bool ok = false;
// 延迟重试
for (int test = 0; test < retry; test++)
{
if (db.LockTake(lockName, token, exp))
{
ok = true;
break;
}
else
{
Task.Delay(tryDelay).Wait();
}
}
if (!ok)
{
throw new InvalidOperationException($"获取锁[{lockName}]失败");
}
act();
}
finally
{
db.LockRelease(lockName, token);
}
}
/// <summary>
/// 使用Redis分布式锁执行某些异步操作
/// </summary>
/// <param name="lockName">锁名</param>
/// <param name="act">操作</param>
/// <param name="expiry">锁过期时间,若超出时间自动解锁 单位:sec</param>
/// <param name="retry">获取锁的重复次数</param>
/// <param name="tryDelay">获取锁的重试间隔 单位:ms</param>
public static async Task LockActionAsync(this IDatabase db, string lockName, Func<Task> act, int expiry = 10, int retry = 3, int tryDelay = 200)
{
TimeSpan exp = TimeSpan.FromSeconds(expiry);
string token = Guid.NewGuid().ToString("N");
try
{
bool ok = false;
// 延迟重试
for (int test = 0; test < retry; test++)
{
if (await db.LockTakeAsync(lockName, token, exp))
{
ok = true;
break;
}
else
{
await Task.Delay(tryDelay);
}
}
if (!ok)
{
throw new InvalidOperationException($"获取锁[{lockName}]失败");
}
await act();
}
finally
{
await db.LockReleaseAsync(lockName, token);
}
}
使用方法
private async Task RedisLockTestAsync()
{
string connStr = "Redis连接字符串";
var conn = await ConnectionMultiplexer.ConnectAsync(connStr);
IDatabase db = conn.GetDatabase();
// 带异步操作的用 LockActionAsync
await db.LockActionAsync("MyLockName", async () =>
{
// 执行异步方法...
await Task.Delay(1000);
Console.WriteLine("Done");
});
}
private void RedisLockTest()
{
string connStr = "Redis连接字符串";
var conn = ConnectionMultiplexer.Connect(connStr);
IDatabase db = conn.GetDatabase();
// 同步操作的用 LockAction
db.LockAction("MyLockName", () =>
{
// 同步方法...
Task.Delay(1000).Wait();
Console.WriteLine("Done");
});
}
一些思考
分布式锁使用的场景
最近在Redis的使用中遇到了需要在List中查找某个值,若不存在则向List中新增,即需要维护一个没有重复项的List的需求。虽然Redis本身的所有操作都是互斥的,但是Redis本身并没有提供无重复项的List类型,也没有相关指令能对List进行去重,故需要拉整个List下来进行对比再写入,在这里就会存在一个并发的问题,若在分布式部署且高并发的情景里就有可能使List出现重复项。所以需要对这个操作加上分布式锁,类似的场景还有很多这里不一一列举。
Redis里分布式锁的实现
所谓分布式锁实际上就是一个Redis里的一个键值对,锁名为Key,占用者(Token)为value。当A需要占用时则将value修改为A的Token并返回True,当B再想占用时,发现这个Key(锁名)的value已经有值且不是自己的Token,此时就返回False。这部分的逻辑StackExchange.Redis都已经帮我们做了。我们只需要调用它的LockTake,LockRelease方法即可。
因为获取锁的时候锁可能已经被占用,所以需要有一个重试机制,有重试就应该有重试间隔,这就是为什么LockAction方法里会有retry和tryDelay参数。
为什么异步和同步的方法要分开
刚开始是想做成像 Task.Run(()=>{}) 那样既可以传异步Action也可以传同步Action,但是后面发现.net更新之后异步Action没办法像以前那样用BeginInvoke方法等待执行完成,本人技术有限,实在找不到办法兼容同步和异步Action,这个是客观的原因。
另外,主观上我觉得兼容同步和异步就是一件不值得提倡的事情,若该扩展方法设计为同步方法,那在同步方法里执行异步操作再强行wait它是一件很瓜皮的事,而且还可能会出现莫名其妙的bug;若该扩展方法设计为异步方法但是如果传进来是同步Action的话,那么一个异步操作都没有,虽然没问题但很别扭,所以这里最终还是把同步操作和异步操作分开写成了两个方法。
|