完善了一致性hash的实现,修改了一部分之前实现的数据结构的注释

This commit is contained in:
hlccd 2021-12-27 00:17:58 +08:00 committed by GitHub
parent f9b71dea96
commit 5cdb259ed8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 195 additions and 43 deletions

View File

@ -0,0 +1 @@
package bloomFilter

View File

@ -1,15 +1,26 @@
package consistentHash package consistentHash
//@Title consistentHash
//@Description
// 一致性哈希-consistent hash
// 一致性hash主要用以解决当出现增删结点时需要重新计算hash值的情况
// 同时,利用虚拟结点解决了数据倾斜的问题
// hash范围为2^32,即一个uint32的全部范围
// 本次实现中不允许出现hash值相同的点,出现时则舍弃
// 虚拟结点最多有32个,最少数量可由用户自己决定,但不得低于1个
// 使用互斥锁实现并发控制
import ( import (
"fmt" "fmt"
"sync" "sync"
) )
//最大虚拟节点数量
const ( const (
maxReplicas = 32 maxReplicas = 32
) )
//素数表 //素数表
//用以减少hash冲突
var primes = []uint32{3, 5, 7, 11, 13, 17, 19, 23, 29, 31, var primes = []uint32{3, 5, 7, 11, 13, 17, 19, 23, 29, 31,
37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73,
79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127,
@ -19,6 +30,11 @@ var primes = []uint32{3, 5, 7, 11, 13, 17, 19, 23, 29, 31,
} }
//一致性hash结构体 //一致性hash结构体
//该实例了一致性hash在创建时设定的最小虚拟节点数
//同时保存了所有虚拟节点的hash值
//建立了虚拟节点hash值与实际结点之间的映射表
//每个实际结点也可以映射其所有的虚拟节点的hash值
//并发控制锁用以保证线程安全
type ConsistentHash struct { type ConsistentHash struct {
minReplicas int //最小的虚拟节点数 minReplicas int //最小的虚拟节点数
keys []uint32 //存储的结点和虚拟结点的集合 keys []uint32 //存储的结点和虚拟结点的集合
@ -27,7 +43,25 @@ type ConsistentHash struct {
mutex sync.Mutex //并发控制锁 mutex sync.Mutex //并发控制锁
} }
//hash计算 type consistentHasher interface {
Size() (num int) //返回一致性hash中的虚拟节点数量
Clear() //清空一致性hash的所有结点
Empty() (b bool) //返回该一致性hash是否有结点存在
Insert(keys ...interface{}) (nums []int) //向该一致性hash中插入一组结点,同时返回其生成的虚拟节点的数量
Erase(key interface{}) (ok bool) //删除结点key
Get(key interface{}) (ans interface{}) //查找key对应的结点
}
//@title hash
//@description
// 传入一个虚拟节点id和实际结点
// 计算出它的hash值
// 先利用id从素数表中找到对应的素数,然后将id,素数和实际结点转化为[]byte
// 逐层访问并利用素数计算其hash值随后返回
//@receiver nil
//@param id int 虚拟节点的id
//@param v interface{} 实际结点的key
//@return hm *hashMap 计算得到的hash值
func hash(id int, v interface{}) (h uint32) { func hash(id int, v interface{}) (h uint32) {
prime := primes[(id*id+len(primes))%len(primes)] prime := primes[(id*id+len(primes))%len(primes)]
h = uint32(0) h = uint32(0)
@ -41,14 +75,24 @@ func hash(id int, v interface{}) (h uint32) {
//新建一个一致性hash结构体指针并返回 //新建一个一致性hash结构体指针并返回
//传入其设定的最小的虚拟节点数量 //传入其设定的最小的虚拟节点数量
func New(minReplicas int) *ConsistentHash {
//@title New
//@description
// 新建一个一致性hash结构体指针并返回
// 传入设定的最少虚拟节点数量,不可小于1也不可大于最大虚拟节点数
//@receiver nil
//@param minReplicas int hashMap的hash函数集
//@return ch *ConsistentHash 新建的一致性hash指针
func New(minReplicas int) (ch *ConsistentHash) {
if minReplicas > maxReplicas { if minReplicas > maxReplicas {
//超过最大虚拟节点数
minReplicas = maxReplicas minReplicas = maxReplicas
} }
if minReplicas < 0 { if minReplicas < 1 {
//最少虚拟节点数量不得小于1
minReplicas = 1 minReplicas = 1
} }
ch := &ConsistentHash{ ch = &ConsistentHash{
minReplicas: minReplicas, minReplicas: minReplicas,
keys: make([]uint32, 0, 0), keys: make([]uint32, 0, 0),
hashMap: make(map[uint32]interface{}), hashMap: make(map[uint32]interface{}),
@ -58,7 +102,68 @@ func New(minReplicas int) *ConsistentHash {
return ch return ch
} }
//向一致性hash中插入结点,同时返回每一个结点插入的数量 //@title Size
//@description
// 以一致性hash做接收者
// 返回该容器当前含有的虚拟结点数量
// 如果容器为nil返回0
//@receiver ch *ConsistentHash 接受者一致性hash的指针
//@param nil
//@return num int 当前含有的虚拟结点数量
func (ch *ConsistentHash) Size() (num int) {
if ch == nil {
return 0
}
return len(ch.keys)
}
//@title Clear
//@description
// 以一致性hash做接收者
// 将该容器中所承载的所有结点清除
// 被清除结点包括实际结点和虚拟节点
// 重建映射表
//@receiver ch *ConsistentHash 接受者一致性hash的指针
//@param nil
//@return nil
func (ch *ConsistentHash) Clear() {
if ch == nil {
return
}
ch.mutex.Lock()
//重建vector并扩容到16
ch.keys = make([]uint32, 0, 0)
ch.hashMap = make(map[uint32]interface{})
ch.nodeMap = make(map[interface{}][]uint32)
ch.mutex.Unlock()
}
//@title Empty
//@description
// 以一致性hash做接收者
// 判断该一致性hash中是否含有元素
// 如果含有结点则不为空,返回false
// 如果不含有结点则说明为空,返回true
// 如果容器不存在,返回true
//@receiver ch *ConsistentHash 接受者一致性hash的指针
//@param nil
//@return b bool 该容器是空的吗?
func (ch *ConsistentHash) Empty() (b bool) {
if ch == nil {
return false
}
return len(ch.keys) > 0
}
//@title Insert
//@description
// 以一致性hash做接收者
// 向一致性hash中插入结点,同时返回每一个结点插入的数量
// 插入每个结点后将其对应的虚拟节点的hash放入映射表内和keys内
// 对keys做排序利用二分排序算法
//@receiver ch *ConsistentHash 接受者一致性hash的指针
//@param keys ...interface{} 待插入的结点的集合
//@return nums []int 插入结点所生成的虚拟节点数量的集合
func (ch *ConsistentHash) Insert(keys ...interface{}) (nums []int) { func (ch *ConsistentHash) Insert(keys ...interface{}) (nums []int) {
nums = make([]int, 0, len(keys)) nums = make([]int, 0, len(keys))
ch.mutex.Lock() ch.mutex.Lock()
@ -92,62 +197,57 @@ func (ch *ConsistentHash) Insert(keys ...interface{}) (nums []int) {
return nums return nums
} }
//二分排序 //@title sort
//@description
// 以一致性hash做接收者
// 二分排序
// 主要用于一致性hash的keys的排序
// 以保证其有序性
// 同时方便后续使用二分查找
//@receiver ch *ConsistentHash 接受者一致性hash的指针
//@param L int 左下标
//@param R int 右下标
//@return nil
func (ch *ConsistentHash) sort(L, R int) { func (ch *ConsistentHash) sort(L, R int) {
if L >= R { if L >= R {
//左下标大于右下标,结束
return return
} }
//找到中间结点,从左右两侧以双指针形式向中间靠近
l, r, m := L-1, R+1, ch.keys[(L+R)/2] l, r, m := L-1, R+1, ch.keys[(L+R)/2]
for l < r { for l < r {
//左侧出现不小于中间结点时停下
l++ l++
for ch.keys[l] < m { for ch.keys[l] < m {
l++ l++
} }
//右侧出现不大于中间结点时停下
r-- r--
for ch.keys[r] > m { for ch.keys[r] > m {
r-- r--
} }
if l < r { if l < r {
tmp := ch.keys[l] //左节点仍在右结点左方,交换结点的值
ch.keys[l] = ch.keys[r] ch.keys[l], ch.keys[r] = ch.keys[r], ch.keys[l]
ch.keys[r] = tmp
} }
} }
//递归排序左右两侧保证去有序性
ch.sort(L, l-1) ch.sort(L, l-1)
ch.sort(r+1, R) ch.sort(r+1, R)
} }
//从一致性hash中获取以key为索引的下一个结点的索引 //@title Erase
func (ch *ConsistentHash) Get(key interface{}) (ans interface{}) { //@description
if len(ch.keys) == 0 { // 以一致性hash做接收者
return nil // 删除以key为索引的结点
} // 先利用结点映射表判断该key是否存在于一致性hash内
ch.mutex.Lock() // 若存在则从映射表内删除
//计算key的hash值 // 同时找到其所有的虚拟节点的hash值
h := hash(0, key) // 遍历keys进行删除
//从现存的所有虚拟结点中找到该hash值对应的下一个虚拟节点对应的结点的索引 //@receiver ch *ConsistentHash 接受者一致性hash的指针
idx := ch.search(h) //@param key interface{} 待删除元素的key
ans = ch.hashMap[ch.keys[idx]] //@return b bool 删除成功?
ch.mutex.Unlock() func (ch *ConsistentHash) Erase(key interface{}) (ok bool) {
return ans
}
//二分查找,找到最近的不小于该值的hash值,如果不存在则返回0,即进行取模运算即可
func (ch *ConsistentHash) search(h uint32) (idx uint32) {
l, m, r := uint32(0), uint32(len(ch.keys)/2), uint32(len(ch.keys))
for l < r {
m = (l + r) / 2
if ch.keys[m] >= h {
r = m
} else {
l = m + 1
}
}
return l % uint32(len(ch.keys))
}
//删除以key为索引的结点
func (ch *ConsistentHash) Erase(key interface{}) {
ch.mutex.Lock() ch.mutex.Lock()
hs, ok := ch.nodeMap[key] hs, ok := ch.nodeMap[key]
if ok { if ok {
@ -175,4 +275,55 @@ func (ch *ConsistentHash) Erase(key interface{}) {
ch.keys = arr ch.keys = arr
} }
ch.mutex.Unlock() ch.mutex.Unlock()
return ok
}
//@title Get
//@description
// 以一致性hash做接收者
// 从一致性hash中获取以key为索引的下一个结点的索引
// 只要一致性hash内存储了结点则必然可以找到
// 将keys看成一个环
// 要找的key的计算出hash后放入环中
// 按顺时针向下找直到遇到第一个虚拟节点
// 寻找过程利用二分,若二分找到的结点为末尾结点的下一个,即为首个虚拟节点
//@receiver ch *ConsistentHash 接受者一致性hash的指针
//@param key interface{} 待查找的结点
//@return ans interface{} 找到的对应节点
func (ch *ConsistentHash) Get(key interface{}) (ans interface{}) {
if len(ch.keys) == 0 {
return nil
}
ch.mutex.Lock()
//计算key的hash值
h := hash(0, key)
//从现存的所有虚拟结点中找到该hash值对应的下一个虚拟节点对应的结点的索引
idx := ch.search(h)
ans = ch.hashMap[ch.keys[idx]]
ch.mutex.Unlock()
return ans
}
//@title search
//@description
// 以一致性hash做接收者
// 二分查找
// 找到最近的不小于该值的hash值
// 如果不存在则返回0,即进行取模运算即可
//@receiver ch *ConsistentHash 接受者一致性hash的指针
//@param h uint32 待查找结点的hash值
//@return idx uint32 找到的对应虚拟节点的下标
func (ch *ConsistentHash) search(h uint32) (idx uint32) {
//二分查找寻找不小于该hash值的下一个值的下标
l, m, r := uint32(0), uint32(len(ch.keys)/2), uint32(len(ch.keys))
for l < r {
m = (l + r) / 2
if ch.keys[m] >= h {
r = m
} else {
l = m + 1
}
}
//当找到的下标等同于keys的长度时即为0
return l % uint32(len(ch.keys))
} }

View File

@ -57,8 +57,8 @@ type hashMaper interface {
// 初始vector长度为16 // 初始vector长度为16
// 若有传入的hash函数,则将传入的第一个hash函数设为该hash映射的hash函数 // 若有传入的hash函数,则将传入的第一个hash函数设为该hash映射的hash函数
//@receiver nil //@receiver nil
//@param Cmp ...algorithm.Hasher hashMap的hash函数集 //@param Cmp ...algorithm.Hasher hashMap的hash函数集
//@return hm *hashMap 新建的hashMap指针 //@return hm *hashMap 新建的hashMap指针
func New(hash ...algorithm.Hasher) (hm *hashMap) { func New(hash ...algorithm.Hasher) (hm *hashMap) {
var h algorithm.Hasher var h algorithm.Hasher
if len(hash) == 0 { if len(hash) == 0 {
@ -125,7 +125,7 @@ func (hm *hashMap) Iterator() (i *Iterator.Iterator) {
// 如果容器为nil返回0 // 如果容器为nil返回0
//@receiver hm *hashMap 接受者hashMap的指针 //@receiver hm *hashMap 接受者hashMap的指针
//@param nil //@param nil
//@return num int 容器中实际使用元素所占空间大小 //@return num uint64 当前存储的元素数量
func (hm *hashMap) Size() (num uint64) { func (hm *hashMap) Size() (num uint64) {
if hm == nil { if hm == nil {
return 0 return 0