IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 数据结构与算法 -> Jedis - SharedJedisPool 初始化与应用 & hash 算法详解 -> 正文阅读

[数据结构与算法]Jedis - SharedJedisPool 初始化与应用 & hash 算法详解

一.引言

使用?SharedJedisPool 时注意到内部涉及到 hash 函数,其中对应的 hash 接口需要复写两个 hash 函数分别是 hash (String var1) 和 hash (Byte[] var1),默认使用?Hashing.MURMUR_HASH 算法,除此之外也可以使用自带的 MD5,下面针对 SharedJedisPool 以及两个 Hash 函数的使用和含义进行分解。

public interface Hashing {
    Hashing MURMUR_HASH = new MurmurHash();
    ThreadLocal<MessageDigest> md5Holder = new ThreadLocal();
    Hashing MD5 = new Hashing() {
        public long hash(String key) {
            return this.hash(SafeEncoder.encode(key));
        }

        public long hash(byte[] key) {
            try {
                if (md5Holder.get() == null) {
                    md5Holder.set(MessageDigest.getInstance("MD5"));
                }
            } catch (NoSuchAlgorithmException var6) {
                throw new IllegalStateException("++++ no md5 algorythm found");
            }

            MessageDigest md5 = (MessageDigest)md5Holder.get();
            md5.reset();
            md5.update(key);
            byte[] bKey = md5.digest();
            long res = (long)(bKey[3] & 255) << 24 | (long)(bKey[2] & 255) << 16 | (long)(bKey[1] & 255) << 8 | (long)(bKey[0] & 255);
            return res;
        }
    };

    long hash(String var1);

    long hash(byte[] var1);
}

二.SharedJedisPool?

1.初始化源代码

最常见的初始化方法就是传入 poolConfig 以及对应的 List<JedisShardInfo> 即需要绑定的 redis 池的 host 和 port,也可以自定义 Hashing 类即 algo 参数,这里 Hashing 默认使用 MURMUR_HASH,如果需要自定义则需要实现 hash(String) 和 hash(Byte[]) 两个 hash 函数,也就是上面提到的两个不同分工的 hash 函数。?

2. JedisPool 初始化示例

SharedJedisPool 的初始化与 Jedis 不同,由于是连接池,所以涉及到资源的连接与释放,连接池的大小等,这些统一配置到 JedisPoolConfig 中,其次就是选择要绑定的 redis 集合,将 host-port 一次添加至 List 中,为了区分读写任务,这里通过 rm 和 rs 对 redis host 进行了区分,最后初始化 SharedJedisPool 即可。

  def getSharedJedisPool(hostAndPorts: Array[(String, String)], isRead: Boolean): ShardedJedisPool = {
    val jedisShardInfoList = new util.ArrayList[JedisShardInfo]()

    val config = new JedisPoolConfig
    config.setMaxTotal(30)
    config.setMaxIdle(20)
    config.setMinIdle(20)
    config.setTimeBetweenEvictionRunsMillis(30000)
    config.setSoftMinEvictableIdleTimeMillis(3600000)
    config.setTestOnBorrow(true)
    config.setTestOnReturn(true)
    config.setTestWhileIdle(true)

    hostAndPorts.foreach{ case (host, port) =>
      if (isRead && host.contains("rs")) {
        jedisShardInfoList.add(new JedisShardInfo(host, port.toInt))
      } else if (!isRead && host.contains("rm")) {
        jedisShardInfoList.add(new JedisShardInfo(host, port.toInt))
      } else {
        println("主从与任务不匹配!")
      }
    }


    val shardedJedisPool = new ShardedJedisPool(config, jedisShardInfoList, Hashing.MURMUR_HASH, Sharded.DEFAULT_KEY_TAG_PATTERN)
    shardedJedisPool

  }

3.JedisPool 常见使用方法

一般 JedisPool 的使用遵循 Try-Catch-Finaly 的原则。

Try:

相比 Jedis,sharedJedis 有一些操作不支持,例如 mset,mget 等,可以理解为 SharedJedis 只能处理单 key 的情况,因为涉及到 hash 到不同的 redis 上,所以这些操作都不被允许。

Catch:

由于是连接池,就会存在连接失效,连接超时,连接不够的情况,所以为了增加程序的鲁棒性,必要的 catch 一定要有

Finaly:

由于 JedisPool 中 resource 即 SharedJedis 有限,所以一般操作后需要close 或者调用 JedisPool.returnResource() 将连接返回,这样其他 task 可以获取空闲连接。

    try {
      val resource = jedisPool.getResource
      resource.set("k", "v")
    } catch {
      case e: Exception => {
        e.printStackTrace()
      }
    } finally {
      resource.close()
    }

三.hash 函数调用示例

1.hash(String var)

第一个 hash 函数的作用是维护一个 treeMap,k1-Redis1, k2-Redis2,初始化的函数位于 redis.clients.util.ShardInfo 类内,针对每一个 ShardInfo 都会调用 initialize 函数将初始化好的各个 redis 的连接放入 treeMap 中,当用户传入 key 时,调用 hash(byte[] var) 将 key 映射到 K1,K2..,然后通过 K1,K2... 映射到对应的 redis 连接,从而实现 key - K - Redis 的映射,保证存储的分布均匀。

这里调用 hash(String var),this.alog 为默认的 hash 函数或者我们定义的函数,可以看到针对这个 hash 函数,它要 hash 的 key 是固定的,即 SHARD-i-NODE-n,变量就是 n = [0,159] 和 i = JedisPool 绑定的 redis 数量,所以 hash(String var) 的参数 var 基本是固定的,最终要做的就是:

hash(SHARD-i-NODE-n) 得到 k1,k2,k3......,对应 redis r1,r2,......,ri 的 shardInfo,最后将 shardInfo 放到 resoureces 中,resources 本质上是一个 LinkedHashMap。

为了打印日志,我们自定义一个?hash 函数,并使用该 hash 函数初始化 JedisPool:

    val hashFunction: Hashing = new Hashing() {
      println("进入Hash函数!")
      // 决定 hash 到哪台 redis
      override def hash(key: Array[Byte]): Long = {
        println(s"进入Byte Hash! ${new String(key)}")
        (SafeEncoder.encode(key).hashCode & Integer.MAX_VALUE) % RedisNum
      }

      override def hash(key: String): Long = {
        println(s"进入String Hash! key: $key ${key.split("-")(1)}")
        key.split("-")(1).toLong
      }
    }

    val shardedJedisPool = new ShardedJedisPool(config, jedisShardInfoList, hashFunction, Sharded.DEFAULT_KEY_TAG_PATTERN)

运行函数看一下日志:

我采用 key.split("-")(1) 作为 hash 结果,key 的样式是?SHARD-i-NODE-n,i 代表 redis 顺序,n 代表 0-160,所以 split("-")(1) 得到的结果为 i 即 redis 的顺序,因为我绑定了4台 redis,最终到 treeMap 里就只有 4 个 KeyValue 对,Map { 0 -> Redis1, 1-> Redis2,2-> Redis3,3 -> Redis4 }。

2.hash(Byte[] var)

上面通过 hash (String var) 生成了基于 redis id 映射的 map,Map { 0 -> Redis1, 1-> Redis2,2-> Redis3,3 -> Redis4 } 。接下来就需要 hash(byte[] var) 函数将对应的 key 映射到 map 的 keySet 中了,先看下来了一个 key 的请求后 JedisPool 的处理顺序:

A.获取 Resource

    val jedisPool = getSharedJedisPool(hostsAndPorts, false)
    val resource = jedisPool.getResource
    resource.set("test_key", "test_value")
    resource.close()

最简单的就是上面这样,首先 getResource,上面 initialize 函数将每个 redis 的 shardInfo 放置到了 resoucres 中,这样来了 key 就可以通过 hash 函数获取 hash 值然后选择对应的 redis 执行相关操作了

B.Set 操作入口

?所以 set 方法的第一件事情就是根据 key 找到对应的 Jedis

C.寻找 redis 索引

set 方法通过 getShard(key: String) 获取对应 Jedis,getShard 函数再调用 getShardInfo(key: String) 方法,该方法内部再调用 this.algo.hash(key: Byte[]) 方法获取该 key 的索引,然后用过初始化好的 node map 映射到对应 redis 的 shardInfo,再逐级回调,最后返回对应的 Jedis 执行相关的操作。

四.hash 函数使用解释

上面基本解释了两个 Hash 函数的含义,下面再回看一下我们自定义的 hash 函数是如何运作的

1.hash(key: String) 释义

这一步很好理解,key.split("-")(1) 完成 redis 索引到 ShardInfo 的一一映射,不再赘述

2.hash(key: Array[Byte]) 释义

这个写法和 java 不同,java 是 Byte[] ,含义相同。主要看这一行:

        (SafeEncoder.encode(key).hashCode & Integer.MAX_VALUE) % RedisNum

A.SafeEncoder.encode(key).hashCode

SafeEncoder.encode(key).hashCode 该方法针对指定 key 进行 encode 编码并获取一个 long 值的 hashCode,这个是官方 API 内带的方法

B.&?Integer.MAX_VALUE

Interget.MAX_VALUE 的值是?2147483647,其二进制表示为?0111 1111 1111 1111 1111 1111 1111 1111,可以看到第一位是 1,执行 & 操作就是保证 HashCode 最终得到的总是正整数,因为 0 & 0 或者 0 & 1 都是 0,所以保证了?(SafeEncoder.encode(key).hashCode & Integer.MAX_VALUE) 的非负性

C. % RedisNum

这一步保证了这个 hash 函数最终返回的 Long 范围在 redis 索引范围内,配合一一映射的 map,保证每一个 key 都能找到 redis

3.如何快速判断 key 对应的 redis

上面也提到过,判断哪一台 redis 调用 getShard 函数即可,也可以结合同样的 hash 方法获取映射,看索引是否和自己的 redis 绑定顺序符合。

    val redisNum = 4
    val key = "test_key_1"
    val host = resource.getShard("test_key_1").getClient.getHost
    val hashNum = (SafeEncoder.encode("test_key_1".getBytes()).hashCode() & Integer.MAX_VALUE) % redisNum
    println(s"key: $key HashNum: $hashNum host: $host")

五.总结

所以 Hash(Byte[]) 决定了 key 走对应哪个索引 K,Hash(String) 决定这个索引 K 对应哪台 redis,这样两个函数配合就实现了 key -> Redis 的映射,除此之外,使用 JedisPool 一定要注意 return Resource 或者 close !!

  数据结构与算法 最新文章
【力扣106】 从中序与后续遍历序列构造二叉
leetcode 322 零钱兑换
哈希的应用:海量数据处理
动态规划|最短Hamilton路径
华为机试_HJ41 称砝码【中等】【menset】【
【C与数据结构】——寒假提高每日练习Day1
基础算法——堆排序
2023王道数据结构线性表--单链表课后习题部
LeetCode 之 反转链表的一部分
【题解】lintcode必刷50题<有效的括号序列
上一篇文章      下一篇文章      查看所有文章
加:2022-06-18 23:32:11  更:2022-06-18 23:32:29 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/26 1:55:12-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码