Skip to content

Commit

Permalink
fix: Break the tenant reload transaction further
Browse files Browse the repository at this point in the history
  • Loading branch information
JigarJoshi committed Jun 8, 2023
1 parent b1a26b1 commit 80b4eea
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 41 deletions.
60 changes: 38 additions & 22 deletions server/metadata/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,10 @@ func (m *TenantManager) CreateOrGetTenant(ctx context.Context, namespace Namespa

defer func() {
if err == nil {
if err = tx.Commit(ctx); err == nil {
// commit succeed, so we can safely cache it now, for other workers it may happen as part of the
// first call in query lifecycle
m.tenants[namespace.StrId()] = tenant
m.idToTenantMap[namespace.Id()] = namespace.StrId()
}
// commit succeed, so we can safely cache it now, for other workers it may happen as part of the
// first call in query lifecycle
m.tenants[namespace.StrId()] = tenant
m.idToTenantMap[namespace.Id()] = namespace.StrId()
} else {
_ = tx.Rollback(ctx)
}
Expand Down Expand Up @@ -379,7 +377,7 @@ func (m *TenantManager) GetTenant(ctx context.Context, namespaceId string) (*Ten
namespace := NewTenantNamespace(namespaceId, metadata)
tenant = NewTenant(namespace, m.kvStore, m.searchStore,
m.metaStore, m.encoder, m.versionH, currentVersion, m.tableKeyGenerator)
if err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot); err != nil {
if err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot, m.txMgr); err != nil {
return nil, err
}

Expand Down Expand Up @@ -464,7 +462,7 @@ func (m *TenantManager) createOrGetTenantInternal(ctx context.Context, tx transa

tenant := NewTenant(namespace, m.kvStore, m.searchStore, m.metaStore, m.encoder, m.versionH, currentVersion, m.tableKeyGenerator)
tenant.Lock()
err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot)
err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot, m.txMgr)
tenant.Unlock()
return tenant, err
}
Expand Down Expand Up @@ -584,17 +582,13 @@ func (m *TenantManager) reload(ctx context.Context, currentVersion Version) erro
return err
}
tenant.Lock()
err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot)
err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot, m.txMgr)
tenant.Unlock()
if err != nil {
log.Err(err).Msgf("reloading a tenant failed '%s'", tenant.name)
_ = tx.Rollback(ctx)
return err
}
if err = tx.Commit(ctx); err != nil {
log.Err(err).Msgf("committing a reloading of tenant failed '%s'", tenant.name)
return err
}
}
return nil
}
Expand Down Expand Up @@ -650,7 +644,7 @@ func NewTenant(namespace Namespace, kvStore kv.TxStore, searchStore search.Store
// thread will actually perform reload. This is a blocking API which means if most of the requests detected that the
// tenant state is stale then they all will block till one of them will reload the tenant state from the database. All
// the blocking transactions will be restarted to ensure they see the latest view of the tenant.
func (tenant *Tenant) Reload(ctx context.Context, tx transaction.Tx, version Version, searchSchemasSnapshot map[string]*tsApi.CollectionResponse) error {
func (tenant *Tenant) Reload(ctx context.Context, tx transaction.Tx, version Version, searchSchemasSnapshot map[string]*tsApi.CollectionResponse, txMgr *transaction.Manager) error {
if !tenant.shouldReload(version) {
return nil
}
Expand All @@ -662,7 +656,7 @@ func (tenant *Tenant) Reload(ctx context.Context, tx transaction.Tx, version Ver
return nil
}

return tenant.reload(ctx, tx, version, searchSchemasSnapshot)
return tenant.reload(ctx, tx, version, searchSchemasSnapshot, txMgr)
}

func (tenant *Tenant) shouldReload(currentVersion Version) bool {
Expand All @@ -678,7 +672,7 @@ func (tenant *Tenant) shouldReload(currentVersion Version) bool {
// loads all the databases, it loads the resources for each one. Once databases are reloaded then it performs the same
// logic for search indexes. Once search indexes are loaded it links back the search indexes to the Tigris Collection
// if the source for these search indexes is Tigris.
func (tenant *Tenant) reload(ctx context.Context, tx transaction.Tx, currentVersion Version, searchSchemasSnapshot map[string]*tsApi.CollectionResponse) error {
func (tenant *Tenant) reload(ctx context.Context, tx transaction.Tx, currentVersion Version, searchSchemasSnapshot map[string]*tsApi.CollectionResponse, txMgr *transaction.Manager) error {
// reset
tenant.projects = make(map[string]*Project)
tenant.idToDatabaseMap = make(map[uint32]*Database)
Expand Down Expand Up @@ -714,11 +708,14 @@ func (tenant *Tenant) reload(ctx context.Context, tx transaction.Tx, currentVers
tenant.idToDatabaseMap[meta.ID] = database
}

err = tx.Commit(ctx)
if err != nil {
log.Fatal().Err(err).Msg("Failed to reload tenant")
}
// load search indexes, this is essentially loading all the search indexes created by the user and attaching it to
// the project object.
for _, p := range tenant.projects {
var err error
if p.search, err = tenant.reloadSearch(ctx, tx, p, searchSchemasSnapshot); err != nil {
if p.search, err = tenant.reloadSearch(ctx, txMgr, p, searchSchemasSnapshot); err != nil {
return err
}
for _, index := range p.search.indexes {
Expand Down Expand Up @@ -805,16 +802,32 @@ func (tenant *Tenant) reloadDatabase(ctx context.Context, tx transaction.Tx, dbN
}

// reloadSearch is responsible for reloading all the search indexes inside a single project.
func (tenant *Tenant) reloadSearch(ctx context.Context, tx transaction.Tx, project *Project, searchSchemasSnapshot map[string]*tsApi.CollectionResponse) (*Search, error) {
projMetadata, err := tenant.namespaceStore.GetProjectMetadata(ctx, tx, tenant.namespace.Id(), project.Name())
func (tenant *Tenant) reloadSearch(ctx context.Context, txMgr *transaction.Manager, project *Project, searchSchemasSnapshot map[string]*tsApi.CollectionResponse) (*Search, error) {
txToReadProjectMetadata, err := txMgr.StartTx(ctx)
if err != nil {
log.Fatal().Err(err).Msg("Failed to start tx to read project metadata while reloading tenant")
}

projMetadata, err := tenant.namespaceStore.GetProjectMetadata(ctx, txToReadProjectMetadata, tenant.namespace.Id(), project.Name())
if err != nil {
return nil, errors.Internal("failed to get project metadata for project %s", project.Name())
}
_ = txToReadProjectMetadata.Commit(ctx)

searchObj := NewSearch()

for _, searchMD := range projMetadata.SearchMetadata {
schV, err := tenant.searchSchemaStore.GetLatest(ctx, tx, tenant.namespace.Id(), project.id, searchMD.Name)
var indexLevelTx transaction.Tx
for i, searchMD := range projMetadata.SearchMetadata {
if i%10 == 0 {
if indexLevelTx != nil {
_ = indexLevelTx.Commit(ctx)
}
indexLevelTx, err = txMgr.StartTx(ctx)
if err != nil {
log.Fatal().Err(err).Msg("Failed to start tx to reload indices in batch for tenant reload")
}
}
schV, err := tenant.searchSchemaStore.GetLatest(ctx, indexLevelTx, tenant.namespace.Id(), project.id, searchMD.Name)
if err != nil {
return nil, err
}
Expand All @@ -833,6 +846,9 @@ func (tenant *Tenant) reloadSearch(ctx context.Context, tx transaction.Tx, proje
searchObj.indexes[searchMD.Name] = schema.NewSearchIndex(schV.Version, searchStoreIndexName, searchFactory, fieldsInSearchStore)
}

if indexLevelTx != nil {
_ = indexLevelTx.Commit(ctx)
}
return searchObj, nil
}

Expand Down
30 changes: 15 additions & 15 deletions server/metadata/tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestTenantManager_CreateProjects(t *testing.T) {

tx, err = tm.StartTx(ctx)
require.NoError(t, err)
err = tenant.reload(ctx, tx, nil, nil)
err = tenant.reload(ctx, tx, nil, nil, tm)
require.NoError(t, err)
proj1, err := tenant.GetProject(tenantProj1)
require.NoError(t, err)
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestTenantManager_DatabaseBranches(t *testing.T) {
databases := (&Project{}).GetDatabaseWithBranches()
require.Len(t, databases, 0)

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

require.NoError(t, tenant.CreateBranch(ctx, tx, tenantProj1, NewDatabaseNameWithBranch(tenantProj1, "branch1")))
require.NoError(t, tenant.CreateBranch(ctx, tx, tenantProj2, NewDatabaseNameWithBranch(tenantProj2, "branch1")))
Expand All @@ -282,7 +282,7 @@ func TestTenantManager_DatabaseBranches(t *testing.T) {
require.ErrorContains(t, tenant.CreateBranch(ctx, tx, unknownProject, NewDatabaseNameWithBranch(unknownProject, "branch1")), "project doesn't exist")

// reload again to get all the branches
require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

// list all branches
branches := tenant.ListDatabaseBranches(tenantProj1)
Expand Down Expand Up @@ -339,7 +339,7 @@ func TestTenantManager_DatabaseBranches(t *testing.T) {
require.NoError(t, tenant.DeleteBranch(ctx, tx, tenantProj1, NewDatabaseNameWithBranch(tenantProj1, "branch2")))
require.ErrorContains(t, tenant.DeleteBranch(ctx, tx, unknownProject, NewDatabaseNameWithBranch(unknownProject, "branch1")), "project doesn't exist")

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))
require.NoError(t, tx.Commit(ctx))

tx, err = tm.StartTx(ctx)
Expand Down Expand Up @@ -402,7 +402,7 @@ func TestTenantManager_CreateCollections(t *testing.T) {
err = tenant.CreateProject(ctx, tx, tenantProj2, nil)
require.NoError(t, err)

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

proj1, err := tenant.GetProject(tenantProj1)
require.NoError(t, err)
Expand Down Expand Up @@ -444,7 +444,7 @@ func TestTenantManager_CreateCollections(t *testing.T) {
require.NoError(t, err)
require.NoError(t, tenant.CreateCollection(ctx, tx, db2, factory))

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

proj2, err = tenant.GetProject(tenantProj2)
require.NoError(t, err)
Expand Down Expand Up @@ -477,7 +477,7 @@ func TestTenantManager_DropCollection(t *testing.T) {
err = tenant.CreateProject(ctx, tx, tenantProj2, nil)
require.NoError(t, err)

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

proj1, err := tenant.GetProject(tenantProj1)
require.NoError(t, err)
Expand Down Expand Up @@ -507,7 +507,7 @@ func TestTenantManager_DropCollection(t *testing.T) {
factory, err := schema.NewFactoryBuilder(true).Build("test_collection", jsSchema)
require.NoError(t, err)
require.NoError(t, tenant.CreateCollection(ctx, tx, db2, factory))
require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))
require.NoError(t, tx.Commit(ctx))

tx, err = tm.StartTx(ctx)
Expand Down Expand Up @@ -549,7 +549,7 @@ func TestTenantManager_SearchIndexes(t *testing.T) {
err = tenant.CreateProject(ctx, tx, tenantProj1, nil)
require.NoError(t, err)

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

proj1, err := tenant.GetProject(tenantProj1)
require.NoError(t, err)
Expand Down Expand Up @@ -578,7 +578,7 @@ func TestTenantManager_SearchIndexes(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, indexesInSearchStore[tenant.Encoder.EncodeSearchTableName(tenant.namespace.Id(), proj1.Id(), factory.Name)])

require.NoError(t, tenant.reload(ctx, tx, nil, indexesInSearchStore))
require.NoError(t, tenant.reload(ctx, tx, nil, indexesInSearchStore, tm))

proj1, err = tenant.GetProject(tenantProj1)
require.NoError(t, err)
Expand Down Expand Up @@ -615,7 +615,7 @@ func TestTenantManager_SecondaryIndexes(t *testing.T) {
err = tenant.CreateProject(ctx, tx, tenantProj2, nil)
require.NoError(t, err)

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

proj1, err := tenant.GetProject(tenantProj1)
require.NoError(t, err)
Expand Down Expand Up @@ -657,7 +657,7 @@ func TestTenantManager_SecondaryIndexes(t *testing.T) {
require.NoError(t, err)
require.NoError(t, tenant.CreateCollection(ctx, tx, db2, factory))

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

proj2, err = tenant.GetProject(tenantProj2)
require.NoError(t, err)
Expand Down Expand Up @@ -688,7 +688,7 @@ func TestTenantManager_SecondaryIndexes(t *testing.T) {
err = tenant.CreateProject(ctx, tx, tenantProj1, nil)
require.NoError(t, err)

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

proj1, err := tenant.GetProject(tenantProj1)
require.NoError(t, err)
Expand Down Expand Up @@ -720,7 +720,7 @@ func TestTenantManager_SecondaryIndexes(t *testing.T) {
require.NoError(t, err)
require.NoError(t, tenant.CreateCollection(ctx, tx, db1, factory))

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

proj1, err = tenant.GetProject(tenantProj1)
require.NoError(t, err)
Expand Down Expand Up @@ -1098,7 +1098,7 @@ func TestTenantManager_SearchDataSize(t *testing.T) {

err = tenant.CreateProject(ctx, tmTx, tenantProj2, nil)
require.NoError(t, err)
require.NoError(t, tenant.reload(ctx, tmTx, nil, nil))
require.NoError(t, tenant.reload(ctx, tmTx, nil, nil, tm))

// proj1
proj1, err := tenant.GetProject(tenantProj1)
Expand Down
2 changes: 1 addition & 1 deletion server/metadata/tenant_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (cacheTracker *CacheTracker) stopTracking(ctx context.Context, tenant *Tena
return err
}

if err = tenant.Reload(ctx, tx, version, cacheTracker.tenantMgr.searchSchemasSnapshot); err != nil {
if err = tenant.Reload(ctx, tx, version, cacheTracker.tenantMgr.searchSchemasSnapshot, cacheTracker.txMgr); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion server/quota/quota_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestQuota(t *testing.T) {
factory, err := schema.NewFactoryBuilder(true).Build("test_collection", jsSchema)
require.NoError(t, err)

err = tenant.Reload(ctx, tx, []byte("aaa"), nil)
err = tenant.Reload(ctx, tx, []byte("aaa"), nil, txMgr)
require.NoError(t, err)

proj1, err := tenant.GetProject(projName)
Expand Down
2 changes: 1 addition & 1 deletion server/quota/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestStorageQuota(t *testing.T) {
factory, err := schema.NewFactoryBuilder(true).Build("test_collection", jsSchema)
require.NoError(t, err)

err = tenant.Reload(ctx, tx, []byte("aaa"), nil)
err = tenant.Reload(ctx, tx, []byte("aaa"), nil, txMgr)
require.NoError(t, err)

proj1, err := tenant.GetProject(projName)
Expand Down
2 changes: 1 addition & 1 deletion server/services/v1/realtime/device_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *Sessions) CreateDeviceSession(ctx context.Context, conn *websocket.Conn
if version, err = s.versionH.Read(ctx, tx, false); err != nil {
return nil, err
}
if err = tenant.Reload(ctx, tx, version, nil); err != nil {
if err = tenant.Reload(ctx, tx, version, nil, s.txMgr); err != nil {
return nil, err
}

Expand Down

0 comments on commit 80b4eea

Please sign in to comment.