Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

将DB的监控和管理分离 #388

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 6 additions & 16 deletions backend/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ type DB struct {
lastPing int64
}

// 1. DB re-open connections, so no error should be returned due to connection problems
func Open(addr string, user string, password string, dbName string, maxConnNum int) (*DB, error) {
var err error
db := new(DB)
db.addr = addr
db.user = user
Expand All @@ -71,29 +71,19 @@ func Open(addr string, user string, password string, dbName string, maxConnNum i
db.maxConnNum = DefaultMaxConnNum
db.InitConnNum = InitConnCount
}
//check connection
db.checkConn, err = db.newConn()
if err != nil {
db.Close()
return nil, err
}

db.idleConns = make(chan *Conn, db.maxConnNum)
db.cacheConns = make(chan *Conn, db.maxConnNum)
atomic.StoreInt32(&(db.state), Unknown)

for i := 0; i < db.maxConnNum; i++ {
if i < db.InitConnNum {
conn, err := db.newConn()
if err != nil {
db.Close()
return nil, err
}
for i := 0; i < db.InitConnNum; i++ {
conn, err := db.newConn()
// try open as many as possbile, but not required
if err == nil {
conn.pushTimestamp = time.Now().Unix()
db.cacheConns <- conn
} else {
conn := new(Conn)
db.idleConns <- conn
break
}
}
db.SetLastPing()
Expand Down
26 changes: 15 additions & 11 deletions backend/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ func (n *Node) GetMasterConn() (*BackendConn, error) {
if db == nil {
return nil, errors.ErrNoMasterConn
}
if atomic.LoadInt32(&(db.state)) == Down {

// ManualDown, Unknown are also not acceptable.
if atomic.LoadInt32(&(db.state)) != Up {
return nil, errors.ErrMasterDown
}

Expand All @@ -84,13 +86,17 @@ func (n *Node) GetSlaveConn() (*BackendConn, error) {
if db == nil {
return nil, errors.ErrNoSlaveDB
}
if atomic.LoadInt32(&(db.state)) == Down {
if atomic.LoadInt32(&(db.state)) != Up {
return nil, errors.ErrSlaveDown
}

return db.GetConn()
}

// checkMaster
// 1. assume Master == nil, after admin DownMaster, so skip check
// 2. in normal condition, Master should never be not nil, even if db is down, so UpMaster should not called by checkMaster
//
func (n *Node) checkMaster() {
db := n.Master
if db == nil {
Expand All @@ -101,14 +107,6 @@ func (n *Node) checkMaster() {
if err := db.Ping(); err != nil {
golog.Error("Node", "checkMaster", "Ping", 0, "db.Addr", db.Addr(), "error", err.Error())
} else {
if atomic.LoadInt32(&(db.state)) == Down {
golog.Info("Node", "checkMaster", "Master up", 0, "db.Addr", db.Addr())
err := n.UpMaster(db.addr)
if err != nil {
golog.Error("Node", "checkMaster", "UpMaster", 0, "db.Addr", db.Addr(), "error", err.Error())
return
}
}
db.SetLastPing()
if atomic.LoadInt32(&(db.state)) != ManualDown {
atomic.StoreInt32(&(db.state), Up)
Expand All @@ -120,7 +118,8 @@ func (n *Node) checkMaster() {
golog.Info("Node", "checkMaster", "Master down", 0,
"db.Addr", db.Addr(),
"Master_down_time", int64(n.DownAfterNoAlive/time.Second))
n.DownMaster(db.addr, Down)
// n.DownMaster(db.addr, Down)
atomic.StoreInt32(&(db.state), Down)
}
}

Expand Down Expand Up @@ -252,6 +251,7 @@ func (n *Node) UpDB(addr string) (*DB, error) {
return db, nil
}

// used for admin only
func (n *Node) UpMaster(addr string) error {
db, err := n.UpDB(addr)
if err != nil {
Expand All @@ -262,6 +262,7 @@ func (n *Node) UpMaster(addr string) error {
return err
}

// used for admin only
func (n *Node) UpSlave(addr string) error {
db, err := n.UpDB(addr)
if err != nil {
Expand All @@ -282,17 +283,20 @@ func (n *Node) UpSlave(addr string) error {
return err
}

// used for admin only
func (n *Node) DownMaster(addr string, state int32) error {
db := n.Master
if db == nil || db.addr != addr {
return errors.ErrNoMasterDB
}

db.Close()
n.Master = nil
atomic.StoreInt32(&(db.state), state)
return nil
}

// used for admin only
func (n *Node) DownSlave(addr string, state int32) error {
n.RLock()
if n.Slave == nil {
Expand Down