diff --git a/pkg/meta/base.go b/pkg/meta/base.go index 3b262d60320e..cb3c43b3aa33 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -387,6 +387,36 @@ func (r *baseMeta) txUnlock(idx uint) { r.txlocks[idx%nlocks].Unlock() } +func (r *baseMeta) txBatchLock(inodes ...Ino) func() { + switch len(inodes) { + case 0: + return func() {} + case 1: // most cases + r.txLock(uint(inodes[0])) + return func() { r.txUnlock(uint(inodes[0])) } + default: // for rename and more + inodeSlots := make([]int, len(inodes)) + for i, ino := range inodes { + inodeSlots[i] = int(ino % nlocks) + } + sort.Ints(inodeSlots) + uniqInodeSlots := inodeSlots[:0] + for i := 0; i < len(inodeSlots); i++ { // Go does not support recursive locks + if i == 0 || inodeSlots[i] != inodeSlots[i-1] { + uniqInodeSlots = append(uniqInodeSlots, inodeSlots[i]) + } + } + for _, idx := range uniqInodeSlots { + r.txlocks[idx].Lock() + } + return func() { + for _, idx := range uniqInodeSlots { + r.txlocks[idx].Unlock() + } + } + } +} + func (r *baseMeta) OnMsg(mtype uint32, cb MsgCallback) { r.msgCallbacks.Lock() defer r.msgCallbacks.Unlock() diff --git a/pkg/meta/base_test.go b/pkg/meta/base_test.go index 3923f55743c4..151263aa6d8a 100644 --- a/pkg/meta/base_test.go +++ b/pkg/meta/base_test.go @@ -24,6 +24,7 @@ import ( "context" "errors" "fmt" + "math/rand" "os" "reflect" "runtime" @@ -3282,3 +3283,62 @@ func TestSymlinkCache(t *testing.T) { cache.doClean() require.Equal(t, int32(8000), cache.size.Load()) } + +func TestTxBatchLock(t *testing.T) { + var base baseMeta + // 0 inode + func() { + defer base.txBatchLock()() + }() + // 1 inodes + func() { + defer base.txBatchLock(2)() + }() + // 2 inodes + func() { + defer base.txBatchLock(1, 2)() + }() + // no reentrant + func() { + defer base.txBatchLock(1, 1, nlocks+1)() + }() + // no deadlock - sequential + func() { + batch1 := []Ino{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + batch2 := []Ino{1 + nlocks*9, 2 + nlocks*8, 3 + nlocks*7, 4 + nlocks*6, 5 + nlocks*5, 6 + nlocks*4, 7 + nlocks*3, 8 + nlocks*2, 9 + nlocks, 10} + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(2) + go func() { + defer wg.Done() + defer base.txBatchLock(batch1...)() + }() + go func() { + defer wg.Done() + defer base.txBatchLock(batch2...)() + }() + } + wg.Wait() + }() + // no deadlock - fuzz testing + func() { + var batch1, batch2 []Ino + for i := 0; i < 100; i++ { + batch1 = append(batch1, Ino(rand.Uint64()+1)) + batch2 = append(batch2, Ino(rand.Uint64()+1)) + } + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(2) + go func() { + defer wg.Done() + defer base.txBatchLock(batch1...)() + }() + go func() { + defer wg.Done() + defer base.txBatchLock(batch2...)() + }() + } + wg.Wait() + }() +} diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 9f46bf2f5c7c..17652c238ef3 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -908,10 +908,7 @@ func (m *dbMeta) txn(f func(s *xorm.Session) error, inodes ...Ino) error { inodes = []Ino{1} } - if len(inodes) > 0 { - m.txLock(uint(inodes[0])) - defer m.txUnlock(uint(inodes[0])) - } + defer m.txBatchLock(inodes...)() var lastErr error for i := 0; i < 50; i++ { _, err := m.db.Transaction(func(s *xorm.Session) (interface{}, error) { @@ -2252,7 +2249,7 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst } } return err - }) + }, parentSrc, parentDst) if err == nil && !exchange && trash == 0 { if dino > 0 && dn.Type == TypeFile && dn.Nlink == 0 { m.fileDeleted(opened, false, dino, dn.Length) diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 010b7cf298ef..57bf1e2caadb 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -809,10 +809,7 @@ func (m *kvMeta) txn(ctx context.Context, f func(tx *kvTxn) error, inodes ...Ino } start := time.Now() defer func() { m.txDist.Observe(time.Since(start).Seconds()) }() - if len(inodes) > 0 { - m.txLock(uint(inodes[0])) - defer m.txUnlock(uint(inodes[0])) - } + defer m.txBatchLock(inodes...)() var lastErr error for i := 0; i < 50; i++ { err := m.client.txn(ctx, f, i) @@ -1491,10 +1488,6 @@ func (m *kvMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst var dtyp uint8 var tattr Attr var newSpace, newInode int64 - lockParent := parentSrc - if isTrash(lockParent) { - lockParent = parentDst - } err := m.txn(ctx, func(tx *kvTxn) error { opened = false dino, dtyp = 0, 0 @@ -1731,7 +1724,7 @@ func (m *kvMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst tx.set(m.inodeKey(parentDst), m.marshal(&dattr)) } return nil - }, lockParent) + }, parentSrc, parentDst) if err == nil && !exchange && trash == 0 { if dino > 0 && dtyp == TypeFile && tattr.Nlink == 0 { m.fileDeleted(opened, false, dino, tattr.Length)