goSTL/data_structure/consistentHash/consistentHash.go

330 lines
10 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package consistentHash
//@Title consistentHash
//@Description
// 一致性哈希-consistent hash
// 一致性hash主要用以解决当出现增删结点时需要重新计算hash值的情况
// 同时,利用虚拟结点解决了数据倾斜的问题
// hash范围为2^32,即一个uint32的全部范围
// 本次实现中不允许出现hash值相同的点,出现时则舍弃
// 虚拟结点最多有32个,最少数量可由用户自己决定,但不得低于1个
// 使用互斥锁实现并发控制
import (
"fmt"
"sync"
)
//最大虚拟节点数量
const (
maxReplicas = 32
)
//素数表
//用以减少hash冲突
var primes = []uint32{3, 5, 7, 11, 13, 17, 19, 23, 29, 31,
37, 41, 43, 47, 53, 59, 61, 67, 71, 73,
79, 83, 89, 97, 101, 103, 107, 109, 113, 127,
131, 137, 139, 149, 151, 157, 163, 167, 173, 179,
121, 191, 193, 197, 199, 211, 223, 227, 229, 233,
239, 241, 251,
}
//一致性hash结构体
//该实例了一致性hash在创建时设定的最小虚拟节点数
//同时保存了所有虚拟节点的hash值
//建立了虚拟节点hash值与实际结点之间的映射表
//每个实际结点也可以映射其所有的虚拟节点的hash值
//并发控制锁用以保证线程安全
type ConsistentHash struct {
minReplicas int //最小的虚拟节点数
keys []uint32 //存储的结点和虚拟结点的集合
hashMap map[uint32]interface{} //hash与结点之间的映射
nodeMap map[interface{}][]uint32 //结点所对应的虚拟节点的hash值
mutex sync.Mutex //并发控制锁
}
type consistentHasher interface {
Size() (num int) //返回一致性hash中的虚拟节点数量
Clear() //清空一致性hash的所有结点
Empty() (b bool) //返回该一致性hash是否有结点存在
Insert(keys ...interface{}) (nums []int) //向该一致性hash中插入一组结点,同时返回其生成的虚拟节点的数量
Erase(key interface{}) (ok bool) //删除结点key
Get(key interface{}) (ans interface{}) //查找key对应的结点
}
//@title hash
//@description
// 传入一个虚拟节点id和实际结点
// 计算出它的hash值
// 先利用id从素数表中找到对应的素数,然后将id,素数和实际结点转化为[]byte
// 逐层访问并利用素数计算其hash值随后返回
//@receiver nil
//@param id int 虚拟节点的id
//@param v interface{} 实际结点的key
//@return h uint32 计算得到的hash值
func hash(id int, v interface{}) (h uint32) {
prime := primes[(id*id+len(primes))%len(primes)]
h = uint32(0)
s := fmt.Sprintf("%d-%v-%d", id*int(prime), v, prime)
bs := []byte(s)
for i := range bs {
h += uint32(bs[i]) * prime
}
return h
}
//新建一个一致性hash结构体指针并返回
//传入其设定的最小的虚拟节点数量
//@title New
//@description
// 新建一个一致性hash结构体指针并返回
// 传入设定的最少虚拟节点数量,不可小于1也不可大于最大虚拟节点数
//@receiver nil
//@param minReplicas int hashMap的hash函数集
//@return ch *ConsistentHash 新建的一致性hash指针
func New(minReplicas int) (ch *ConsistentHash) {
if minReplicas > maxReplicas {
//超过最大虚拟节点数
minReplicas = maxReplicas
}
if minReplicas < 1 {
//最少虚拟节点数量不得小于1
minReplicas = 1
}
ch = &ConsistentHash{
minReplicas: minReplicas,
keys: make([]uint32, 0, 0),
hashMap: make(map[uint32]interface{}),
nodeMap: make(map[interface{}][]uint32),
mutex: sync.Mutex{},
}
return ch
}
//@title Size
//@description
// 以一致性hash做接收者
// 返回该容器当前含有的虚拟结点数量
// 如果容器为nil返回0
//@receiver ch *ConsistentHash 接受者一致性hash的指针
//@param nil
//@return num int 当前含有的虚拟结点数量
func (ch *ConsistentHash) Size() (num int) {
if ch == nil {
return 0
}
return len(ch.keys)
}
//@title Clear
//@description
// 以一致性hash做接收者
// 将该容器中所承载的所有结点清除
// 被清除结点包括实际结点和虚拟节点
// 重建映射表
//@receiver ch *ConsistentHash 接受者一致性hash的指针
//@param nil
//@return nil
func (ch *ConsistentHash) Clear() {
if ch == nil {
return
}
ch.mutex.Lock()
//重建vector并扩容到16
ch.keys = make([]uint32, 0, 0)
ch.hashMap = make(map[uint32]interface{})
ch.nodeMap = make(map[interface{}][]uint32)
ch.mutex.Unlock()
}
//@title Empty
//@description
// 以一致性hash做接收者
// 判断该一致性hash中是否含有元素
// 如果含有结点则不为空,返回false
// 如果不含有结点则说明为空,返回true
// 如果容器不存在,返回true
//@receiver ch *ConsistentHash 接受者一致性hash的指针
//@param nil
//@return b bool 该容器是空的吗?
func (ch *ConsistentHash) Empty() (b bool) {
if ch == nil {
return false
}
return len(ch.keys) > 0
}
//@title Insert
//@description
// 以一致性hash做接收者
// 向一致性hash中插入结点,同时返回每一个结点插入的数量
// 插入每个结点后将其对应的虚拟节点的hash放入映射表内和keys内
// 对keys做排序利用二分排序算法
//@receiver ch *ConsistentHash 接受者一致性hash的指针
//@param keys ...interface{} 待插入的结点的集合
//@return nums []int 插入结点所生成的虚拟节点数量的集合
func (ch *ConsistentHash) Insert(keys ...interface{}) (nums []int) {
nums = make([]int, 0, len(keys))
ch.mutex.Lock()
//遍历所有待插入的结点
for _, key := range keys {
num := 0
//判断结点是否已经存在
_, exist := ch.nodeMap[key]
if !exist {
//结点不存在,开始插入
for i := 0; i < maxReplicas || num < ch.minReplicas; i++ {
//生成每个虚拟节点的hash值
h := uint32(hash(i, key))
//判断生成的hash值是否存在,存在则不插入
_, ok := ch.hashMap[h]
if !ok {
//hash值不存在,进行插入
num++
ch.keys = append(ch.keys, h)
//同时建立hash值和结点之间的映射关系
ch.hashMap[h] = key
ch.nodeMap[key] = append(ch.nodeMap[key], h)
}
}
}
nums = append(nums, num)
}
//对keys进行排序,以方便后续查找
ch.sort(0, len(ch.keys)-1)
ch.mutex.Unlock()
return nums
}
//@title sort
//@description
// 以一致性hash做接收者
// 二分排序
// 主要用于一致性hash的keys的排序
// 以保证其有序性
// 同时方便后续使用二分查找
//@receiver ch *ConsistentHash 接受者一致性hash的指针
//@param L int 左下标
//@param R int 右下标
//@return nil
func (ch *ConsistentHash) sort(L, R int) {
if L >= R {
//左下标大于右下标,结束
return
}
//找到中间结点,从左右两侧以双指针形式向中间靠近
l, r, m := L-1, R+1, ch.keys[(L+R)/2]
for l < r {
//左侧出现不小于中间结点时停下
l++
for ch.keys[l] < m {
l++
}
//右侧出现不大于中间结点时停下
r--
for ch.keys[r] > m {
r--
}
if l < r {
//左节点仍在右结点左方,交换结点的值
ch.keys[l], ch.keys[r] = ch.keys[r], ch.keys[l]
}
}
//递归排序左右两侧保证去有序性
ch.sort(L, l-1)
ch.sort(r+1, R)
}
//@title Erase
//@description
// 以一致性hash做接收者
// 删除以key为索引的结点
// 先利用结点映射表判断该key是否存在于一致性hash内
// 若存在则从映射表内删除
// 同时找到其所有的虚拟节点的hash值
// 遍历keys进行删除
//@receiver ch *ConsistentHash 接受者一致性hash的指针
//@param key interface{} 待删除元素的key
//@return b bool 删除成功?
func (ch *ConsistentHash) Erase(key interface{}) (ok bool) {
ch.mutex.Lock()
hs, ok := ch.nodeMap[key]
if ok {
//该结点存在于一致性hash内
//删除该结点
delete(ch.nodeMap, key)
//删除所有该结点的虚拟节点与该结点的映射关系
for i := range hs {
delete(ch.hashMap, hs[i])
}
//将待删除的虚拟结点删除即可
arr := make([]uint32, 0, len(ch.keys)-len(hs))
for i := range ch.keys {
h := ch.keys[i]
flag := true
for j := range hs {
if hs[j] == h {
flag = false
}
}
if flag {
arr = append(arr, h)
}
}
ch.keys = arr
}
ch.mutex.Unlock()
return ok
}
//@title Get
//@description
// 以一致性hash做接收者
// 从一致性hash中获取以key为索引的下一个结点的索引
// 只要一致性hash内存储了结点则必然可以找到
// 将keys看成一个环
// 要找的key的计算出hash后放入环中
// 按顺时针向下找直到遇到第一个虚拟节点
// 寻找过程利用二分,若二分找到的结点为末尾结点的下一个,即为首个虚拟节点
//@receiver ch *ConsistentHash 接受者一致性hash的指针
//@param key interface{} 待查找的结点
//@return ans interface{} 找到的对应节点
func (ch *ConsistentHash) Get(key interface{}) (ans interface{}) {
if len(ch.keys) == 0 {
return nil
}
ch.mutex.Lock()
//计算key的hash值
h := hash(0, key)
//从现存的所有虚拟结点中找到该hash值对应的下一个虚拟节点对应的结点的索引
idx := ch.search(h)
ans = ch.hashMap[ch.keys[idx]]
ch.mutex.Unlock()
return ans
}
//@title search
//@description
// 以一致性hash做接收者
// 二分查找
// 找到最近的不小于该值的hash值
// 如果不存在则返回0,即进行取模运算即可
//@receiver ch *ConsistentHash 接受者一致性hash的指针
//@param h uint32 待查找结点的hash值
//@return idx uint32 找到的对应虚拟节点的下标
func (ch *ConsistentHash) search(h uint32) (idx uint32) {
//二分查找寻找不小于该hash值的下一个值的下标
l, m, r := uint32(0), uint32(len(ch.keys)/2), uint32(len(ch.keys))
for l < r {
m = (l + r) / 2
if ch.keys[m] >= h {
r = m
} else {
l = m + 1
}
}
//当找到的下标等同于keys的长度时即为0
return l % uint32(len(ch.keys))
}