Simple usage
gropucache的官方网站是 https://github.com/golang/groupcache
consistenthash模块
一致性hash算法,通常是用在查找一个合适的下载节点时,使负载更平均,但是对于同样的请求始终返回一样的结果。
- type Map struct {
- hash Hash
- replicas int
- keys []int // Sorted
- hashMap map[int]string
- }
- Map结构中replicas的含义是增加虚拟桶,使数据分布更加均匀。
- // 创建Map结构
- func New(replicas int,fn Hash) *Map
- // 添加新的Key
- func (m *Map) Add(keys ...string)
- // 根据hash(key)获取value
- func (m *Map) Get(key string) string
- // 判断Map是否为空
- func (m *Map) IsEmpty() bool
- // 竟然没有提供Remove方法 O_O
用法简单例子
- package main
- import (
- "fmt"
- "github.com/golang/groupcache/consistenthash"
- )
- func main() {
- c := consistenthash.New(70,nil)
- c.Add("A","B","C","D","E")
- for _,key := range []string{"what","nice","what","good","yes!"} {
- fmt.Printf("%s -> %s\n",key,c.Get(key))
- }
- }
- // Expect output
- // -------------
- // what -> C
- // nice -> A
- // what -> C
- // nice -> A
- // good -> D
- // yes! -> E
singleflight 模块
使用例子
- package main
- import (
- "fmt"
- "sync"
- "time"
- "github.com/golang/groupcache/singleflight"
- )
- func NewDelayReturn(dur time.Duration,n int) func() (interface{},error) {
- return func() (interface{},error) {
- time.Sleep(dur)
- return n,nil
- }
- }
- func main() {
- g := singleflight.Group{}
- wg := sync.WaitGroup{}
- wg.Add(2)
- go func() {
- ret,err := g.Do("key",NewDelayReturn(time.Second*1,1))
- if err != nil {
- panic(err)
- }
- fmt.Printf("key-1 get %v\n",ret)
- wg.Done()
- }()
- go func() {
- time.Sleep(100 * time.Millisecond) // make sure this is call is later
- ret,NewDelayReturn(time.Second*2,2))
- if err != nil {
- panic(err)
- }
- fmt.Printf("key-2 get %v\n",ret)
- wg.Done()
- }()
- wg.Wait()
- }
执行结果(耗时: 1.019s)
- key-2 get 1
- key-1 get 1
lru 模块
最初是用在内存管理上的一个算法,根据历史的请求数,分析出最热门的数据,并保存下来。
- type Cache struct {
- // MaxEntries is the maximum number of cache entries before
- // an item is evicted. Zero means no limit.
- MaxEntries int
- // OnEvicted optionally specificies a callback function to be
- // executed when an entry is purged from the cache.
- OnEvicted func(key Key,value interface{})
- // contains filtered or unexported fields
- }
- func New(maxEntries int) *Cache
- func (c *Cache) Add(key Key,value interface{})
- func (c *Cache) Get(key Key) (value interface{},ok bool)
- func (c *Cache) Len() int
- func (c *Cache) Remove(key Key)
- func (c *Cache) RemoveOldest()
用法举例
- package main
- import (
- "fmt"
- "github.com/golang/groupcache/lru"
- )
- func main() {
- cache := lru.New(2)
- cache.Add("x","x0")
- cache.Add("y","y0")
- yval,ok := cache.Get("y")
- if ok {
- fmt.Printf("y is %v\n",yval)
- }
- cache.Add("z","z0")
- fmt.Printf("cache length is %d\n",cache.Len())
- _,ok = cache.Get("x")
- if !ok {
- fmt.Printf("x key was weeded out\n")
- }
- }
- // Expect output
- //--------------
- // y is y0
- // cache length is 2
- // x key was weeded out
HTTPPool 模块
- type HTTPPool struct {
- // 可选,为每次的请求封装的Context参数
- Context func(*http.Request) groupcache.Context
- // 可选,不懂这个干啥的
- // 注释说:请求的时候用的就是这个
- Transport func(groupcache.Context) http.RoundTripper
- }
- // self 必须是一个合法的URL指向当前的服务器,比如 "http://10.0.0.1:8000"
- // 这个函数默会注册一个路由
- // http.handle("/_groupcache/",poolInstance)
- // 该路由主要用户节点间获取数据的功能
- // 另外该函数不能重复调用,否则会panic
- func NewHTTPPool(self string) *HTTPPool
- // 更新节点列表
- // 用了consistenthash
- // 奇怪的时候,只有节点添加的函数,并没有删除的
- func (p *HTTPPool) Set(peers ...string)
- // 用一致性hash算法选择一个节点
- func (p *HTTPPool) PickPeer(key string) (groupcache.ProtoGetter,bool)
- // 用于处理通过HTTP传递过来的grpc请求
- func (p *HTTPPool) ServeHTTP(w http.ResponseWriter,r *http.Request)
其他 (待补充)
本来想在官网上找个例子,可以
- package main
- // A SizeReaderAt is a ReaderAt with a Size method.
- //
- // An io.SectionReader implements SizeReaderAt.
- type SizeReaderAt interface {
- Size() int64
- io.ReaderAt
- }
- // NewMultiReaderAt is like io.MultiReader but produces a ReaderAt
- // (and Size),instead of just a reader.
- func NewMultiReaderAt(parts ...SizeReaderAt) SizeReaderAt {
- m := &multi{
- parts: make([]offsetAndSource,len(parts)),}
- var off int64
- for _,p := range parts {
- m.parts = append(m.parts,offsetAndSource{off,p})
- off += p.Size()
- }
- m.size = off
- return m
- }
- // NewChunkAlignedReaderAt returns a ReaderAt wrapper that is backed
- // by a ReaderAt r of size totalSize where the wrapper guarantees that
- // all ReadAt calls are aligned to chunkSize boundaries and of size
- // chunkSize (except for the final chunk,which may be shorter).
- //
- // A chunk-aligned reader is good for caching,letting upper layers have
- // any access pattern,but guarantees that the wrapped ReaderAt sees
- // only nicely-cacheable access patterns & sizes.
- func NewChunkAlignedReaderAt(r SizeReaderAt,chunkSize int) SizeReaderAt {
- // ...
- }
- func part(s string) SizeReaderAt {
- return io.NewSectionReader(strings.NewReader(s),int64(len(s)))
- }
- func handler(w http.ResponseWriter,r *http.Request) {
- sra := NewMultiReaderAt(
- part("Hello,"),part(" world! "),part("You requested "+r.URL.Path+"\n"),)
- rs := io.NewSectionReader(sra,sra.Size())
- http.ServeContent(w,r,"foo.txt",modTime,rs)
- }
- func main(){
- me := "http://10.0.0.1"
- peers := groupcache.NewHTTPPool(me)
- // Whenever peers change:
- peers.Set("http://10.0.0.1","http://10.0.0.2","http://10.0.0.3")
- var thumbNails = groupcache.NewGroup("thumbnail",64<<20,groupcache.GetterFunc(
- func(ctx groupcache.Context,key string,dest groupcache.Sink) error {
- fileName := key
- dest.SetBytes(generateThumbnail(fileName))
- return nil
- }))
- var data []byte
- err := thumbNails.Get(ctx,"big-file.jpg",groupcache.AllocatingByteSliceSink(&data))
- // ...
- http.ServeContent(w,"big-file-thumb.jpg",bytes.NewReader(data))
- }
groupcache 使用例子
- package main
- import (
- "errors"
- "flag"
- "log"
- "net/http"
- "strconv"
- "strings"
- "github.com/golang/groupcache"
- )
- var localStorage map[string]string
- func init() {
- localStorage = make(map[string]string)
- localStorage["hello"] = "world"
- localStorage["info"] = "This is an example"
- }
- func main() {
- port := flag.Int("port",4100,"Listen port")
- flag.Parse()
- // Name have to starts with http://
- self := "http://localhost:" + strconv.Itoa(*port)
- pool := groupcache.NewHTTPPool(self)
- pool.Set(self,"http://localhost:4101")
- var helloworld = groupcache.NewGroup("helloworld",10<<20,groupcache.GetterFunc(
- func(ctx groupcache.Context,dest groupcache.Sink) error {
- log.Printf("groupcache get key: %v",key)
- value,exists := localStorage[key]
- if !exists {
- dest.SetString(key + " NotExist")
- return errors.New(key + " NotExist")
- } else {
- dest.SetString(value)
- return nil
- }
- }))
- http.HandleFunc("/",func(w http.ResponseWriter,r *http.Request) {
- key := strings.TrimPrefix(r.RequestURI,"/")
- log.Printf("Request(%v) key(%v)",r.RemoteAddr,key)
- if key == "" {
- http.Error(w,"Bad Request",http.StatusBadRequest)
- return
- }
- var data []byte
- err := helloworld.Get(nil,groupcache.AllocatingByteSliceSink(&data))
- if err != nil {
- http.Error(w,err.Error(),http.StatusBadRequest)
- return
- }
- log.Printf("cache data: %v",data)
- w.Write(data)
- log.Println("Gets: ",helloworld.Stats.Gets.String())
- log.Println("CacheHits: ",helloworld.Stats.CacheHits.String())
- log.Println("Loads: ",helloworld.Stats.Loads.String())
- log.Println("LocalLoads: ",helloworld.Stats.LocalLoads.String())
- log.Println("PeerErrors: ",helloworld.Stats.PeerErrors.String())
- log.Println("PeerLoads: ",helloworld.Stats.PeerLoads.String())
- })
- http.ListenAndServe(":"+strconv.Itoa(*port),nil)
- }