IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> C#中基于StackExchange.Redis的分布式锁 -> 正文阅读

[大数据]C#中基于StackExchange.Redis的分布式锁

直入主题

以IDatabase扩展方法的形式实现分布式锁方法,将代码拷到任意静态类里即可使用。

/// <summary>
/// 使用Redis分布式锁执行某些操作
/// </summary>
/// <param name="lockName">锁名</param>
/// <param name="act">操作</param>
/// <param name="expiry">锁过期时间,若超出时间自动解锁 单位:sec</param>
/// <param name="retry">获取锁的重复次数</param>
/// <param name="tryDelay">获取锁的重试间隔  单位:ms</param>
public static void LockAction(this IDatabase db, string lockName, Action act, int expiry = 10, int retry = 3, int tryDelay = 200)
{
    if (act.Method.IsDefined(typeof(AsyncStateMachineAttribute), false))
    {
        throw new ArgumentException("使用异步Action请调用LockActionAsync");
    }

    TimeSpan exp = TimeSpan.FromSeconds(expiry);
    string token = Guid.NewGuid().ToString("N");
    try
    {
        bool ok = false;
        // 延迟重试
        for (int test = 0; test < retry; test++)
        {
            if (db.LockTake(lockName, token, exp))
            {
                ok = true;
                break;
            }
            else
            {
                Task.Delay(tryDelay).Wait();
            }
        }
        if (!ok)
        {
            throw new InvalidOperationException($"获取锁[{lockName}]失败");
        }
        act();
    }
    finally
    {
        db.LockRelease(lockName, token);
    }
}

/// <summary>
/// 使用Redis分布式锁执行某些异步操作
/// </summary>
/// <param name="lockName">锁名</param>
/// <param name="act">操作</param>
/// <param name="expiry">锁过期时间,若超出时间自动解锁 单位:sec</param>
/// <param name="retry">获取锁的重复次数</param>
/// <param name="tryDelay">获取锁的重试间隔  单位:ms</param>
public static async Task LockActionAsync(this IDatabase db, string lockName, Func<Task> act, int expiry = 10, int retry = 3, int tryDelay = 200)
{
    TimeSpan exp = TimeSpan.FromSeconds(expiry);
    string token = Guid.NewGuid().ToString("N");
    try
    {
        bool ok = false;
        // 延迟重试
        for (int test = 0; test < retry; test++)
        {
            if (await db.LockTakeAsync(lockName, token, exp))
            {
                ok = true;
                break;
            }
            else
            {
                await Task.Delay(tryDelay);
            }
        }
        if (!ok)
        {
            throw new InvalidOperationException($"获取锁[{lockName}]失败");
        }
        await act();
    }
    finally
    {
        await db.LockReleaseAsync(lockName, token);
    }
}

使用方法

private async Task RedisLockTestAsync()
{
    string connStr = "Redis连接字符串";
    var conn = await ConnectionMultiplexer.ConnectAsync(connStr);
    IDatabase db = conn.GetDatabase();
    // 带异步操作的用 LockActionAsync
    await db.LockActionAsync("MyLockName", async () =>
    {
        // 执行异步方法...
        await Task.Delay(1000);
        Console.WriteLine("Done");
    });
}

private void RedisLockTest()
{
    string connStr = "Redis连接字符串";
    var conn = ConnectionMultiplexer.Connect(connStr);
    IDatabase db = conn.GetDatabase();
    // 同步操作的用 LockAction
    db.LockAction("MyLockName", () =>
    {
        // 同步方法...
        Task.Delay(1000).Wait();
        Console.WriteLine("Done");
    });
}

一些思考

分布式锁使用的场景

最近在Redis的使用中遇到了需要在List中查找某个值,若不存在则向List中新增,即需要维护一个没有重复项的List的需求。虽然Redis本身的所有操作都是互斥的,但是Redis本身并没有提供无重复项的List类型,也没有相关指令能对List进行去重,故需要拉整个List下来进行对比再写入,在这里就会存在一个并发的问题,若在分布式部署且高并发的情景里就有可能使List出现重复项。所以需要对这个操作加上分布式锁,类似的场景还有很多这里不一一列举。

Redis里分布式锁的实现

所谓分布式锁实际上就是一个Redis里的一个键值对,锁名为Key,占用者(Token)为value。当A需要占用时则将value修改为A的Token并返回True,当B再想占用时,发现这个Key(锁名)的value已经有值且不是自己的Token,此时就返回False。这部分的逻辑StackExchange.Redis都已经帮我们做了。我们只需要调用它的LockTake,LockRelease方法即可。

因为获取锁的时候锁可能已经被占用,所以需要有一个重试机制,有重试就应该有重试间隔,这就是为什么LockAction方法里会有retry和tryDelay参数。

为什么异步和同步的方法要分开

刚开始是想做成像 Task.Run(()=>{}) 那样既可以传异步Action也可以传同步Action,但是后面发现.net更新之后异步Action没办法像以前那样用BeginInvoke方法等待执行完成,本人技术有限,实在找不到办法兼容同步和异步Action,这个是客观的原因。

另外,主观上我觉得兼容同步和异步就是一件不值得提倡的事情,若该扩展方法设计为同步方法,那在同步方法里执行异步操作再强行wait它是一件很瓜皮的事,而且还可能会出现莫名其妙的bug;若该扩展方法设计为异步方法但是如果传进来是同步Action的话,那么一个异步操作都没有,虽然没问题但很别扭,所以这里最终还是把同步操作和异步操作分开写成了两个方法。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-06-20 23:03:25  更:2022-06-20 23:05:06 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 1:43:52-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码