mirror of
https://github.com/hlccd/goSTL.git
synced 2025-01-18 22:09:33 +08:00
新增了singleFlight的算法实现,可以避免缓存击穿,同时完善了之前一些数据结构的注释
This commit is contained in:
parent
0dc9a14751
commit
69e0123006
69
goSTL/algorithm/singleFlight/singleFlight.go
Normal file
69
goSTL/algorithm/singleFlight/singleFlight.go
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
package singleFlight
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
//呼叫请求结构体
|
||||||
|
type call struct {
|
||||||
|
wg sync.WaitGroup //可重入锁
|
||||||
|
val interface{} //请求结果
|
||||||
|
err error //错误反馈
|
||||||
|
}
|
||||||
|
|
||||||
|
type Group struct {
|
||||||
|
mu sync.Mutex // protects m
|
||||||
|
m map[string]*call//所有请求
|
||||||
|
}
|
||||||
|
|
||||||
|
//防止击穿缓存,对同一个key进行请求时需要分别进行,利用可重入锁实现
|
||||||
|
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
|
||||||
|
g.mu.Lock()
|
||||||
|
if g.m == nil {
|
||||||
|
g.m = make(map[string]*call)
|
||||||
|
}
|
||||||
|
if c, ok := g.m[key]; ok {
|
||||||
|
g.mu.Unlock()
|
||||||
|
c.wg.Wait() // 如果请求正在进行中,则等待
|
||||||
|
return c.val, c.err // 请求结束,返回结果
|
||||||
|
}
|
||||||
|
c := new(call)
|
||||||
|
c.wg.Add(1) // 发起请求前加锁
|
||||||
|
g.m[key] = c // 添加到 g.m,表明 key 已经有对应的请求在处理
|
||||||
|
g.mu.Unlock()
|
||||||
|
|
||||||
|
c.val, c.err = fn() // 调用 fn,发起请求
|
||||||
|
c.wg.Done() // 请求结束
|
||||||
|
|
||||||
|
g.mu.Lock()
|
||||||
|
delete(g.m, key) // 更新 g.m
|
||||||
|
g.mu.Unlock()
|
||||||
|
|
||||||
|
return c.val, c.err // 返回结果
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *Group) DoChan(key string, fn func() (interface{}, error)) (ch chan interface{}) {
|
||||||
|
ch = make(chan interface{})
|
||||||
|
g.mu.Lock()
|
||||||
|
if g.m == nil {
|
||||||
|
g.m = make(map[string]*call)
|
||||||
|
}
|
||||||
|
if c, ok := g.m[key]; ok {
|
||||||
|
g.mu.Unlock()
|
||||||
|
c.wg.Wait() // 如果请求正在进行中,则等待
|
||||||
|
ch <- c.val
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
c := new(call)
|
||||||
|
c.wg.Add(1) // 发起请求前加锁
|
||||||
|
g.m[key] = c // 添加到 g.m,表明 key 已经有对应的请求在处理
|
||||||
|
g.mu.Unlock()
|
||||||
|
|
||||||
|
c.val, c.err = fn() // 调用 fn,发起请求
|
||||||
|
c.wg.Done() // 请求结束
|
||||||
|
ch <- c.val
|
||||||
|
|
||||||
|
g.mu.Lock()
|
||||||
|
delete(g.m, key) // 更新 g.m
|
||||||
|
g.mu.Unlock()
|
||||||
|
|
||||||
|
return ch
|
||||||
|
}
|
@ -9,7 +9,8 @@ const (
|
|||||||
maxReplicas = 32
|
maxReplicas = 32
|
||||||
)
|
)
|
||||||
|
|
||||||
var primes = []byte{3, 5, 7, 11, 13, 17, 19, 23, 29, 31,
|
//素数表
|
||||||
|
var primes = []uint32{3, 5, 7, 11, 13, 17, 19, 23, 29, 31,
|
||||||
37, 41, 43, 47, 53, 59, 61, 67, 71, 73,
|
37, 41, 43, 47, 53, 59, 61, 67, 71, 73,
|
||||||
79, 83, 89, 97, 101, 103, 107, 109, 113, 127,
|
79, 83, 89, 97, 101, 103, 107, 109, 113, 127,
|
||||||
131, 137, 139, 149, 151, 157, 163, 167, 173, 179,
|
131, 137, 139, 149, 151, 157, 163, 167, 173, 179,
|
||||||
@ -17,24 +18,29 @@ var primes = []byte{3, 5, 7, 11, 13, 17, 19, 23, 29, 31,
|
|||||||
239, 241, 251,
|
239, 241, 251,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//一致性hash结构体
|
||||||
type ConsistentHash struct {
|
type ConsistentHash struct {
|
||||||
minReplicas int
|
minReplicas int //最小的虚拟节点数
|
||||||
keys []uint32 //存储的结点和虚拟结点的集合
|
keys []uint32 //存储的结点和虚拟结点的集合
|
||||||
hashMap map[uint32]interface{} //hash与结点之间的映射
|
hashMap map[uint32]interface{} //hash与结点之间的映射
|
||||||
nodeMap map[interface{}][]uint32 //结点所对应的虚拟节点的hash值
|
nodeMap map[interface{}][]uint32 //结点所对应的虚拟节点的hash值
|
||||||
mutex sync.Mutex
|
mutex sync.Mutex //并发控制锁
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//hash计算
|
||||||
func hash(id int, v interface{}) (h uint32) {
|
func hash(id int, v interface{}) (h uint32) {
|
||||||
prime := primes[(id*id+len(primes))%len(primes)]
|
prime := primes[(id*id+len(primes))%len(primes)]
|
||||||
h = uint32(0)
|
h = uint32(0)
|
||||||
s := fmt.Sprintf("%d-%v-%d", id*int(prime), v, prime)
|
s := fmt.Sprintf("%d-%v-%d", id*int(prime), v, prime)
|
||||||
bs := []byte(s)
|
bs := []byte(s)
|
||||||
for i := range bs {
|
for i := range bs {
|
||||||
h += uint32(bs[i] * prime)
|
h += uint32(bs[i]) * prime
|
||||||
}
|
}
|
||||||
return h
|
return h
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//新建一个一致性hash结构体指针并返回
|
||||||
|
//传入其设定的最小的虚拟节点数量
|
||||||
func New(minReplicas int) *ConsistentHash {
|
func New(minReplicas int) *ConsistentHash {
|
||||||
if minReplicas > maxReplicas {
|
if minReplicas > maxReplicas {
|
||||||
minReplicas = maxReplicas
|
minReplicas = maxReplicas
|
||||||
@ -51,19 +57,28 @@ func New(minReplicas int) *ConsistentHash {
|
|||||||
}
|
}
|
||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//向一致性hash中插入结点,同时返回每一个结点插入的数量
|
||||||
func (ch *ConsistentHash) Insert(keys ...interface{}) (nums []int) {
|
func (ch *ConsistentHash) Insert(keys ...interface{}) (nums []int) {
|
||||||
nums = make([]int, 0, len(keys))
|
nums = make([]int, 0, len(keys))
|
||||||
ch.mutex.Lock()
|
ch.mutex.Lock()
|
||||||
|
//遍历所有待插入的结点
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
num := 0
|
num := 0
|
||||||
|
//判断结点是否已经存在
|
||||||
_, exist := ch.nodeMap[key]
|
_, exist := ch.nodeMap[key]
|
||||||
if !exist {
|
if !exist {
|
||||||
|
//结点不存在,开始插入
|
||||||
for i := 0; i < maxReplicas || num < ch.minReplicas; i++ {
|
for i := 0; i < maxReplicas || num < ch.minReplicas; i++ {
|
||||||
|
//生成每个虚拟节点的hash值
|
||||||
h := uint32(hash(i, key))
|
h := uint32(hash(i, key))
|
||||||
|
//判断生成的hash值是否存在,存在则不插入
|
||||||
_, ok := ch.hashMap[h]
|
_, ok := ch.hashMap[h]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
//hash值不存在,进行插入
|
||||||
num++
|
num++
|
||||||
ch.keys = append(ch.keys, h)
|
ch.keys = append(ch.keys, h)
|
||||||
|
//同时建立hash值和结点之间的映射关系
|
||||||
ch.hashMap[h] = key
|
ch.hashMap[h] = key
|
||||||
ch.nodeMap[key] = append(ch.nodeMap[key], h)
|
ch.nodeMap[key] = append(ch.nodeMap[key], h)
|
||||||
}
|
}
|
||||||
@ -76,6 +91,8 @@ func (ch *ConsistentHash) Insert(keys ...interface{}) (nums []int) {
|
|||||||
ch.mutex.Unlock()
|
ch.mutex.Unlock()
|
||||||
return nums
|
return nums
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//二分排序
|
||||||
func (ch *ConsistentHash) sort(L, R int) {
|
func (ch *ConsistentHash) sort(L, R int) {
|
||||||
if L >= R {
|
if L >= R {
|
||||||
return
|
return
|
||||||
@ -99,17 +116,23 @@ func (ch *ConsistentHash) sort(L, R int) {
|
|||||||
ch.sort(L, l-1)
|
ch.sort(L, l-1)
|
||||||
ch.sort(r+1, R)
|
ch.sort(r+1, R)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//从一致性hash中获取以key为索引的下一个结点的索引
|
||||||
func (ch *ConsistentHash) Get(key interface{}) (ans interface{}) {
|
func (ch *ConsistentHash) Get(key interface{}) (ans interface{}) {
|
||||||
if len(ch.keys) == 0 {
|
if len(ch.keys) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
ch.mutex.Lock()
|
ch.mutex.Lock()
|
||||||
|
//计算key的hash值
|
||||||
h := hash(0, key)
|
h := hash(0, key)
|
||||||
|
//从现存的所有虚拟结点中找到该hash值对应的下一个虚拟节点对应的结点的索引
|
||||||
idx := ch.search(h)
|
idx := ch.search(h)
|
||||||
ans = ch.hashMap[ch.keys[idx]]
|
ans = ch.hashMap[ch.keys[idx]]
|
||||||
ch.mutex.Unlock()
|
ch.mutex.Unlock()
|
||||||
return ans
|
return ans
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//二分查找,找到最近的不小于该值的hash值,如果不存在则返回0,即进行取模运算即可
|
||||||
func (ch *ConsistentHash) search(h uint32) (idx uint32) {
|
func (ch *ConsistentHash) search(h uint32) (idx uint32) {
|
||||||
l, m, r := uint32(0), uint32(len(ch.keys)/2), uint32(len(ch.keys))
|
l, m, r := uint32(0), uint32(len(ch.keys)/2), uint32(len(ch.keys))
|
||||||
for l < r {
|
for l < r {
|
||||||
@ -122,14 +145,20 @@ func (ch *ConsistentHash) search(h uint32) (idx uint32) {
|
|||||||
}
|
}
|
||||||
return l % uint32(len(ch.keys))
|
return l % uint32(len(ch.keys))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//删除以key为索引的结点
|
||||||
func (ch *ConsistentHash) Erase(key interface{}) {
|
func (ch *ConsistentHash) Erase(key interface{}) {
|
||||||
ch.mutex.Lock()
|
ch.mutex.Lock()
|
||||||
hs, ok := ch.nodeMap[key]
|
hs, ok := ch.nodeMap[key]
|
||||||
if ok {
|
if ok {
|
||||||
|
//该结点存在于一致性hash内
|
||||||
|
//删除该结点
|
||||||
delete(ch.nodeMap, key)
|
delete(ch.nodeMap, key)
|
||||||
|
//删除所有该结点的虚拟节点与该结点的映射关系
|
||||||
for i := range hs {
|
for i := range hs {
|
||||||
delete(ch.hashMap, hs[i])
|
delete(ch.hashMap, hs[i])
|
||||||
}
|
}
|
||||||
|
//将待删除的虚拟结点删除即可
|
||||||
arr := make([]uint32, 0, len(ch.keys)-len(hs))
|
arr := make([]uint32, 0, len(ch.keys)-len(hs))
|
||||||
for i := range ch.keys {
|
for i := range ch.keys {
|
||||||
h := ch.keys[i]
|
h := ch.keys[i]
|
||||||
|
@ -1,25 +1,34 @@
|
|||||||
package lru
|
package lru
|
||||||
|
|
||||||
|
//最近最少使用
|
||||||
|
//相对于仅考虑时间因素的FIFO和仅考虑访问频率的LFU,LRU算法可以认为是相对平衡的一种淘汰算法
|
||||||
|
//LRU认为,如果数据最近被访问过,那么将来被访问的概率也会更高
|
||||||
|
//LRU 算法的实现非常简单,维护一个队列
|
||||||
|
//如果某条记录被访问了,则移动到队尾,那么队首则是最近最少访问的数据,淘汰该条记录即可。
|
||||||
import (
|
import (
|
||||||
"container/list"
|
"container/list"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
//LRU链结构体
|
||||||
type LRU struct {
|
type LRU struct {
|
||||||
maxBytes int64
|
maxBytes int64 //所能承载的最大byte数量
|
||||||
nowBytes int64
|
nowBytes int64 //当前承载的byte数
|
||||||
ll *list.List
|
ll *list.List //用于存储的链表
|
||||||
cache map[string]*list.Element
|
cache map[string]*list.Element //链表元素与key的映射表
|
||||||
onRemove func(key string, value Value)
|
onRemove func(key string, value Value) //删除元素时的执行函数
|
||||||
mutex sync.Mutex //并发控制锁
|
mutex sync.Mutex //并发控制锁
|
||||||
}
|
|
||||||
type indexes struct {
|
|
||||||
key string
|
|
||||||
value Value
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//索引结构体,
|
||||||
|
type indexes struct {
|
||||||
|
key string //索引
|
||||||
|
value Value //存储值
|
||||||
|
}
|
||||||
|
|
||||||
|
//存储值的函数
|
||||||
type Value interface {
|
type Value interface {
|
||||||
Len() int
|
Len() int//用于计算该存储值所占用的长度
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(maxBytes int64, onRemove func(string, Value)) *LRU {
|
func New(maxBytes int64, onRemove func(string, Value)) *LRU {
|
||||||
@ -29,9 +38,14 @@ func New(maxBytes int64, onRemove func(string, Value)) *LRU {
|
|||||||
ll: list.New(),
|
ll: list.New(),
|
||||||
cache: make(map[string]*list.Element),
|
cache: make(map[string]*list.Element),
|
||||||
onRemove: onRemove,
|
onRemove: onRemove,
|
||||||
|
mutex: sync.Mutex{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func (l *LRU) Add(key string, value Value) {
|
//向该LRU中插入以key为索引的value
|
||||||
|
//若已经存在则将其放到队尾
|
||||||
|
//若空间充足且不存在则直接插入队尾
|
||||||
|
//若空间不足则淘汰队首元素再将其插入队尾
|
||||||
|
func (l *LRU) Insert(key string, value Value) {
|
||||||
l.mutex.Lock()
|
l.mutex.Lock()
|
||||||
if ele, ok := l.cache[key]; ok {
|
if ele, ok := l.cache[key]; ok {
|
||||||
l.ll.MoveToFront(ele)
|
l.ll.MoveToFront(ele)
|
||||||
@ -62,7 +76,9 @@ func (l *LRU) Add(key string, value Value) {
|
|||||||
}
|
}
|
||||||
l.mutex.Unlock()
|
l.mutex.Unlock()
|
||||||
}
|
}
|
||||||
func (l *LRU) RemoveOldest() {
|
//删除队尾元素
|
||||||
|
//删除后执行创建时传入的删除执行函数
|
||||||
|
func (l *LRU) Erase() {
|
||||||
l.mutex.Lock()
|
l.mutex.Lock()
|
||||||
ele := l.ll.Back()
|
ele := l.ll.Back()
|
||||||
if ele != nil {
|
if ele != nil {
|
||||||
@ -78,6 +94,8 @@ func (l *LRU) RemoveOldest() {
|
|||||||
}
|
}
|
||||||
l.mutex.Unlock()
|
l.mutex.Unlock()
|
||||||
}
|
}
|
||||||
|
//从LRU链中寻找以key为索引的value
|
||||||
|
//找到则返回,否则返回nil和false
|
||||||
func (l *LRU) Get(key string) (value Value, ok bool) {
|
func (l *LRU) Get(key string) (value Value, ok bool) {
|
||||||
l.mutex.Lock()
|
l.mutex.Lock()
|
||||||
if ele, ok := l.cache[key]; ok {
|
if ele, ok := l.cache[key]; ok {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user