Skip to content

Commit

Permalink
add communication and pick peer in cache
Browse files Browse the repository at this point in the history
  • Loading branch information
spade69 committed Feb 25, 2023
1 parent 0831a32 commit 5b642fc
Show file tree
Hide file tree
Showing 9 changed files with 409 additions and 2 deletions.
13 changes: 13 additions & 0 deletions communication/peers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package communication

// PeerPicker is the interface that must be implemented to locate
// the peer that owns a specific key.
type PeerPicker interface {
// add return bool
PickPeer(key string) (peer PeerGetter, ok bool)
}

// PeerGetter is the interface that must be implemented by a peer.
type PeerGetter interface {
Get(group string, key string) ([]byte, error)
}
8 changes: 7 additions & 1 deletion consistenthash/consistenthash.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package consistenthash

import (
"fmt"
"hash/crc32"
"sort"
"strconv"
Expand Down Expand Up @@ -42,7 +43,7 @@ func (m *Map) Add(keys ...string) {
// one key --> m.replicas virtual node.
for i := 0; i < m.replicas; i++ {
// vnode = i+ key to means nvnode keys,
// m.hash() to calculate vnode's val
// m.hash() to calculate vnode's val: 1-->1X, 2-->2X
hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
m.keys = append(m.keys, hash)
// add vnode hash to hashmap
Expand All @@ -67,5 +68,10 @@ func (m *Map) Get(key string) string {
idx := sort.Search(len(m.keys), func(i int) bool {
return m.keys[i] >= hash
})
fmt.Println("Get hash is", hash, "idx is", idx, "len is ", len(m.keys))
return m.hashMap[m.keys[idx%len(m.keys)]]
}

func (m *Map) GetKeys() []int {
return m.keys
}
44 changes: 44 additions & 0 deletions group/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package group

import (
"fmt"
"log"
"sync"

"github.com/spade69/xxxcache/communication"
"github.com/spade69/xxxcache/core"
)

Expand All @@ -14,6 +16,9 @@ type Group struct {
getter Getter
// concurrent safe cache
mainCache core.Scache
// peers
peers communication.PeerPicker
//
}

// Getter is a interface used to get data from different datasource
Expand Down Expand Up @@ -95,3 +100,42 @@ func (g *Group) GetLocally(key string) (*core.ByteView, error) {
func (g *Group) PopulateCache(key string, value core.ByteView) {
g.mainCache.Set(key, value)
}

// Register HTTPPool into Group which is peers(peer picker)
func (g *Group) RegisterPeers(peers communication.PeerPicker) {
if g.peers != nil {
panic("RegisterPeers call more than once")
}
g.peers = peers
}

// load key using PickPeer to find node and load data
func (g *Group) load(key string) (value *core.ByteView, err error) {
if g.peers == nil {
err := fmt.Errorf("peers not exist")
return nil, err
}
// 1. first pick peer
if peer, ok := g.peers.PickPeer(key); ok {
// 2. try to get cache from peer
bv, err := g.getFromPeer(peer, key)
if err != nil {
log.Println("[GeeCache] Failed to get from peer", err)
return nil, err
}
return &bv, nil
}
// no cache found, read from local, if local not exist, then
// avoke call from remote
return g.GetLocally(key)
}

// load data from peer, get peer first and get byte from peer
func (g *Group) getFromPeer(peer communication.PeerGetter, key string) (core.ByteView, error) {
bytes, err := peer.Get(g.name, key)
if err != nil {
return core.ByteView{}, err
}
bv := core.NewByteView(bytes)
return bv, nil
}
143 changes: 143 additions & 0 deletions group/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package group

import (
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"strings"
"sync"

"github.com/spade69/xxxcache/communication"
"github.com/spade69/xxxcache/consistenthash"
)

type server int

// HTTPPool implements PeerPicker for a pool of HTTP peers.

type HTTPPool struct {
// this peer's base url , eg: http://example.com:8000
self string
basePath string
// guards peers and httpGetters
mu sync.Mutex
// peers is consistenthash map ,used for specific key to select node
peers *consistenthash.Map
// keyed by e.g. "http://10.0.0.2:8008", mapping remote node and corresponse httpGetter
// one remote endpoint <--> one httpGetter
httpGetters map[string]*HTTPGetter
}

type HTTPGetter struct {
baseURL string
}

const (
defaultBasePath = "/xxxcache/"
defaultReplicas = 50
)

var _ communication.PeerPicker = (*HTTPPool)(nil)

// NewHTTPPool initializes an HTTP pool of peers.
func NewHTTPPool(self string) *HTTPPool {
return &HTTPPool{
self: self, // own address of itself
// prefix of node
basePath: defaultBasePath,
}
}

func (p *HTTPPool) Log(format string, v ...interface{}) {
log.Printf("[Server %s] %s", p.self, fmt.Sprintf(format, v...))
}

// 创建任意类型 server,并实现 ServeHTTP 方法。
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// log.Println(r.URL.Path)
// w.Write([]byte("heeloworld"))
if !strings.HasPrefix(r.URL.Path, p.basePath) {
panic("HTTP pool serving unexpected path:")
}
p.Log("%s %s", r.Method, r.URL.Path)
//
// /<basepath>/<groupname>/<key> required
parts := strings.SplitN(r.URL.Path[len(p.basePath):], "/", 2)
if len(parts) != 2 {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
// groupNmae means api gourp
groupName := parts[0]
// get key of this group, group means a cache
key := parts[1]
g := GetGroup(groupName)
if g == nil {
http.Error(w, "no such group:"+groupName, http.StatusNotFound)
return
}

view, err := g.Get(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(view.ByteSlice())
}

// Set Updates the pool's list of peers
// Set() 方法实例化了一致性哈希算法,并且添加了传入的节点。
//并为每一个节点创建了一个 HTTP 客户端 httpGetter。
func (p *HTTPPool) Set(peers ...string) {
p.mu.Lock()
defer p.mu.Unlock()
p.peers = consistenthash.New(defaultReplicas, nil)
p.peers.Add(peers...)
// make a map for peer <--> httpGetter
p.httpGetters = make(map[string]*HTTPGetter, len(peers))
// here we assign map ,write to map ,so using mutex to lock it
for _, peer := range peers {
p.httpGetters[peer] = &HTTPGetter{
baseURL: peer + p.basePath,
}
}
}

// 包装了一致性哈希算法的 Get() 方法,根据具体的 key,选择节点,
// 返回节点对应的 HTTP 客户端。
func (p *HTTPPool) PickPeer(key string) (communication.PeerGetter, bool) {
p.mu.Lock()
defer p.mu.Unlock()
if peer := p.peers.Get(key); peer != "" && peer != p.self {
log.Printf("Pick peer %s", peer)
return p.httpGetters[peer], true
}
return nil, false
}

// baseURL --> remote endpoint
// use http.Get to retrieve return value
func (h *HTTPGetter) Get(group, key string) ([]byte, error) {
u := fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.QueryEscape(group),
url.QueryEscape(key),
)
// request using http client
res, err := http.Get(u)
if err != nil {
return nil, err
}
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("server return : %v", res.Status)
}
bytes, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("reading response body: %v", err)
}
return bytes, nil
}
Binary file removed main
Binary file not shown.
96 changes: 95 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,95 @@
package main

import (
"flag"
"fmt"
"hash/crc32"
"log"
"net/http"

"github.com/spade69/xxxcache/group"
)

var db = map[string]string{
"Tom": "630",
"Jack": "589",
"Sam": "567",
}

func createGroup() *group.Group {
return group.NewGroup("scores", 2<<10, group.GetterFunc(
func(key string) ([]byte, error) {
log.Println("[SlowDB] search key", key)
if v, ok := db[key]; ok {
return []byte(v), nil
}
return nil, fmt.Errorf("%s not exist", key)
}))
}

// startCacheServer() 用来启动缓存服务器:
// 创建 HTTPPool,添加节点信息,注册到 g 中,启动 HTTP 服务(共3个端口,8001/8002/8003),用户不感知。
func startCacheServer(addr string, addrs []string, g *group.Group) {
// self
peers := group.NewHTTPPool(addr)
// set peers
peers.Set(addrs...)
g.RegisterPeers(peers)
log.Println("xxxcache is running at", addr)
log.Fatal(http.ListenAndServe(addr[7:], peers))
}

// api server 9999, used for interactive with client
func startAPIServer(apiAddr string, g *group.Group) {
http.Handle("/api", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
view, err := g.Get(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(view.ByteSlice())
}))
log.Println("fontend server is running at", apiAddr)
log.Fatal(http.ListenAndServe(apiAddr[7:], nil))
}

func main() {
fmt.Println("print cache here")
// start server
var port int
var api bool
// command line input
flag.IntVar(&port, "port", 8001, "XXXCAche server port")
// command lint input
flag.BoolVar(&api, "api", false, "Start a api server?")
flag.Parse()

// api
apiAddr := "http://127.0.0.1:9999"
addrMap := map[int]string{
8001: "http://127.0.0.1:8001",
8002: "http://127.0.0.1:8002",
8003: "http://127.0.0.1:8003",
}

var addrs []string
for _, v := range addrMap {
addrs = append(addrs, v)
}

g := createGroup()
// api is true
if api {
go startAPIServer(apiAddr, g)
}
startCacheServer(addrMap[port], []string(addrs), g)
// startServer()

}

func testHash() {
key := "test hash"
key2 := "test3 hash"
node := simplehash(key)
Expand All @@ -20,3 +103,14 @@ func simplehash(key string) uint32 {
node := hval % 10
return node
}

func startLocalServer() {
addr := "127.0.0.1:9999"
peers := group.NewHTTPPool(addr)
log.Println("geecache is running at", addr)
// HTTPPool implements ServeHTTP interface --> handler interface
// func ListenAndServe(addr string, handler Handler) error {
// server := &Server{Addr: addr, Handler: handler}
//return server.ListenAndServe()
log.Fatal(http.ListenAndServe(addr, peers))
}
14 changes: 14 additions & 0 deletions run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/bin/bash
trap "rm main;kill 0" EXIT

go build -o main
./main -port=8001 &
./main -port=8002 &
./main -port=8003 -api=1 &
sleep 2
echo ">>>> start test"
curl "http://127.0.0.1:9999/api?key=Tom" &
curl "http://127.0.0.1:9999/api?key=Tom"

wait

Loading

0 comments on commit 5b642fc

Please sign in to comment.