From a8b85c0fc4ae55ee141ac47110b4381f691f0782 Mon Sep 17 00:00:00 2001 From: Igor Shishkin Date: Sun, 8 Dec 2024 06:32:22 +0300 Subject: [PATCH] Perform expired versions cleanup in a single transaction (#275) Closes #276 --------- Signed-off-by: Igor Shishkin --- cmd/gc/main.go | 3 - gc/service/config.go | 4 - gc/service/config_test.go | 5 +- gc/service/service.go | 208 +----------------- gc/service/service_test.go | 52 +---- .../cache/metadata/memcache/memcache.go | 4 + .../cache/metadata/memcache/memcache_test.go | 15 ++ repositories/metadata/metadata.go | 1 + repositories/metadata/mock/mock.go | 5 + .../sql/0008_add_key_id_index.down.sql | 5 + .../sql/0008_add_key_id_index.up.sql | 5 + .../metadata/postgresql/postgresql_test.go | 5 + repositories/metadata/postgresql/versions.go | 168 +++++++++++++- .../metadata/postgresql/versions_test.go | 98 +++++++++ 14 files changed, 313 insertions(+), 265 deletions(-) create mode 100644 repositories/metadata/postgresql/migrations/sql/0008_add_key_id_index.down.sql create mode 100644 repositories/metadata/postgresql/migrations/sql/0008_add_key_id_index.up.sql diff --git a/cmd/gc/main.go b/cmd/gc/main.go index 4540d12..de0fad9 100644 --- a/cmd/gc/main.go +++ b/cmd/gc/main.go @@ -24,7 +24,6 @@ type config struct { MetadataDSN string `envconfig:"METADATA_DSN" required:"true"` - DryRun bool `envconfig:"DRY_RUN" default:"true"` UnpublishedVersionMaxAge time.Duration `envconfig:"UNPUBLISHED_VERSION_MAX_AGE" default:"168h"` } @@ -53,9 +52,7 @@ func main() { svc, err := service.New(&service.Config{ MdRepo: postgresqlRepo, - DryRun: cfg.DryRun, UnpublishedVersionMaxAge: cfg.UnpublishedVersionMaxAge, - TimeNowFunc: time.Now, }) if err != nil { panic(err) diff --git a/gc/service/config.go b/gc/service/config.go index def6638..1ddc46f 100644 --- a/gc/service/config.go +++ b/gc/service/config.go @@ -10,16 +10,12 @@ import ( type Config struct { MdRepo metadata.Repository - DryRun bool UnpublishedVersionMaxAge time.Duration - TimeNowFunc func() time.Time } func (c Config) Validate() error { return validation.ValidateStruct(&c, validation.Field(&c.MdRepo, validation.Required), - validation.Field(&c.DryRun), validation.Field(&c.UnpublishedVersionMaxAge, validation.Required, validation.Min(time.Hour)), - validation.Field(&c.TimeNowFunc, validation.Required), ) } diff --git a/gc/service/config_test.go b/gc/service/config_test.go index cb45e97..36227dd 100644 --- a/gc/service/config_test.go +++ b/gc/service/config_test.go @@ -6,7 +6,6 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/require" - gtm "github.com/teran/go-time" mockRepo "github.com/teran/archived/repositories/metadata/mock" ) @@ -23,16 +22,14 @@ func TestConfigValidate(t *testing.T) { name: "valid config", in: &Config{ MdRepo: mockRepo.New(), - DryRun: false, UnpublishedVersionMaxAge: 10 * time.Hour, - TimeNowFunc: gtm.NewTimeNowMock().Now, }, }, { name: "empty config", in: &Config{}, expOut: errors.New( - "MdRepo: cannot be blank; TimeNowFunc: cannot be blank; UnpublishedVersionMaxAge: cannot be blank.", + "MdRepo: cannot be blank; UnpublishedVersionMaxAge: cannot be blank.", ), }, } diff --git a/gc/service/service.go b/gc/service/service.go index a0c58b9..8147198 100644 --- a/gc/service/service.go +++ b/gc/service/service.go @@ -2,15 +2,11 @@ package service import ( "context" - "time" "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "github.com/teran/archived/models" ) -const defaultLimit uint64 = 1000 - type Service interface { Run(ctx context.Context) error } @@ -34,209 +30,17 @@ func New(cfg *Config) (Service, error) { func (s *service) Run(ctx context.Context) error { log.Info("running garbage collection ...") - log.Trace("listing namespaces ...") - namespaces, err := s.cfg.MdRepo.ListNamespaces(ctx) - if err != nil { - return errors.Wrap(err, "error listing namespaces") - } - - log.Infof("found %d namespaces", len(namespaces)) - - for _, namespace := range namespaces { - log.WithFields(log.Fields{ - "namespace": namespace, - }).Debug("processing namespace ...") - - containers, err := s.cfg.MdRepo.ListContainers(ctx, namespace) - if err != nil { - return errors.Wrap(err, "error listing containers") - } - - log.Infof("found %d containers", len(containers)) - - now := s.cfg.TimeNowFunc().UTC() - - for _, container := range containers { - log.WithFields(log.Fields{ - "namespace": namespace, - "container": container.Name, - "ttl_seconds": int64(container.VersionsTTL.Seconds()), - }).Debug("processing container ...") - - if container.VersionsTTL < 0 { - log.Debug("container TTL is set to infinite (< 0): handling unpublished versions ...") - - err = s.deleteExpiredUnpublishedVersions(ctx, now, namespace, container) - if err != nil { - return errors.Wrapf(err, "error deleting expired unpublished versions for container `%s/%s`", namespace, container.Name) - } - } else { - log.Debug("container TTL is set to positive value: handling all versions ...") - - err = s.deleteExpiredVersions(ctx, now, namespace, container) - if err != nil { - return errors.Wrapf(err, "error deleting expired versions for container `%s/%s`", namespace, container.Name) - } - } - } - } - return nil -} - -func (s *service) deleteExpiredVersions(ctx context.Context, now time.Time, namespace string, container models.Container) error { - log.WithFields(log.Fields{ - "namespace": namespace, - "container": container, - }).Debug("listing expired versions ...") - - versions, err := s.cfg.MdRepo.ListAllVersionsByContainer(ctx, namespace, container.Name) - if err != nil { - return errors.Wrapf(err, "error listing versions for container `%s/%s`", namespace, container) - } - - for _, version := range versions { - log.WithFields(log.Fields{ - "namespace": namespace, - "container": container, - "version": version.Name, - "is_published": version.IsPublished, - "created_at": version.CreatedAt.Format(time.RFC3339), - }).Trace("handling version ...") - - if version.CreatedAt.After(now.Add(-1 * container.VersionsTTL)) { - log.WithFields(log.Fields{ - "namespace": namespace, - "container": container, - "version": version.Name, - "is_published": version.IsPublished, - "created_at": version.CreatedAt.Format(time.RFC3339), - }).Trace("version is within TTL. Skipping ...") - continue - } - - log.WithFields(log.Fields{ - "namespace": namespace, - "container": container, - "version": version.Name, - "is_published": version.IsPublished, - "created_at": version.CreatedAt.Format(time.RFC3339), - }).Debug("version is older then TTL. Deleting ...") - - err = s.deleteVersion(ctx, namespace, container, version) - if err != nil { - return errors.Wrapf(err, "error deleting version `%s` for container `%s/%s`", version.Name, namespace, container.Name) - } + log.Debug("Running expired versions collection ...") + if err := s.deleteExpiredVersions(ctx); err != nil { + return errors.Wrap(err, "error deleting expired versions") } return nil } -func (s *service) deleteExpiredUnpublishedVersions(ctx context.Context, now time.Time, namespace string, container models.Container) error { - log.WithFields(log.Fields{ - "namespace": namespace, - "container": container, - }).Debug("listing expired unpublished versions ...") - - versions, err := s.cfg.MdRepo.ListUnpublishedVersionsByContainer(ctx, namespace, container.Name) - if err != nil { - return errors.Wrapf(err, "error listing versions for container `%s/%s`", namespace, container) - } - - for _, version := range versions { - log.WithFields(log.Fields{ - "namespace": namespace, - "container": container, - "version": version.Name, - "is_published": version.IsPublished, - "created_at": version.CreatedAt.Format(time.RFC3339), - }).Trace("handling version ...") - - if version.CreatedAt.After(now.Add(-1 * s.cfg.UnpublishedVersionMaxAge)) { - log.WithFields(log.Fields{ - "namespace": namespace, - "container": container, - "version": version.Name, - "is_published": version.IsPublished, - "created_at": version.CreatedAt.Format(time.RFC3339), - }).Trace("unpublished version is within TTL. Skipping ...") - continue - } - - log.WithFields(log.Fields{ - "namespace": namespace, - "container": container, - "version": version.Name, - "is_published": version.IsPublished, - "created_at": version.CreatedAt.Format(time.RFC3339), - }).Debug("unpublished version is older then TTL. Deleting ...") - - err = s.deleteVersion(ctx, namespace, container, version) - if err != nil { - return errors.Wrapf(err, "error deleting version `%s` for container `%s/%s`", version.Name, namespace, container.Name) - } - } - - return nil -} - -func (s *service) deleteVersion(ctx context.Context, namespace string, container models.Container, version models.Version) error { - var ( - total uint64 - offset uint64 - err error - ) - - for { - log.WithFields(log.Fields{ - "namespace": namespace, - "container": container, - "version": version, - "offset": offset, - "limit": defaultLimit, - }).Tracef("list objects loop iteration ...") - - var objects []string - total, objects, err = s.cfg.MdRepo.ListObjects(ctx, namespace, container.Name, version.Name, offset, defaultLimit) - if err != nil { - return errors.Wrapf(err, "error listing objects for container `%s/%s`; version `%s`", namespace, container, version.Name) - } - - if total == 0 { - break - } - - if !s.cfg.DryRun { - log.WithFields(log.Fields{ - "namespace": namespace, - "container": container, - "version": version.Name, - "amount": len(objects), - }).Info("Performing actual metadata deletion: objects") - - err = s.cfg.MdRepo.DeleteObject(ctx, namespace, container.Name, version.Name, objects...) - if err != nil { - return errors.Wrapf(err, "error removing object from `%s/%s/%s (%d objects)`", namespace, container, version.Name, len(objects)) - } - } - } - - log.WithFields(log.Fields{ - "namespace": namespace, - "container": container, - "version": version.Name, - }).Debug("deleting version ...") - - if !s.cfg.DryRun { - log.WithFields(log.Fields{ - "namespace": namespace, - "container": container, - "version": version.Name, - }).Info("Performing actual metadata deletion: version") - - err = s.cfg.MdRepo.DeleteVersion(ctx, namespace, container.Name, version.Name) - if err != nil { - return err - } +func (s *service) deleteExpiredVersions(ctx context.Context) error { + if err := s.cfg.MdRepo.DeleteExpiredVersionsWithObjects(ctx, s.cfg.UnpublishedVersionMaxAge); err != nil { + return errors.Wrap(err, "error calling repository") } return nil diff --git a/gc/service/service_test.go b/gc/service/service_test.go index 00ed1a6..6da1ccb 100644 --- a/gc/service/service_test.go +++ b/gc/service/service_test.go @@ -9,63 +9,15 @@ import ( "github.com/stretchr/testify/suite" gtm "github.com/teran/go-time" - "github.com/teran/archived/models" repoMock "github.com/teran/archived/repositories/metadata/mock" ) -const defaultNamespace = "default" - func init() { log.SetLevel(log.TraceLevel) } func (s *serviceTestSuite) TestDeleteUnpublishedExpiredVersions() { - s.tp.On("Now").Return("2024-08-01T10:11:12Z").Once() - - call0 := s.repoMock.On("ListNamespaces").Return([]string{defaultNamespace}, nil).Once() - - call1 := s.repoMock.On("ListContainers", defaultNamespace).Return([]models.Container{{Name: "container1", VersionsTTL: -1 * time.Second}}, nil).Once().NotBefore(call0) - - call2 := s.repoMock.On("ListUnpublishedVersionsByContainer", defaultNamespace, "container1").Return([]models.Version{ - { - Name: "version1", - CreatedAt: time.Date(2024, 7, 31, 10, 1, 1, 0, time.UTC), - }, - { - Name: "version2", - CreatedAt: time.Date(2024, 8, 1, 10, 1, 1, 0, time.UTC), - }, - }, nil).Once().NotBefore(call1) - call3 := s.repoMock.On("ListObjects", defaultNamespace, "container1", "version1", uint64(0), uint64(1000)).Return(uint64(3), []string{"obj1", "obj2", "obj3"}, nil).Once().NotBefore(call2) - call4 := s.repoMock.On("DeleteObject", defaultNamespace, "container1", "version1", []string{"obj1", "obj2", "obj3"}).Return(nil).Once().NotBefore(call3) - call5 := s.repoMock.On("ListObjects", defaultNamespace, "container1", "version1", uint64(0), uint64(1000)).Return(uint64(0), []string{}, nil).Once().NotBefore(call4) - _ = s.repoMock.On("DeleteVersion", defaultNamespace, "container1", "version1").Return(nil).Once().NotBefore(call5) - - err := s.svc.Run(s.ctx) - s.Require().NoError(err) -} - -func (s *serviceTestSuite) TestDeleteExpiredVersions() { - s.tp.On("Now").Return("2024-08-01T10:11:12Z").Once() - - call0 := s.repoMock.On("ListNamespaces").Return([]string{defaultNamespace}, nil).Once() - - call1 := s.repoMock.On("ListContainers", defaultNamespace).Return([]models.Container{{Name: "container1", VersionsTTL: 1 * time.Hour}}, nil).Once().NotBefore(call0) - - call2 := s.repoMock.On("ListAllVersionsByContainer", defaultNamespace, "container1").Return([]models.Version{ - { - Name: "version1", - CreatedAt: time.Date(2024, 7, 31, 10, 1, 1, 0, time.UTC), - }, - { - Name: "version2", - CreatedAt: time.Date(2024, 8, 1, 10, 1, 1, 0, time.UTC), - }, - }, nil).Once().NotBefore(call1) - call3 := s.repoMock.On("ListObjects", defaultNamespace, "container1", "version1", uint64(0), uint64(1000)).Return(uint64(3), []string{"obj1", "obj2", "obj3"}, nil).Once().NotBefore(call2) - call4 := s.repoMock.On("DeleteObject", defaultNamespace, "container1", "version1", []string{"obj1", "obj2", "obj3"}).Return(nil).Once().NotBefore(call3) - call5 := s.repoMock.On("ListObjects", defaultNamespace, "container1", "version1", uint64(0), uint64(1000)).Return(uint64(0), []string{}, nil).Once().NotBefore(call4) - _ = s.repoMock.On("DeleteVersion", defaultNamespace, "container1", "version1").Return(nil).Once().NotBefore(call5) + s.repoMock.On("DeleteExpiredVersionsWithObjects", 10*time.Hour).Return(nil).Once() err := s.svc.Run(s.ctx) s.Require().NoError(err) @@ -91,9 +43,7 @@ func (s *serviceTestSuite) SetupTest() { var err error s.svc, err = New(&Config{ MdRepo: s.repoMock, - DryRun: false, UnpublishedVersionMaxAge: 10 * time.Hour, - TimeNowFunc: s.tp.Now, }) s.Require().NoError(err) } diff --git a/repositories/cache/metadata/memcache/memcache.go b/repositories/cache/metadata/memcache/memcache.go index 3fbb698..25ed4a1 100644 --- a/repositories/cache/metadata/memcache/memcache.go +++ b/repositories/cache/metadata/memcache/memcache.go @@ -386,6 +386,10 @@ func (m *memcache) DeleteVersion(ctx context.Context, namespace, container, vers return m.repo.DeleteVersion(ctx, namespace, container, version) } +func (m *memcache) DeleteExpiredVersionsWithObjects(ctx context.Context, unpublishedVersionsMaxAge time.Duration) error { + return m.repo.DeleteExpiredVersionsWithObjects(ctx, unpublishedVersionsMaxAge) +} + func (m *memcache) CreateObject(ctx context.Context, namespace, container, version, key, casKey string) error { return m.repo.CreateObject(ctx, namespace, container, version, key, casKey) } diff --git a/repositories/cache/metadata/memcache/memcache_test.go b/repositories/cache/metadata/memcache/memcache_test.go index 47513a4..ed4742a 100644 --- a/repositories/cache/metadata/memcache/memcache_test.go +++ b/repositories/cache/metadata/memcache/memcache_test.go @@ -333,6 +333,21 @@ func (s *memcacheTestSuite) TestDeleteVersion() { s.Require().NoError(err) } +func (s *memcacheTestSuite) DeleteExpiredVersionsWithObjects() { + s.repoMock.On("DeleteExpiredVersionsWithObjects", 30*time.Second).Return(nil).Once() + s.repoMock.On("DeleteExpiredVersionsWithObjects", 40*time.Second).Return(nil).Once() + s.repoMock.On("DeleteExpiredVersionsWithObjects", 60*time.Second).Return(nil).Once() + + err := s.cache.DeleteExpiredVersionsWithObjects(s.ctx, 30*time.Second) + s.Require().NoError(err) + + err = s.cache.DeleteExpiredVersionsWithObjects(s.ctx, 40*time.Second) + s.Require().NoError(err) + + err = s.cache.DeleteExpiredVersionsWithObjects(s.ctx, 60*time.Second) + s.Require().NoError(err) +} + func (s *memcacheTestSuite) TestCreateObject() { s.repoMock.On("CreateObject", defaultNamespace, "test-container", "test-version", "test-key", "deadbeef").Return(nil).Twice() diff --git a/repositories/metadata/metadata.go b/repositories/metadata/metadata.go index 384b5b9..75efed3 100644 --- a/repositories/metadata/metadata.go +++ b/repositories/metadata/metadata.go @@ -36,6 +36,7 @@ type Repository interface { ListUnpublishedVersionsByContainer(ctx context.Context, namespace, container string) ([]models.Version, error) MarkVersionPublished(ctx context.Context, namespace, container, version string) error DeleteVersion(ctx context.Context, namespace, container, version string) error + DeleteExpiredVersionsWithObjects(ctx context.Context, unpublishedVersionsMaxAge time.Duration) error CreateObject(ctx context.Context, namespace, container, version, key, casKey string) error ListObjects(ctx context.Context, namespace, container, version string, offset, limit uint64) (uint64, []string, error) diff --git a/repositories/metadata/mock/mock.go b/repositories/metadata/mock/mock.go index 39c8cad..5dd6c30 100644 --- a/repositories/metadata/mock/mock.go +++ b/repositories/metadata/mock/mock.go @@ -111,6 +111,11 @@ func (m *Mock) DeleteVersion(ctx context.Context, namespace, container, version return args.Error(0) } +func (m *Mock) DeleteExpiredVersionsWithObjects(ctx context.Context, unpublishedVersionsMaxAge time.Duration) error { + args := m.Called(unpublishedVersionsMaxAge) + return args.Error(0) +} + func (m *Mock) CreateObject(_ context.Context, namespace, container, version, key, casKey string) error { args := m.Called(namespace, container, version, key, casKey) return args.Error(0) diff --git a/repositories/metadata/postgresql/migrations/sql/0008_add_key_id_index.down.sql b/repositories/metadata/postgresql/migrations/sql/0008_add_key_id_index.down.sql new file mode 100644 index 0000000..95f5922 --- /dev/null +++ b/repositories/metadata/postgresql/migrations/sql/0008_add_key_id_index.down.sql @@ -0,0 +1,5 @@ +BEGIN; + +DROP INDEX objects_key_id_idx ; + +COMMIT; diff --git a/repositories/metadata/postgresql/migrations/sql/0008_add_key_id_index.up.sql b/repositories/metadata/postgresql/migrations/sql/0008_add_key_id_index.up.sql new file mode 100644 index 0000000..f75b396 --- /dev/null +++ b/repositories/metadata/postgresql/migrations/sql/0008_add_key_id_index.up.sql @@ -0,0 +1,5 @@ +BEGIN; + +CREATE INDEX objects_key_id_idx ON objects (key_id); + +COMMIT; diff --git a/repositories/metadata/postgresql/postgresql_test.go b/repositories/metadata/postgresql/postgresql_test.go index 8ccbc13..a619ac4 100644 --- a/repositories/metadata/postgresql/postgresql_test.go +++ b/repositories/metadata/postgresql/postgresql_test.go @@ -6,6 +6,7 @@ import ( "testing" _ "github.com/lib/pq" + log "github.com/sirupsen/logrus" "github.com/stretchr/testify/suite" postgresApp "github.com/teran/go-docker-testsuite/applications/postgres" gtm "github.com/teran/go-time" @@ -14,6 +15,10 @@ import ( "github.com/teran/archived/repositories/metadata/postgresql/migrations" ) +func init() { + log.SetLevel(log.TraceLevel) +} + // Definitions ... type postgreSQLRepositoryTestSuite struct { suite.Suite diff --git a/repositories/metadata/postgresql/versions.go b/repositories/metadata/postgresql/versions.go index fedc402..9f83ae7 100644 --- a/repositories/metadata/postgresql/versions.go +++ b/repositories/metadata/postgresql/versions.go @@ -2,6 +2,7 @@ package postgresql import ( "context" + "errors" "time" sq "github.com/Masterminds/squirrel" @@ -12,7 +13,10 @@ import ( "github.com/teran/archived/repositories/metadata" ) -const defaultLimit uint64 = 1000 +const ( + defaultLimit uint64 = 1000 + expiredVersionsBatchSize int = 1000 +) func (r *repository) CreateVersion(ctx context.Context, namespace, container string) (string, error) { tx, err := r.db.BeginTx(ctx, nil) @@ -361,3 +365,165 @@ func (r *repository) DeleteVersion(ctx context.Context, namespace, container, ve } return nil } + +func (r *repository) DeleteExpiredVersionsWithObjects(ctx context.Context, unpublishedVersionsMaxAge time.Duration) error { + tx, err := r.db.BeginTx(ctx, nil) + if err != nil { + return mapSQLErrors(err) + } + defer func() { + err := tx.Rollback() + if err != nil { + log.WithFields(log.Fields{ + "error": err, + }).Error("error rolling back") + } + }() + + now := r.tp().UTC() + + q := psql. + Select( + "v.id AS version_id", + ). + From("containers c"). + Join("versions v ON v.container_id = c.id"). + Join("namespaces n ON n.id = c.namespace_id"). + Where( + sq.Or{ + sq.And{ + sq.Gt{ + "c.version_ttl_seconds": 0, + }, + sq.Expr("v.created_at <= (?::timestamp - c.version_ttl_seconds * interval '1 second')", now.Format(time.RFC3339)), + }, + sq.And{ + sq.Eq{ + "v.is_published": false, + }, + sq.Expr("v.created_at <= ?::timestamp", now.Add(-1*unpublishedVersionsMaxAge).Format(time.RFC3339)), + }, + }, + ) + + rows, err := selectQuery(ctx, tx, q) + if err != nil { + return mapSQLErrors(err) + } + defer rows.Close() + + deleteCandidates := []uint64{} + for rows.Next() { + var versionID uint64 + if err := rows.Scan(&versionID); err != nil { + return mapSQLErrors(err) + } + + deleteCandidates = append(deleteCandidates, versionID) + } + + if err := rows.Err(); err != nil { + return mapSQLErrors(err) + } + + // + // lib/pq (and probably PostgreSQL itself) has a limit of 65k arguments so let's batch 'em + // + if err := indexChunks(len(deleteCandidates), expiredVersionsBatchSize, func(start, end int) error { + if _, err := deleteQuery(ctx, tx, psql. + Delete("objects"). + Where(sq.Eq{ + "version_id": deleteCandidates[start:end], + }), + ); err != nil { + return err + } + return nil + }); err != nil { + return mapSQLErrors(err) + } + + if err := indexChunks(len(deleteCandidates), expiredVersionsBatchSize, func(start, end int) error { + if _, err := deleteQuery(ctx, tx, psql. + Delete("versions"). + Where(sq.Eq{ + "id": deleteCandidates[start:end], + }), + ); err != nil { + return err + } + return nil + }); err != nil { + return mapSQLErrors(err) + } + + orphanedObjectKeyIDs := []uint64{} + rows, err = selectQuery(ctx, tx, psql. + Select( + "ok.id AS id", + ). + From("object_keys ok"). + LeftJoin("objects o ON o.key_id = ok.id"). + Where(sq.Eq{ + "o.key_id": nil, + }), + ) + if err != nil { + return mapSQLErrors(err) + } + defer rows.Close() + + for rows.Next() { + var keyID uint64 + if err := rows.Scan(&keyID); err != nil { + return mapSQLErrors(err) + } + + orphanedObjectKeyIDs = append(orphanedObjectKeyIDs, keyID) + } + + if err := indexChunks(len(orphanedObjectKeyIDs), expiredVersionsBatchSize, func(start, end int) error { + if _, err := deleteQuery(ctx, tx, psql. + Delete("object_keys"). + Where(sq.Eq{ + "id": orphanedObjectKeyIDs[start:end], + }), + ); err != nil { + return err + } + return nil + }); err != nil { + return mapSQLErrors(err) + } + + if err := rows.Err(); err != nil { + return mapSQLErrors(err) + } + + if err := tx.Commit(); err != nil { + return mapSQLErrors(err) + } + return nil +} + +func indexChunks(length, chuckLen int, fn func(start, end int) error) error { + if chuckLen <= 0 { + return errors.ErrUnsupported + } + + for i := 0; i < length; i += chuckLen { + l := minInt(i+chuckLen, length) + err := fn(i, l) + if err != nil { + return err + } + } + return nil +} + +func minInt(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/repositories/metadata/postgresql/versions_test.go b/repositories/metadata/postgresql/versions_test.go index 5635fd5..96d55b0 100644 --- a/repositories/metadata/postgresql/versions_test.go +++ b/repositories/metadata/postgresql/versions_test.go @@ -324,3 +324,101 @@ func (s *postgreSQLRepositoryTestSuite) TestGetLatestPublishedVersionByContainer s.Require().Error(err) s.Require().Equal("not found", err.Error()) } + +func (s *postgreSQLRepositoryTestSuite) TestDeleteExpiredVersionsWithObjects() { + // CreateContainer (created_at) + s.tp.On("Now").Return("2024-07-07T10:11:12Z").Once() + + // CreateVersion (created_at) + s.tp.On("Now").Return("2024-07-07T10:11:13Z").Once() + + // CreateVersion (created_at) + s.tp.On("Now").Return("2024-07-07T10:11:14Z").Once() + + // CreateVersion (created_at) + s.tp.On("Now").Return("2024-10-07T10:11:15Z").Once() + + // CreateVersion (created_at) + s.tp.On("Now").Return("2024-10-07T10:11:16Z").Once() + + // CreateBLOB (created_at) + s.tp.On("Now").Return("2024-07-07T10:11:13Z").Once() + + // CreateObject twice (created_at) + s.tp.On("Now").Return("2024-07-07T10:11:13Z").Times(4) + + // DeleteExpiredVersionsWithObjects (now()) + s.tp.On("Now").Return("2024-10-08T11:11:11Z").Once() + + err := s.repo.CreateContainer(s.ctx, defaultNamespace, "test-container", 48*time.Hour) + s.Require().NoError(err) + + // Create some versions + version1, err := s.repo.CreateVersion(s.ctx, defaultNamespace, "test-container") + s.Require().NoError(err) + + version2, err := s.repo.CreateVersion(s.ctx, defaultNamespace, "test-container") + s.Require().NoError(err) + + version3, err := s.repo.CreateVersion(s.ctx, defaultNamespace, "test-container") + s.Require().NoError(err) + + version4, err := s.repo.CreateVersion(s.ctx, defaultNamespace, "test-container") + s.Require().NoError(err) + + err = s.repo.CreateBLOB(s.ctx, "deadbeef", 10, "text/plain") + s.Require().NoError(err) + for _, v := range []string{version1, version2, version3, version4} { + err = s.repo.CreateObject(s.ctx, defaultNamespace, "test-container", v, "some-key1.txt", "deadbeef") + s.Require().NoError(err) + } + + err = s.repo.MarkVersionPublished(s.ctx, defaultNamespace, "test-container", version1) + s.Require().NoError(err) + + err = s.repo.MarkVersionPublished(s.ctx, defaultNamespace, "test-container", version3) + s.Require().NoError(err) + + versions, err := s.repo.ListAllVersionsByContainer(s.ctx, defaultNamespace, "test-container") + s.Require().NoError(err) + s.Require().Equal([]models.Version{ + { + Name: "20241007101116", + IsPublished: false, + CreatedAt: time.Date(2024, 10, 7, 10, 11, 16, 0, time.UTC), + }, + { + Name: "20241007101115", + IsPublished: true, + CreatedAt: time.Date(2024, 10, 7, 10, 11, 15, 0, time.UTC), + }, + { + Name: "20240707101114", + IsPublished: false, + CreatedAt: time.Date(2024, 7, 7, 10, 11, 14, 0, time.UTC), + }, + { + Name: "20240707101113", + IsPublished: true, + CreatedAt: time.Date(2024, 7, 7, 10, 11, 13, 0, time.UTC), + }, + }, versions) + + err = s.repo.DeleteExpiredVersionsWithObjects(s.ctx, 24*7*time.Hour) + s.Require().NoError(err) + + versions, err = s.repo.ListAllVersionsByContainer(s.ctx, defaultNamespace, "test-container") + s.Require().NoError(err) + s.Require().Equal([]models.Version{ + { + Name: "20241007101116", + IsPublished: false, + CreatedAt: time.Date(2024, 10, 7, 10, 11, 16, 0, time.UTC), + }, + { + Name: "20241007101115", + IsPublished: true, + CreatedAt: time.Date(2024, 10, 7, 10, 11, 15, 0, time.UTC), + }, + }, versions) +}