groupcache源码解读

前端之家收集整理的这篇文章主要介绍了groupcache源码解读前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

Simple usage

gropucache的官方网站是 https://github.com/golang/groupcache

consistenthash模块

一致性hash算法,通常是用在查找一个合适的下载节点时,使负载更平均,但是对于同样的请求始终返回一样的结果。

  1. type Map struct {
  2. hash Hash
  3. replicas int
  4. keys []int // Sorted
  5. hashMap map[int]string
  6. }
  7.  
  8. Map结构中replicas的含义是增加虚拟桶,使数据分布更加均匀。
  9.  
  10. // 创建Map结构
  11. func New(replicas int,fn Hash) *Map
  12. // 添加新的Key
  13. func (m *Map) Add(keys ...string)
  14. // 根据hash(key)获取value
  15. func (m *Map) Get(key string) string
  16. // 判断Map是否为空
  17. func (m *Map) IsEmpty() bool
  18. // 竟然没有提供Remove方法 O_O

用法简单例子

  1. package main
  2.  
  3. import (
  4. "fmt"
  5.  
  6. "github.com/golang/groupcache/consistenthash"
  7. )
  8.  
  9. func main() {
  10. c := consistenthash.New(70,nil)
  11. c.Add("A","B","C","D","E")
  12. for _,key := range []string{"what","nice","what","good","yes!"} {
  13. fmt.Printf("%s -> %s\n",key,c.Get(key))
  14. }
  15. }
  16.  
  17. // Expect output
  18. // -------------
  19. // what -> C
  20. // nice -> A
  21. // what -> C
  22. // nice -> A
  23. // good -> D
  24. // yes! -> E

singleflight 模块

  1. type Group struct {
  2. }
  3.  
  4. // 当多个相同的key请求的时候,函数只被调用一次
  5. func (g *Group) Do(key string,fn func() (interface{},error)) (interface{},error)

使用例子

  1. package main
  2.  
  3. import (
  4. "fmt"
  5. "sync"
  6. "time"
  7.  
  8. "github.com/golang/groupcache/singleflight"
  9. )
  10.  
  11. func NewDelayReturn(dur time.Duration,n int) func() (interface{},error) {
  12. return func() (interface{},error) {
  13. time.Sleep(dur)
  14. return n,nil
  15. }
  16. }
  17.  
  18. func main() {
  19. g := singleflight.Group{}
  20. wg := sync.WaitGroup{}
  21. wg.Add(2)
  22. go func() {
  23. ret,err := g.Do("key",NewDelayReturn(time.Second*1,1))
  24. if err != nil {
  25. panic(err)
  26. }
  27. fmt.Printf("key-1 get %v\n",ret)
  28. wg.Done()
  29. }()
  30. go func() {
  31. time.Sleep(100 * time.Millisecond) // make sure this is call is later
  32. ret,NewDelayReturn(time.Second*2,2))
  33. if err != nil {
  34. panic(err)
  35. }
  36. fmt.Printf("key-2 get %v\n",ret)
  37. wg.Done()
  38. }()
  39. wg.Wait()
  40. }

执行结果(耗时: 1.019s)

  1. key-2 get 1
  2. key-1 get 1

lru 模块

最初是用在内存管理上的一个算法,根据历史的请求数,分析出最热门的数据,并保存下来。

  1. type Cache struct {
  2. // MaxEntries is the maximum number of cache entries before
  3. // an item is evicted. Zero means no limit.
  4. MaxEntries int
  5.  
  6. // OnEvicted optionally specificies a callback function to be
  7. // executed when an entry is purged from the cache.
  8. OnEvicted func(key Key,value interface{})
  9. // contains filtered or unexported fields
  10. }
  11.  
  12. func New(maxEntries int) *Cache
  13. func (c *Cache) Add(key Key,value interface{})
  14. func (c *Cache) Get(key Key) (value interface{},ok bool)
  15. func (c *Cache) Len() int
  16. func (c *Cache) Remove(key Key)
  17. func (c *Cache) RemoveOldest()

用法举例

  1. package main
  2.  
  3. import (
  4. "fmt"
  5.  
  6. "github.com/golang/groupcache/lru"
  7. )
  8.  
  9. func main() {
  10. cache := lru.New(2)
  11. cache.Add("x","x0")
  12. cache.Add("y","y0")
  13. yval,ok := cache.Get("y")
  14. if ok {
  15. fmt.Printf("y is %v\n",yval)
  16. }
  17. cache.Add("z","z0")
  18.  
  19. fmt.Printf("cache length is %d\n",cache.Len())
  20. _,ok = cache.Get("x")
  21. if !ok {
  22. fmt.Printf("x key was weeded out\n")
  23. }
  24. }
  25. // Expect output
  26. //--------------
  27. // y is y0
  28. // cache length is 2
  29. // x key was weeded out

HTTPPool 模块

  1. type HTTPPool struct {
  2. // 可选,为每次的请求封装的Context参数
  3. Context func(*http.Request) groupcache.Context
  4.  
  5. // 可选,不懂这个干啥的
  6. // 注释说:请求的时候用的就是这个
  7. Transport func(groupcache.Context) http.RoundTripper
  8. }
  9.  
  10. // self 必须是一个合法的URL指向当前的服务器,比如 "http://10.0.0.1:8000"
  11. // 这个函数默会注册一个路由
  12. // http.handle("/_groupcache/",poolInstance)
  13. // 该路由主要用户节点间获取数据的功能
  14. // 另外该函数不能重复调用,否则会panic
  15. func NewHTTPPool(self string) *HTTPPool
  16.  
  17. // 更新节点列表
  18. // 用了consistenthash
  19. // 奇怪的时候,只有节点添加函数,并没有删除
  20. func (p *HTTPPool) Set(peers ...string)
  21.  
  22. // 用一致性hash算法选择一个节点
  23. func (p *HTTPPool) PickPeer(key string) (groupcache.ProtoGetter,bool)
  24.  
  25. // 用于处理通过HTTP传递过来的grpc请求
  26. func (p *HTTPPool) ServeHTTP(w http.ResponseWriter,r *http.Request)

其他 (待补充)

本来想在官网上找个例子,可以

  1. package main
  2.  
  3. // A SizeReaderAt is a ReaderAt with a Size method.
  4. //
  5. // An io.SectionReader implements SizeReaderAt.
  6. type SizeReaderAt interface {
  7. Size() int64
  8. io.ReaderAt
  9. }
  10.  
  11. // NewMultiReaderAt is like io.MultiReader but produces a ReaderAt
  12. // (and Size),instead of just a reader.
  13. func NewMultiReaderAt(parts ...SizeReaderAt) SizeReaderAt {
  14. m := &multi{
  15. parts: make([]offsetAndSource,len(parts)),}
  16. var off int64
  17. for _,p := range parts {
  18. m.parts = append(m.parts,offsetAndSource{off,p})
  19. off += p.Size()
  20. }
  21. m.size = off
  22. return m
  23. }
  24.  
  25. // NewChunkAlignedReaderAt returns a ReaderAt wrapper that is backed
  26. // by a ReaderAt r of size totalSize where the wrapper guarantees that
  27. // all ReadAt calls are aligned to chunkSize boundaries and of size
  28. // chunkSize (except for the final chunk,which may be shorter).
  29. //
  30. // A chunk-aligned reader is good for caching,letting upper layers have
  31. // any access pattern,but guarantees that the wrapped ReaderAt sees
  32. // only nicely-cacheable access patterns & sizes.
  33. func NewChunkAlignedReaderAt(r SizeReaderAt,chunkSize int) SizeReaderAt {
  34. // ...
  35. }
  36.  
  37. func part(s string) SizeReaderAt {
  38. return io.NewSectionReader(strings.NewReader(s),int64(len(s)))
  39. }
  40.  
  41. func handler(w http.ResponseWriter,r *http.Request) {
  42. sra := NewMultiReaderAt(
  43. part("Hello,"),part(" world! "),part("You requested "+r.URL.Path+"\n"),)
  44. rs := io.NewSectionReader(sra,sra.Size())
  45. http.ServeContent(w,r,"foo.txt",modTime,rs)
  46. }
  47.  
  48.  
  49. func main(){
  50. me := "http://10.0.0.1"
  51. peers := groupcache.NewHTTPPool(me)
  52.  
  53. // Whenever peers change:
  54. peers.Set("http://10.0.0.1","http://10.0.0.2","http://10.0.0.3")
  55. var thumbNails = groupcache.NewGroup("thumbnail",64<<20,groupcache.GetterFunc(
  56. func(ctx groupcache.Context,key string,dest groupcache.Sink) error {
  57. fileName := key
  58. dest.SetBytes(generateThumbnail(fileName))
  59. return nil
  60. }))
  61.  
  62. var data []byte
  63. err := thumbNails.Get(ctx,"big-file.jpg",groupcache.AllocatingByteSliceSink(&data))
  64. // ...
  65. http.ServeContent(w,"big-file-thumb.jpg",bytes.NewReader(data))
  66.  
  67.  
  68. }

groupcache 使用例子

  1. package main
  2.  
  3. import (
  4. "errors"
  5. "flag"
  6. "log"
  7. "net/http"
  8. "strconv"
  9. "strings"
  10.  
  11. "github.com/golang/groupcache"
  12. )
  13.  
  14. var localStorage map[string]string
  15.  
  16. func init() {
  17. localStorage = make(map[string]string)
  18. localStorage["hello"] = "world"
  19. localStorage["info"] = "This is an example"
  20. }
  21.  
  22. func main() {
  23. port := flag.Int("port",4100,"Listen port")
  24. flag.Parse()
  25.  
  26. // Name have to starts with http://
  27. self := "http://localhost:" + strconv.Itoa(*port)
  28. pool := groupcache.NewHTTPPool(self)
  29. pool.Set(self,"http://localhost:4101")
  30.  
  31. var helloworld = groupcache.NewGroup("helloworld",10<<20,groupcache.GetterFunc(
  32. func(ctx groupcache.Context,dest groupcache.Sink) error {
  33. log.Printf("groupcache get key: %v",key)
  34. value,exists := localStorage[key]
  35. if !exists {
  36. dest.SetString(key + " NotExist")
  37. return errors.New(key + " NotExist")
  38. } else {
  39. dest.SetString(value)
  40. return nil
  41. }
  42. }))
  43.  
  44. http.HandleFunc("/",func(w http.ResponseWriter,r *http.Request) {
  45. key := strings.TrimPrefix(r.RequestURI,"/")
  46. log.Printf("Request(%v) key(%v)",r.RemoteAddr,key)
  47. if key == "" {
  48. http.Error(w,"Bad Request",http.StatusBadRequest)
  49. return
  50. }
  51. var data []byte
  52. err := helloworld.Get(nil,groupcache.AllocatingByteSliceSink(&data))
  53. if err != nil {
  54. http.Error(w,err.Error(),http.StatusBadRequest)
  55. return
  56. }
  57. log.Printf("cache data: %v",data)
  58. w.Write(data)
  59. log.Println("Gets: ",helloworld.Stats.Gets.String())
  60. log.Println("CacheHits: ",helloworld.Stats.CacheHits.String())
  61. log.Println("Loads: ",helloworld.Stats.Loads.String())
  62. log.Println("LocalLoads: ",helloworld.Stats.LocalLoads.String())
  63. log.Println("PeerErrors: ",helloworld.Stats.PeerErrors.String())
  64. log.Println("PeerLoads: ",helloworld.Stats.PeerLoads.String())
  65. })
  66. http.ListenAndServe(":"+strconv.Itoa(*port),nil)
  67. }

参考文章

  1. http://talks.golang.org/2013/oscon-dl.slide#1
  2. http://xuchongfeng.github.io/2016/02/21/GroupCache%E6%BA%90%E7%A0%81%E9%98%85%E8%AF%BB/

猜你在找的Go相关文章