// PeerGetter is implemented by a single peer. Get asks that peer for the
// value stored under key in the named group and returns it as raw bytes.
type PeerGetter interface {
	Get(group, key string) ([]byte, error)
}

// PeerPicker is implemented by anything that can locate the peer owning a
// given key. The boolean result reports whether a peer was selected; when it
// is false the returned PeerGetter must not be used.
type PeerPicker interface {
	PickPeer(key string) (PeerGetter, bool)
}
+100,42 @@ func (g *Group) GetLocally(key string) (*core.ByteView, error) { func (g *Group) PopulateCache(key string, value core.ByteView) { g.mainCache.Set(key, value) } + +// Register HTTPPool into Group which is peers(peer picker) +func (g *Group) RegisterPeers(peers communication.PeerPicker) { + if g.peers != nil { + panic("RegisterPeers call more than once") + } + g.peers = peers +} + +// load key using PickPeer to find node and load data +func (g *Group) load(key string) (value *core.ByteView, err error) { + if g.peers == nil { + err := fmt.Errorf("peers not exist") + return nil, err + } + // 1. first pick peer + if peer, ok := g.peers.PickPeer(key); ok { + // 2. try to get cache from peer + bv, err := g.getFromPeer(peer, key) + if err != nil { + log.Println("[GeeCache] Failed to get from peer", err) + return nil, err + } + return &bv, nil + } + // no cache found, read from local, if local not exist, then + // avoke call from remote + return g.GetLocally(key) +} + +// load data from peer, get peer first and get byte from peer +func (g *Group) getFromPeer(peer communication.PeerGetter, key string) (core.ByteView, error) { + bytes, err := peer.Get(g.name, key) + if err != nil { + return core.ByteView{}, err + } + bv := core.NewByteView(bytes) + return bv, nil +} diff --git a/group/http.go b/group/http.go new file mode 100644 index 0000000..1d25a71 --- /dev/null +++ b/group/http.go @@ -0,0 +1,143 @@ +package group + +import ( + "fmt" + "io/ioutil" + "log" + "net/http" + "net/url" + "strings" + "sync" + + "github.com/spade69/xxxcache/communication" + "github.com/spade69/xxxcache/consistenthash" +) + +type server int + +// HTTPPool implements PeerPicker for a pool of HTTP peers. + +type HTTPPool struct { + // this peer's base url , eg: http://example.com:8000 + self string + basePath string + // guards peers and httpGetters + mu sync.Mutex + // peers is consistenthash map ,used for specific key to select node + peers *consistenthash.Map + // keyed by e.g. 
"http://10.0.0.2:8008", mapping remote node and corresponse httpGetter + // one remote endpoint <--> one httpGetter + httpGetters map[string]*HTTPGetter +} + +type HTTPGetter struct { + baseURL string +} + +const ( + defaultBasePath = "/xxxcache/" + defaultReplicas = 50 +) + +var _ communication.PeerPicker = (*HTTPPool)(nil) + +// NewHTTPPool initializes an HTTP pool of peers. +func NewHTTPPool(self string) *HTTPPool { + return &HTTPPool{ + self: self, // own address of itself + // prefix of node + basePath: defaultBasePath, + } +} + +func (p *HTTPPool) Log(format string, v ...interface{}) { + log.Printf("[Server %s] %s", p.self, fmt.Sprintf(format, v...)) +} + +// 创建任意类型 server,并实现 ServeHTTP 方法。 +func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // log.Println(r.URL.Path) + // w.Write([]byte("heeloworld")) + if !strings.HasPrefix(r.URL.Path, p.basePath) { + panic("HTTP pool serving unexpected path:") + } + p.Log("%s %s", r.Method, r.URL.Path) + // + // /// required + parts := strings.SplitN(r.URL.Path[len(p.basePath):], "/", 2) + if len(parts) != 2 { + http.Error(w, "bad request", http.StatusBadRequest) + return + } + // groupNmae means api gourp + groupName := parts[0] + // get key of this group, group means a cache + key := parts[1] + g := GetGroup(groupName) + if g == nil { + http.Error(w, "no such group:"+groupName, http.StatusNotFound) + return + } + + view, err := g.Get(key) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/octet-stream") + w.Write(view.ByteSlice()) +} + +// Set Updates the pool's list of peers +// Set() 方法实例化了一致性哈希算法,并且添加了传入的节点。 +//并为每一个节点创建了一个 HTTP 客户端 httpGetter。 +func (p *HTTPPool) Set(peers ...string) { + p.mu.Lock() + defer p.mu.Unlock() + p.peers = consistenthash.New(defaultReplicas, nil) + p.peers.Add(peers...) 
+ // make a map for peer <--> httpGetter + p.httpGetters = make(map[string]*HTTPGetter, len(peers)) + // here we assign map ,write to map ,so using mutex to lock it + for _, peer := range peers { + p.httpGetters[peer] = &HTTPGetter{ + baseURL: peer + p.basePath, + } + } +} + +// 包装了一致性哈希算法的 Get() 方法,根据具体的 key,选择节点, +// 返回节点对应的 HTTP 客户端。 +func (p *HTTPPool) PickPeer(key string) (communication.PeerGetter, bool) { + p.mu.Lock() + defer p.mu.Unlock() + if peer := p.peers.Get(key); peer != "" && peer != p.self { + log.Printf("Pick peer %s", peer) + return p.httpGetters[peer], true + } + return nil, false +} + +// baseURL --> remote endpoint +// use http.Get to retrieve return value +func (h *HTTPGetter) Get(group, key string) ([]byte, error) { + u := fmt.Sprintf( + "%v%v/%v", + h.baseURL, + url.QueryEscape(group), + url.QueryEscape(key), + ) + // request using http client + res, err := http.Get(u) + if err != nil { + return nil, err + } + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("server return : %v", res.Status) + } + bytes, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("reading response body: %v", err) + } + return bytes, nil +} diff --git a/main b/main deleted file mode 100755 index 7f01189..0000000 Binary files a/main and /dev/null differ diff --git a/main.go b/main.go index 2295352..13965e4 100644 --- a/main.go +++ b/main.go @@ -1,12 +1,95 @@ package main import ( + "flag" "fmt" "hash/crc32" + "log" + "net/http" + + "github.com/spade69/xxxcache/group" ) +var db = map[string]string{ + "Tom": "630", + "Jack": "589", + "Sam": "567", +} + +func createGroup() *group.Group { + return group.NewGroup("scores", 2<<10, group.GetterFunc( + func(key string) ([]byte, error) { + log.Println("[SlowDB] search key", key) + if v, ok := db[key]; ok { + return []byte(v), nil + } + return nil, fmt.Errorf("%s not exist", key) + })) +} + +// startCacheServer() 用来启动缓存服务器: +// 创建 HTTPPool,添加节点信息,注册到 g 中,启动 HTTP 
服务(共3个端口,8001/8002/8003),用户不感知。 +func startCacheServer(addr string, addrs []string, g *group.Group) { + // self + peers := group.NewHTTPPool(addr) + // set peers + peers.Set(addrs...) + g.RegisterPeers(peers) + log.Println("xxxcache is running at", addr) + log.Fatal(http.ListenAndServe(addr[7:], peers)) +} + +// api server 9999, used for interactive with client +func startAPIServer(apiAddr string, g *group.Group) { + http.Handle("/api", http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + key := r.URL.Query().Get("key") + view, err := g.Get(key) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/octet-stream") + w.Write(view.ByteSlice()) + })) + log.Println("fontend server is running at", apiAddr) + log.Fatal(http.ListenAndServe(apiAddr[7:], nil)) +} + func main() { - fmt.Println("print cache here") + // start server + var port int + var api bool + // command line input + flag.IntVar(&port, "port", 8001, "XXXCAche server port") + // command lint input + flag.BoolVar(&api, "api", false, "Start a api server?") + flag.Parse() + + // api + apiAddr := "http://127.0.0.1:9999" + addrMap := map[int]string{ + 8001: "http://127.0.0.1:8001", + 8002: "http://127.0.0.1:8002", + 8003: "http://127.0.0.1:8003", + } + + var addrs []string + for _, v := range addrMap { + addrs = append(addrs, v) + } + + g := createGroup() + // api is true + if api { + go startAPIServer(apiAddr, g) + } + startCacheServer(addrMap[port], []string(addrs), g) + // startServer() + +} + +func testHash() { key := "test hash" key2 := "test3 hash" node := simplehash(key) @@ -20,3 +103,14 @@ func simplehash(key string) uint32 { node := hval % 10 return node } + +func startLocalServer() { + addr := "127.0.0.1:9999" + peers := group.NewHTTPPool(addr) + log.Println("geecache is running at", addr) + // HTTPPool implements ServeHTTP interface --> handler interface + // func ListenAndServe(addr string, handler 
// call is one in-flight (or just completed) invocation of fn for a key.
type call struct {
	wg  sync.WaitGroup // released once the invocation has finished
	val interface{}    // result of fn
	err error          // error of fn, or an *abortedError if fn did not return
}

// Group deduplicates concurrent work by key: while one call for a key is in
// flight, other callers of Do with the same key wait for it and share its
// result. The zero value is ready to use.
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // in-flight calls by key, created lazily
}

// abortedError is handed to waiters when the call they were waiting on ended
// without fn returning (for example because fn panicked).
type abortedError struct {
	key string
}

func (e *abortedError) Error() string {
	return "singleflight: call for key " + e.key + " did not complete"
}

// Do runs fn for key and returns its result. If a call for the same key is
// already in flight, Do does not run fn again; it waits for that call and
// returns its result instead.
//
// If fn panics, the panic propagates to the caller that ran fn, the key is
// still released, and callers that were waiting receive a non-nil error.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		// Someone else is already working on this key: wait and share.
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err
	}

	// Register the call before dropping the lock so later callers find it.
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	finished := false
	// The cleanup is deferred so it also runs when fn panics. Otherwise the
	// key would stay registered and every later caller would block forever.
	defer func() {
		if !finished {
			c.val, c.err = nil, &abortedError{key: key}
		}
		c.wg.Done()

		g.mu.Lock()
		delete(g.m, key)
		g.mu.Unlock()
	}()

	c.val, c.err = fn()
	finished = true
	return c.val, c.err
}
hash := consistenthash.New(3, func(key []byte) uint32 { + i, _ := strconv.Atoi(string(key)) + return uint32(i) + }) + // Given the above hash function, this will give replicas with "hashs": + // 2, 4, 6, 12, 14, 16,22,24,26.. + // 1. frist add 6, 16, 26 + // 2. second add 4, 14, 24 + // 3. third add 2, 12 ,22 + hash.Add("6", "4", "2") + testCases := map[string]string{ + "2": "2", + "11": "2", + "23": "4", + "27": "2", + "200": "2", + // "200": "4", + + } + t.Log("keys is ", hash.GetKeys()) + for k, v := range testCases { + if hash.Get(k) != v { + t.Log("hash get is ", hash.Get(k)) + t.Errorf("Asking for %s, should have yield %s", k, v) + } + } + // Adds 8 ,18, 28 + hash.Add("8") + // 27 should now map to 8 + testCases["27"] = "8" + for k, v := range testCases { + if hash.Get(k) != v { + t.Errorf("Asking for %s, should have yield %s", k, v) + } + } +}