Skip to content

Commit

Permalink
Porto mtn improve (#157)
Browse files Browse the repository at this point in the history
* improve porto box wait loop gc stage; fix using wrong context when do some http request; improve naming

* update Build-Depends section

* fix log message debug level

* [porto] work with "starting" containers same as with "running" at gc stage

* add cleanup for volumes and improve some log messages

* impove logging
  • Loading branch information
dkarpukhin authored Oct 9, 2018
1 parent aa95084 commit dd5693e
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 29 deletions.
2 changes: 1 addition & 1 deletion debian/control
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ Source: cocaine-isolate-daemon
Section: misc
Priority: extra
Maintainer: Anton Tyurin <noxiouz@yandex.ru>
Build-Depends: debhelper (>= 7), golang (>= 2:1.6)
Build-Depends: debhelper (>= 7), golang (>= 2:1.6), git
Standards-Version: 3.9.5

Package: cocaine-isolate-daemon
Expand Down
90 changes: 67 additions & 23 deletions isolate/mtn.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type Allocation struct {
Hostname string
Ip string
Id string
NetId string
Box string
Used bool
}

Expand Down Expand Up @@ -202,7 +204,7 @@ func (c *MtnState) PoolInit(ctx context.Context) bool {
return false
}
if b.Get([]byte(alloc.Id)) == nil {
if buf, err := json.Marshal(Allocation{alloc.Net, alloc.Hostname, alloc.Ip, alloc.Id, false}); err != nil {
if buf, err := json.Marshal(Allocation{alloc.Net, alloc.Hostname, alloc.Ip, alloc.Id, netId, "", false}); err != nil {
log.G(ctx).Errorf("Cant continue transaction inside PoolInit(), err: %s", err)
return false
} else if err := b.Put([]byte(alloc.Id), buf); err != nil {
Expand Down Expand Up @@ -256,48 +258,52 @@ func (c *MtnState) GetAllocations(logCtx context.Context) (map[string][]Allocati
r := make(map[string][]Allocation)
jresp := []RawAlloc{}
decoder := json.NewDecoder(rh.Body)
log.G(ctx).Debugf("reqHttp.Body from allocator getted in GetAllocations(): %s", decoder)
rErr := decoder.Decode(&jresp)
if rErr != nil {
return nil, rErr
}
for _, a := range jresp {
r[a.Network] = append(r[a.Network], Allocation{a.Porto.Net, a.Porto.Hostname, a.Porto.Ip, a.Id, false})
r[a.Network] = append(r[a.Network], Allocation{a.Porto.Net, a.Porto.Hostname, a.Porto.Ip, a.Id, a.Network, "", false})
}
log.G(logCtx).Debugf("GetAllocations() successfull ended with ContentLength size %d.", req.ContentLength)
return r, nil
}

func (c *MtnState) RequestAllocs(ctx context.Context, netid string) (map[string]Allocation, error) {
r := make(map[string]Allocation)
ctx, cancel := context.WithTimeout(context.Background(), 30 * time.Second)
httpCtx, cancel := context.WithTimeout(context.Background(), 30 * time.Second)
defer cancel()
jsonBody := PostAllocreq{netid, c.Cfg.Ident, c.Cfg.SchedLabel}
txtBody, mrshErr := json.Marshal(jsonBody)
if mrshErr != nil {
return nil, mrshErr
txtBody, errMrsh := json.Marshal(jsonBody)
if errMrsh != nil {
return nil, errMrsh
}
log.G(ctx).Debugf("c.Cfg.Allocbuffer inside RequestAllocs() is %d.", c.Cfg.Allocbuffer)
for i := 0; i < c.Cfg.Allocbuffer; i++ {
req, nrErr := http.NewRequest("POST", c.Cfg.Url, bytes.NewReader(txtBody))
if nrErr != nil {
return nil, nrErr
req, errNewReq := http.NewRequest("POST", c.Cfg.Url, bytes.NewReader(txtBody))
if errNewReq != nil {
return nil, errNewReq
}
for header, value := range c.Cfg.Headers {
req.Header.Set(header, value)
}
req.Header.Set("Content-Type", "application/json")
req = req.WithContext(ctx)
rh, doErr := http.DefaultClient.Do(req)
if doErr != nil {
return nil, doErr
req = req.WithContext(httpCtx)
reqHttp, errDo := http.DefaultClient.Do(req)
if errDo != nil {
return nil, errDo
}
jresp := RawAlloc{}
decoder := json.NewDecoder(rh.Body)
rErr := decoder.Decode(&jresp)
rh.Body.Close()
if rErr != nil {
return nil, rErr
jsonResp := RawAlloc{}
decoder := json.NewDecoder(reqHttp.Body)
errDecode := decoder.Decode(&jsonResp)
log.G(ctx).Debugf("reqHttp.Body from allocator getted in RequestAllocs(): %s", decoder)
reqHttp.Body.Close()
if errDecode != nil {
return nil, errDecode
}
r[jresp.Id] = Allocation{jresp.Porto.Net, jresp.Porto.Hostname, jresp.Porto.Ip, jresp.Id, false}
log.G(ctx).Debugf("Allocation from allocator getted in RequestAllocs(): %s", jsonResp)
r[jsonResp.Id] = Allocation{jsonResp.Porto.Net, jsonResp.Porto.Hostname, jsonResp.Porto.Ip, jsonResp.Id, netid, "", false}
}
log.G(ctx).Debugf("RequestAllocs() successfull ended with %s.", r)
return r, nil
Expand All @@ -315,7 +321,38 @@ func (c *MtnState) DbAllocIsFree(ctx context.Context, value []byte) bool {
return true
}

func (c *MtnState) GetDbAlloc(ctx context.Context, tx *bolt.Tx, netId string) (Allocation, error) {
func (c *MtnState) UsedAllocations(ctx context.Context) ([]Allocation, error) {
tx, errTx := c.Db.Begin(true)
var allocs []Allocation
if errTx != nil {
return allocs, errTx
}
defer tx.Rollback()
errBkLs := tx.ForEach(func(netId []byte, b *bolt.Bucket) error {
errAllocsLs := b.ForEach(func(allocId []byte, value []byte) error {
if c.DbAllocIsFree(ctx, value) {
return nil
}
log.G(ctx).Infof("Found used alloc for net id %s with id %s: %s", netId, allocId, value)
var a Allocation
if errUnmrsh := json.Unmarshal(value, &a); errUnmrsh != nil {
return errUnmrsh
}
allocs = append(allocs, a)
return nil
})
return errAllocsLs
})
if errBkLs != nil {
return allocs, errBkLs
}
if errCommit := tx.Commit(); errCommit != nil {
return allocs, errCommit
}
return allocs, nil
}

func (c *MtnState) GetDbAlloc(ctx context.Context, tx *bolt.Tx, netId string, box string) (Allocation, error) {
b := tx.Bucket([]byte(netId))
if b == nil {
return Allocation{}, fmt.Errorf("BUG inside GetDbAlloc()! Backet %s not exist!", netId)
Expand All @@ -328,6 +365,7 @@ func (c *MtnState) GetDbAlloc(ctx context.Context, tx *bolt.Tx, netId string) (A
return a, err
}
a.Used = true
a.Box = box
id := a.Id
value, errMrsh := json.Marshal(a)
if errMrsh != nil {
Expand Down Expand Up @@ -355,6 +393,7 @@ func (c *MtnState) GetDbAlloc(ctx context.Context, tx *bolt.Tx, netId string) (A
for id, alloc := range allocs {
if !gotcha {
alloc.Used = true
alloc.Box = box
a = alloc
gotcha = true
}
Expand Down Expand Up @@ -440,19 +479,24 @@ func (c *MtnState) BindAllocs(ctx context.Context, netId string) error {
if c.Cfg.Allocbuffer > fCount {
allocs, err := c.RequestAllocs(ctx, netId)
if err != nil {
log.G(ctx).Errorf("Cant do c.RequestAllocs(%s) inside BindAllocs(), err: %s", netId, err)
return err
}
log.G(ctx).Debugf("c.RequestAllocs(ctx, %s) end sucessfully with: %s.", netId, allocs)
b, errBk := tx.CreateBucketIfNotExists([]byte(netId))
if errBk != nil {
log.G(ctx).Errorf("Cant create bucket inside BindAllocs(), err: %s", errBk)
return errBk
}
for id, alloc := range allocs {
value, errMrsh := json.Marshal(alloc)
if errMrsh != nil {
log.G(ctx).Errorf("Cant Marshal(%s).", alloc)
return errMrsh
}
errPut := b.Put([]byte(id), value)
if errPut != nil {
log.G(ctx).Errorf("Cant b.Put(%s,%s)", id, value)
return errPut
}
}
Expand All @@ -463,14 +507,14 @@ func (c *MtnState) BindAllocs(ctx context.Context, netId string) error {
return nil
}

func (c *MtnState) UseAlloc(ctx context.Context, netId string) (Allocation, error) {
func (c *MtnState) UseAlloc(ctx context.Context, netId string, box string) (Allocation, error) {
tx, errTx := c.Db.Begin(true)
if errTx != nil {
log.G(ctx).Errorf("Cant start transaction inside UseAlloc(), err: %s", errTx)
return Allocation{}, errTx
}
defer tx.Rollback()
a, e := c.GetDbAlloc(ctx, tx, netId)
a, e := c.GetDbAlloc(ctx, tx, netId, box)
log.G(ctx).Debugf("UseAlloc(): a, e: %s, %s.", a, e)
if e != nil {
return Allocation{}, e
Expand Down
57 changes: 53 additions & 4 deletions isolate/porto/box.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (c *portoBoxConfig) ContainerRootDir(name, containerID string) string {

// Box operates with Porto to launch containers
type Box struct {
Name string
config *portoBoxConfig
GlobalState isolate.GlobalState
journal *journal
Expand Down Expand Up @@ -181,7 +182,9 @@ func NewBox(ctx context.Context, cfg isolate.BoxConfig, gstate isolate.GlobalSta
}

ctx, onClose := context.WithCancel(ctx)
name := "porto"
box := &Box{
Name: name,
config: config,
GlobalState: gstate,
journal: newJournal(),
Expand Down Expand Up @@ -304,18 +307,64 @@ func (b *Box) waitLoop(ctx context.Context) {
if err != nil {
log.G(ctx).Warnf("unable to list porto containers for gc: %v", err)
}
usedAllocations, errUsedAllocs := b.GlobalState.Mtn.UsedAllocations(ctx)
if errUsedAllocs != nil {
log.G(ctx).Errorf("Cant get UsedAllocations(). Err: %s", errUsedAllocs)
return
}
var ips []string
for _, name := range containerNames {
containerState, _ := portoConn.GetProperty(name, "state")
if containerState == "dead" {
log.G(ctx).Debugf("At gc state destroy container: %s", name)
log.G(ctx).Debugf("At gc state destroy dead container: %s", name)
portoConn.Destroy(name)
} else if containerState == "stopped" {
log.G(ctx).Debugf("At gc state destroy stopped container: %s", name)
portoConn.Destroy(name)
} else if containerState == "meta" {
continue
} else if containerState == "running" || containerState == "starting" {
containerIp, _ := portoConn.GetProperty(name, "ip")
if len(containerIp) > 2 {
ips = append(ips, containerIp)
}
}
}
if len(usedAllocations) > 0 && len(ips) > 0 {
for i := 0; i < len(usedAllocations); i++ {
usedAllocation := usedAllocations[i]
for _, ip := range ips {
if usedAllocation.Ip == ip {
log.G(ctx).Debugf("At gc state we found that already runned container use used mtn allocation: %s. Its fine.", usedAllocation)
usedAllocations = append(usedAllocations[:i], usedAllocations[i+1:]...)
i--
break
}
}
}
}
if len(usedAllocations) > 0 {
log.G(ctx).Debugf("At gc state for %s some allocation still marked as \"used\": %s. So lets free them.", b.Name, usedAllocations)
for _, usedAllocation := range usedAllocations {
if usedAllocation.Box == b.Name {
log.G(ctx).Debugf("Try free alloc with b.GlobalState.Mtn.UnuseAlloc(ctx, %s, %s)", usedAllocation.NetId, usedAllocation.Id)
b.GlobalState.Mtn.UnuseAlloc(ctx, usedAllocation.NetId, usedAllocation.Id)
}
}
}
// Now try clean unused volumes
volumes, errLv := portoConn.ListVolumes("", "")
if errLv != nil {
log.G(ctx).Debugf("At gc state for ListVolumes() we get that error: %s", errLv)
}
for _, volume := range volumes {
portoConn.UnlinkVolume(volume.Path, "***")
}
}

LOOP:
for {
log.G(ctx).Infof("next iteration of waitLoop will started after %d second of sleep.", b.config.WaitLoopStepSec)
log.G(ctx).Debugf("next iteration of waitLoop will started after %d second of sleep.", b.config.WaitLoopStepSec)
time.Sleep(time.Duration(b.config.WaitLoopStepSec) * time.Second)
if closed(portoConn) {
return
Expand All @@ -336,7 +385,6 @@ LOOP:
}
}


ourContainers := []string{}
b.muContainers.Lock()
for k, _ := range b.containers {
Expand Down Expand Up @@ -506,7 +554,7 @@ func (b *Box) Spool(ctx context.Context, name string, opts isolate.RawProfile) (
if b.GlobalState.Mtn.Cfg.Enable && profile.Network["mtn"] == "enable" {
err := b.GlobalState.Mtn.BindAllocs(ctx, profile.Network["netid"])
if err != nil {
return fmt.Errorf("Cant bind mtn alllocaton at spool with state: %s, and error: %s", b.GlobalState.Mtn, err)
return fmt.Errorf("Cant bind mtn alllocaton at spool with netid: %s, and error: %s", profile.Network["netid"], err)
}
log.G(ctx).Debugf("Successfully call b.GlobalState.Mtn.BindAllocs() at spool %s with project id %s.", name, profile.Network["netid"])
}
Expand Down Expand Up @@ -538,6 +586,7 @@ func (b *Box) Spawn(ctx context.Context, config isolate.SpawnConfig, output io.W

ID := b.appGenLabel(config.Name) + "_" + config.Args["--uuid"]
cfg := containerConfig{
BoxName: b.Name,
Root: filepath.Join(b.config.Containers, ID),
ID: b.addRootNamespacePrefix(ID),
Layer: layers,
Expand Down
3 changes: 2 additions & 1 deletion isolate/porto/container_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type containerConfig struct {
execInfo
State isolate.GlobalState

BoxName string
Root string
ID string
Layer string
Expand Down Expand Up @@ -282,7 +283,7 @@ func (c *containerConfig) CreateContainer(ctx context.Context, portoConn porto.A
properties["net"] = pickNetwork(c.NetworkMode)
} else {
if c.Network["mtn"] == "enable" {
alloc, err := c.State.Mtn.UseAlloc(ctx, string(c.Network["netid"]))
alloc, err := c.State.Mtn.UseAlloc(ctx, string(c.Network["netid"]), c.BoxName)
if err != nil {
logger.WithError(err).Errorf("get error from c.State.Mtn.UseAlloc, with netid: %s", c.Network["netid"])
return err
Expand Down
1 change: 1 addition & 0 deletions isolate/porto/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func TestContainer(t *testing.T) {
}

cfg := containerConfig{
BoxName: "porto",
Root: dir,
ID: "IsolateLinuxApline",
Layer: "testalpine",
Expand Down

0 comments on commit dd5693e

Please sign in to comment.