diff --git a/goSTL/algorithm/singleFlight/singleFlight.go b/goSTL/algorithm/singleFlight/singleFlight.go new file mode 100644 index 0000000..7a7315d --- /dev/null +++ b/goSTL/algorithm/singleFlight/singleFlight.go @@ -0,0 +1,69 @@ +package singleFlight + +import "sync" + +//呼叫请求结构体 +type call struct { + wg sync.WaitGroup //可重入锁 + val interface{} //请求结果 + err error //错误反馈 +} + +type Group struct { + mu sync.Mutex // protects m + m map[string]*call//所有请求 +} + +//防止击穿缓存,对同一个key进行请求时需要分别进行,利用可重入锁实现 +func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) { + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + g.mu.Unlock() + c.wg.Wait() // 如果请求正在进行中,则等待 + return c.val, c.err // 请求结束,返回结果 + } + c := new(call) + c.wg.Add(1) // 发起请求前加锁 + g.m[key] = c // 添加到 g.m,表明 key 已经有对应的请求在处理 + g.mu.Unlock() + + c.val, c.err = fn() // 调用 fn,发起请求 + c.wg.Done() // 请求结束 + + g.mu.Lock() + delete(g.m, key) // 更新 g.m + g.mu.Unlock() + + return c.val, c.err // 返回结果 +} + +func (g *Group) DoChan(key string, fn func() (interface{}, error)) (ch chan interface{}) { + ch = make(chan interface{}) + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + g.mu.Unlock() + c.wg.Wait() // 如果请求正在进行中,则等待 + ch <- c.val + return ch + } + c := new(call) + c.wg.Add(1) // 发起请求前加锁 + g.m[key] = c // 添加到 g.m,表明 key 已经有对应的请求在处理 + g.mu.Unlock() + + c.val, c.err = fn() // 调用 fn,发起请求 + c.wg.Done() // 请求结束 + ch <- c.val + + g.mu.Lock() + delete(g.m, key) // 更新 g.m + g.mu.Unlock() + + return ch +} diff --git a/goSTL/data_structure/consistentHash/consistentHash.go b/goSTL/data_structure/consistentHash/consistentHash.go index 7323e90..920e80b 100644 --- a/goSTL/data_structure/consistentHash/consistentHash.go +++ b/goSTL/data_structure/consistentHash/consistentHash.go @@ -9,7 +9,8 @@ const ( maxReplicas = 32 ) -var primes = []byte{3, 5, 7, 11, 13, 17, 19, 23, 29, 31, +//素数表 +var primes = []uint32{3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, @@ -17,24 +18,29 @@ var primes = []byte{3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 239, 241, 251, } +//一致性hash结构体 type ConsistentHash struct { - minReplicas int + minReplicas int //最小的虚拟节点数 keys []uint32 //存储的结点和虚拟结点的集合 hashMap map[uint32]interface{} //hash与结点之间的映射 nodeMap map[interface{}][]uint32 //结点所对应的虚拟节点的hash值 - mutex sync.Mutex + mutex sync.Mutex //并发控制锁 } +//hash计算 func hash(id int, v interface{}) (h uint32) { prime := primes[(id*id+len(primes))%len(primes)] h = uint32(0) s := fmt.Sprintf("%d-%v-%d", id*int(prime), v, prime) bs := []byte(s) for i := range bs { - h += uint32(bs[i] * prime) + h += uint32(bs[i]) * prime } return h } + +//新建一个一致性hash结构体指针并返回 +//传入其设定的最小的虚拟节点数量 func New(minReplicas int) *ConsistentHash { if minReplicas > maxReplicas { minReplicas = maxReplicas @@ -51,19 +57,28 @@ func New(minReplicas int) *ConsistentHash { } return ch } + +//向一致性hash中插入结点,同时返回每一个结点插入的数量 func (ch *ConsistentHash) Insert(keys ...interface{}) (nums []int) { nums = make([]int, 0, len(keys)) ch.mutex.Lock() + //遍历所有待插入的结点 for _, key := range keys { num := 0 + //判断结点是否已经存在 _, exist := ch.nodeMap[key] if !exist { + //结点不存在,开始插入 for i := 0; i < maxReplicas || num < ch.minReplicas; i++ { + //生成每个虚拟节点的hash值 h := uint32(hash(i, key)) + //判断生成的hash值是否存在,存在则不插入 _, ok := ch.hashMap[h] if !ok { + //hash值不存在,进行插入 num++ ch.keys = append(ch.keys, h) + //同时建立hash值和结点之间的映射关系 ch.hashMap[h] = key ch.nodeMap[key] = append(ch.nodeMap[key], h) } @@ -76,6 +91,8 @@ func (ch *ConsistentHash) Insert(keys ...interface{}) (nums []int) { ch.mutex.Unlock() return nums } + +//二分排序 func (ch *ConsistentHash) sort(L, R int) { if L >= R { return @@ -99,17 +116,23 @@ func (ch *ConsistentHash) sort(L, R int) { ch.sort(L, l-1) ch.sort(r+1, R) } + +//从一致性hash中获取以key为索引的下一个结点的索引 func (ch *ConsistentHash) Get(key interface{}) (ans interface{}) { if len(ch.keys) == 0 { return nil } ch.mutex.Lock() + //计算key的hash值 h := hash(0, key) + //从现存的所有虚拟结点中找到该hash值对应的下一个虚拟节点对应的结点的索引 idx := ch.search(h) ans = ch.hashMap[ch.keys[idx]] ch.mutex.Unlock() return ans } + +//二分查找,找到最近的不小于该值的hash值,如果不存在则返回0,即进行取模运算即可 func (ch *ConsistentHash) search(h uint32) (idx uint32) { l, m, r := uint32(0), uint32(len(ch.keys)/2), uint32(len(ch.keys)) for l < r { @@ -122,14 +145,20 @@ func (ch *ConsistentHash) search(h uint32) (idx uint32) { } return l % uint32(len(ch.keys)) } + +//删除以key为索引的结点 func (ch *ConsistentHash) Erase(key interface{}) { ch.mutex.Lock() hs, ok := ch.nodeMap[key] if ok { + //该结点存在于一致性hash内 + //删除该结点 delete(ch.nodeMap, key) + //删除所有该结点的虚拟节点与该结点的映射关系 for i := range hs { delete(ch.hashMap, hs[i]) } + //将待删除的虚拟结点删除即可 arr := make([]uint32, 0, len(ch.keys)-len(hs)) for i := range ch.keys { h := ch.keys[i] diff --git a/goSTL/data_structure/lru/lru.go b/goSTL/data_structure/lru/lru.go index dad2e2f..66b36ad 100644 --- a/goSTL/data_structure/lru/lru.go +++ b/goSTL/data_structure/lru/lru.go @@ -1,25 +1,34 @@ package lru +//最近最少使用 +//相对于仅考虑时间因素的FIFO和仅考虑访问频率的LFU,LRU算法可以认为是相对平衡的一种淘汰算法 +//LRU认为,如果数据最近被访问过,那么将来被访问的概率也会更高 +//LRU 算法的实现非常简单,维护一个队列 +//如果某条记录被访问了,则移动到队尾,那么队首则是最近最少访问的数据,淘汰该条记录即可。 import ( "container/list" "sync" ) +//LRU链结构体 type LRU struct { - maxBytes int64 - nowBytes int64 - ll *list.List - cache map[string]*list.Element - onRemove func(key string, value Value) - mutex sync.Mutex //并发控制锁 -} -type indexes struct { - key string - value Value + maxBytes int64 //所能承载的最大byte数量 + nowBytes int64 //当前承载的byte数 + ll *list.List //用于存储的链表 + cache map[string]*list.Element //链表元素与key的映射表 + onRemove func(key string, value Value) //删除元素时的执行函数 + mutex sync.Mutex //并发控制锁 } +//索引结构体, +type indexes struct { + key string //索引 + value Value //存储值 +} + +//存储值的函数 type Value interface { - Len() int + Len() int//用于计算该存储值所占用的长度 } func New(maxBytes int64, onRemove func(string, Value)) *LRU { @@ -29,9 +38,14 @@ func New(maxBytes int64, onRemove func(string, Value)) *LRU { ll: list.New(), cache: make(map[string]*list.Element), onRemove: onRemove, + mutex: sync.Mutex{}, } } -func (l *LRU) Add(key string, value Value) { +//向该LRU中插入以key为索引的value +//若已经存在则将其放到队尾 +//若空间充足且不存在则直接插入队尾 +//若空间不足则淘汰队首元素再将其插入队尾 +func (l *LRU) Insert(key string, value Value) { l.mutex.Lock() if ele, ok := l.cache[key]; ok { l.ll.MoveToFront(ele) @@ -62,7 +76,9 @@ func (l *LRU) Add(key string, value Value) { } l.mutex.Unlock() } -func (l *LRU) RemoveOldest() { +//删除队尾元素 +//删除后执行创建时传入的删除执行函数 +func (l *LRU) Erase() { l.mutex.Lock() ele := l.ll.Back() if ele != nil { @@ -78,6 +94,8 @@ func (l *LRU) RemoveOldest() { } l.mutex.Unlock() } +//从LRU链中寻找以key为索引的value +//找到则返回,否则返回nil和false func (l *LRU) Get(key string) (value Value, ok bool) { l.mutex.Lock() if ele, ok := l.cache[key]; ok {