Skip to content

Commit

Permalink
Perform expired versions cleanup in a single transaction (#275)
Browse files Browse the repository at this point in the history
Closes #276

---------

Signed-off-by: Igor Shishkin <me@teran.ru>
  • Loading branch information
teran authored Dec 8, 2024
1 parent 03e45db commit a8b85c0
Show file tree
Hide file tree
Showing 14 changed files with 313 additions and 265 deletions.
3 changes: 0 additions & 3 deletions cmd/gc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type config struct {

MetadataDSN string `envconfig:"METADATA_DSN" required:"true"`

DryRun bool `envconfig:"DRY_RUN" default:"true"`
UnpublishedVersionMaxAge time.Duration `envconfig:"UNPUBLISHED_VERSION_MAX_AGE" default:"168h"`
}

Expand Down Expand Up @@ -53,9 +52,7 @@ func main() {

svc, err := service.New(&service.Config{
MdRepo: postgresqlRepo,
DryRun: cfg.DryRun,
UnpublishedVersionMaxAge: cfg.UnpublishedVersionMaxAge,
TimeNowFunc: time.Now,
})
if err != nil {
panic(err)
Expand Down
4 changes: 0 additions & 4 deletions gc/service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,12 @@ import (

type Config struct {
MdRepo metadata.Repository
DryRun bool
UnpublishedVersionMaxAge time.Duration
TimeNowFunc func() time.Time
}

func (c Config) Validate() error {
return validation.ValidateStruct(&c,
validation.Field(&c.MdRepo, validation.Required),
validation.Field(&c.DryRun),
validation.Field(&c.UnpublishedVersionMaxAge, validation.Required, validation.Min(time.Hour)),
validation.Field(&c.TimeNowFunc, validation.Required),
)
}
5 changes: 1 addition & 4 deletions gc/service/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/pkg/errors"
"github.com/stretchr/testify/require"
gtm "github.com/teran/go-time"

mockRepo "github.com/teran/archived/repositories/metadata/mock"
)
Expand All @@ -23,16 +22,14 @@ func TestConfigValidate(t *testing.T) {
name: "valid config",
in: &Config{
MdRepo: mockRepo.New(),
DryRun: false,
UnpublishedVersionMaxAge: 10 * time.Hour,
TimeNowFunc: gtm.NewTimeNowMock().Now,
},
},
{
name: "empty config",
in: &Config{},
expOut: errors.New(
"MdRepo: cannot be blank; TimeNowFunc: cannot be blank; UnpublishedVersionMaxAge: cannot be blank.",
"MdRepo: cannot be blank; UnpublishedVersionMaxAge: cannot be blank.",
),
},
}
Expand Down
208 changes: 6 additions & 202 deletions gc/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@ package service

import (
"context"
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/teran/archived/models"
)

const defaultLimit uint64 = 1000

type Service interface {
Run(ctx context.Context) error
}
Expand All @@ -34,209 +30,17 @@ func New(cfg *Config) (Service, error) {
func (s *service) Run(ctx context.Context) error {
log.Info("running garbage collection ...")

log.Trace("listing namespaces ...")
namespaces, err := s.cfg.MdRepo.ListNamespaces(ctx)
if err != nil {
return errors.Wrap(err, "error listing namespaces")
}

log.Infof("found %d namespaces", len(namespaces))

for _, namespace := range namespaces {
log.WithFields(log.Fields{
"namespace": namespace,
}).Debug("processing namespace ...")

containers, err := s.cfg.MdRepo.ListContainers(ctx, namespace)
if err != nil {
return errors.Wrap(err, "error listing containers")
}

log.Infof("found %d containers", len(containers))

now := s.cfg.TimeNowFunc().UTC()

for _, container := range containers {
log.WithFields(log.Fields{
"namespace": namespace,
"container": container.Name,
"ttl_seconds": int64(container.VersionsTTL.Seconds()),
}).Debug("processing container ...")

if container.VersionsTTL < 0 {
log.Debug("container TTL is set to infinite (< 0): handling unpublished versions ...")

err = s.deleteExpiredUnpublishedVersions(ctx, now, namespace, container)
if err != nil {
return errors.Wrapf(err, "error deleting expired unpublished versions for container `%s/%s`", namespace, container.Name)
}
} else {
log.Debug("container TTL is set to positive value: handling all versions ...")

err = s.deleteExpiredVersions(ctx, now, namespace, container)
if err != nil {
return errors.Wrapf(err, "error deleting expired versions for container `%s/%s`", namespace, container.Name)
}
}
}
}
return nil
}

func (s *service) deleteExpiredVersions(ctx context.Context, now time.Time, namespace string, container models.Container) error {
log.WithFields(log.Fields{
"namespace": namespace,
"container": container,
}).Debug("listing expired versions ...")

versions, err := s.cfg.MdRepo.ListAllVersionsByContainer(ctx, namespace, container.Name)
if err != nil {
return errors.Wrapf(err, "error listing versions for container `%s/%s`", namespace, container)
}

for _, version := range versions {
log.WithFields(log.Fields{
"namespace": namespace,
"container": container,
"version": version.Name,
"is_published": version.IsPublished,
"created_at": version.CreatedAt.Format(time.RFC3339),
}).Trace("handling version ...")

if version.CreatedAt.After(now.Add(-1 * container.VersionsTTL)) {
log.WithFields(log.Fields{
"namespace": namespace,
"container": container,
"version": version.Name,
"is_published": version.IsPublished,
"created_at": version.CreatedAt.Format(time.RFC3339),
}).Trace("version is within TTL. Skipping ...")
continue
}

log.WithFields(log.Fields{
"namespace": namespace,
"container": container,
"version": version.Name,
"is_published": version.IsPublished,
"created_at": version.CreatedAt.Format(time.RFC3339),
}).Debug("version is older then TTL. Deleting ...")

err = s.deleteVersion(ctx, namespace, container, version)
if err != nil {
return errors.Wrapf(err, "error deleting version `%s` for container `%s/%s`", version.Name, namespace, container.Name)
}
log.Debug("Running expired versions collection ...")
if err := s.deleteExpiredVersions(ctx); err != nil {
return errors.Wrap(err, "error deleting expired versions")
}

return nil
}

func (s *service) deleteExpiredUnpublishedVersions(ctx context.Context, now time.Time, namespace string, container models.Container) error {
log.WithFields(log.Fields{
"namespace": namespace,
"container": container,
}).Debug("listing expired unpublished versions ...")

versions, err := s.cfg.MdRepo.ListUnpublishedVersionsByContainer(ctx, namespace, container.Name)
if err != nil {
return errors.Wrapf(err, "error listing versions for container `%s/%s`", namespace, container)
}

for _, version := range versions {
log.WithFields(log.Fields{
"namespace": namespace,
"container": container,
"version": version.Name,
"is_published": version.IsPublished,
"created_at": version.CreatedAt.Format(time.RFC3339),
}).Trace("handling version ...")

if version.CreatedAt.After(now.Add(-1 * s.cfg.UnpublishedVersionMaxAge)) {
log.WithFields(log.Fields{
"namespace": namespace,
"container": container,
"version": version.Name,
"is_published": version.IsPublished,
"created_at": version.CreatedAt.Format(time.RFC3339),
}).Trace("unpublished version is within TTL. Skipping ...")
continue
}

log.WithFields(log.Fields{
"namespace": namespace,
"container": container,
"version": version.Name,
"is_published": version.IsPublished,
"created_at": version.CreatedAt.Format(time.RFC3339),
}).Debug("unpublished version is older then TTL. Deleting ...")

err = s.deleteVersion(ctx, namespace, container, version)
if err != nil {
return errors.Wrapf(err, "error deleting version `%s` for container `%s/%s`", version.Name, namespace, container.Name)
}
}

return nil
}

func (s *service) deleteVersion(ctx context.Context, namespace string, container models.Container, version models.Version) error {
var (
total uint64
offset uint64
err error
)

for {
log.WithFields(log.Fields{
"namespace": namespace,
"container": container,
"version": version,
"offset": offset,
"limit": defaultLimit,
}).Tracef("list objects loop iteration ...")

var objects []string
total, objects, err = s.cfg.MdRepo.ListObjects(ctx, namespace, container.Name, version.Name, offset, defaultLimit)
if err != nil {
return errors.Wrapf(err, "error listing objects for container `%s/%s`; version `%s`", namespace, container, version.Name)
}

if total == 0 {
break
}

if !s.cfg.DryRun {
log.WithFields(log.Fields{
"namespace": namespace,
"container": container,
"version": version.Name,
"amount": len(objects),
}).Info("Performing actual metadata deletion: objects")

err = s.cfg.MdRepo.DeleteObject(ctx, namespace, container.Name, version.Name, objects...)
if err != nil {
return errors.Wrapf(err, "error removing object from `%s/%s/%s (%d objects)`", namespace, container, version.Name, len(objects))
}
}
}

log.WithFields(log.Fields{
"namespace": namespace,
"container": container,
"version": version.Name,
}).Debug("deleting version ...")

if !s.cfg.DryRun {
log.WithFields(log.Fields{
"namespace": namespace,
"container": container,
"version": version.Name,
}).Info("Performing actual metadata deletion: version")

err = s.cfg.MdRepo.DeleteVersion(ctx, namespace, container.Name, version.Name)
if err != nil {
return err
}
func (s *service) deleteExpiredVersions(ctx context.Context) error {
if err := s.cfg.MdRepo.DeleteExpiredVersionsWithObjects(ctx, s.cfg.UnpublishedVersionMaxAge); err != nil {
return errors.Wrap(err, "error calling repository")
}

return nil
Expand Down
52 changes: 1 addition & 51 deletions gc/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,63 +9,15 @@ import (
"github.com/stretchr/testify/suite"
gtm "github.com/teran/go-time"

"github.com/teran/archived/models"
repoMock "github.com/teran/archived/repositories/metadata/mock"
)

const defaultNamespace = "default"

func init() {
log.SetLevel(log.TraceLevel)
}

func (s *serviceTestSuite) TestDeleteUnpublishedExpiredVersions() {
s.tp.On("Now").Return("2024-08-01T10:11:12Z").Once()

call0 := s.repoMock.On("ListNamespaces").Return([]string{defaultNamespace}, nil).Once()

call1 := s.repoMock.On("ListContainers", defaultNamespace).Return([]models.Container{{Name: "container1", VersionsTTL: -1 * time.Second}}, nil).Once().NotBefore(call0)

call2 := s.repoMock.On("ListUnpublishedVersionsByContainer", defaultNamespace, "container1").Return([]models.Version{
{
Name: "version1",
CreatedAt: time.Date(2024, 7, 31, 10, 1, 1, 0, time.UTC),
},
{
Name: "version2",
CreatedAt: time.Date(2024, 8, 1, 10, 1, 1, 0, time.UTC),
},
}, nil).Once().NotBefore(call1)
call3 := s.repoMock.On("ListObjects", defaultNamespace, "container1", "version1", uint64(0), uint64(1000)).Return(uint64(3), []string{"obj1", "obj2", "obj3"}, nil).Once().NotBefore(call2)
call4 := s.repoMock.On("DeleteObject", defaultNamespace, "container1", "version1", []string{"obj1", "obj2", "obj3"}).Return(nil).Once().NotBefore(call3)
call5 := s.repoMock.On("ListObjects", defaultNamespace, "container1", "version1", uint64(0), uint64(1000)).Return(uint64(0), []string{}, nil).Once().NotBefore(call4)
_ = s.repoMock.On("DeleteVersion", defaultNamespace, "container1", "version1").Return(nil).Once().NotBefore(call5)

err := s.svc.Run(s.ctx)
s.Require().NoError(err)
}

func (s *serviceTestSuite) TestDeleteExpiredVersions() {
s.tp.On("Now").Return("2024-08-01T10:11:12Z").Once()

call0 := s.repoMock.On("ListNamespaces").Return([]string{defaultNamespace}, nil).Once()

call1 := s.repoMock.On("ListContainers", defaultNamespace).Return([]models.Container{{Name: "container1", VersionsTTL: 1 * time.Hour}}, nil).Once().NotBefore(call0)

call2 := s.repoMock.On("ListAllVersionsByContainer", defaultNamespace, "container1").Return([]models.Version{
{
Name: "version1",
CreatedAt: time.Date(2024, 7, 31, 10, 1, 1, 0, time.UTC),
},
{
Name: "version2",
CreatedAt: time.Date(2024, 8, 1, 10, 1, 1, 0, time.UTC),
},
}, nil).Once().NotBefore(call1)
call3 := s.repoMock.On("ListObjects", defaultNamespace, "container1", "version1", uint64(0), uint64(1000)).Return(uint64(3), []string{"obj1", "obj2", "obj3"}, nil).Once().NotBefore(call2)
call4 := s.repoMock.On("DeleteObject", defaultNamespace, "container1", "version1", []string{"obj1", "obj2", "obj3"}).Return(nil).Once().NotBefore(call3)
call5 := s.repoMock.On("ListObjects", defaultNamespace, "container1", "version1", uint64(0), uint64(1000)).Return(uint64(0), []string{}, nil).Once().NotBefore(call4)
_ = s.repoMock.On("DeleteVersion", defaultNamespace, "container1", "version1").Return(nil).Once().NotBefore(call5)
s.repoMock.On("DeleteExpiredVersionsWithObjects", 10*time.Hour).Return(nil).Once()

err := s.svc.Run(s.ctx)
s.Require().NoError(err)
Expand All @@ -91,9 +43,7 @@ func (s *serviceTestSuite) SetupTest() {
var err error
s.svc, err = New(&Config{
MdRepo: s.repoMock,
DryRun: false,
UnpublishedVersionMaxAge: 10 * time.Hour,
TimeNowFunc: s.tp.Now,
})
s.Require().NoError(err)
}
Expand Down
4 changes: 4 additions & 0 deletions repositories/cache/metadata/memcache/memcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,10 @@ func (m *memcache) DeleteVersion(ctx context.Context, namespace, container, vers
return m.repo.DeleteVersion(ctx, namespace, container, version)
}

func (m *memcache) DeleteExpiredVersionsWithObjects(ctx context.Context, unpublishedVersionsMaxAge time.Duration) error {
return m.repo.DeleteExpiredVersionsWithObjects(ctx, unpublishedVersionsMaxAge)
}

func (m *memcache) CreateObject(ctx context.Context, namespace, container, version, key, casKey string) error {
return m.repo.CreateObject(ctx, namespace, container, version, key, casKey)
}
Expand Down
Loading

0 comments on commit a8b85c0

Please sign in to comment.