Skip to content

Commit

Permalink
Add some trace logging for metadata repository (#74)
Browse files Browse the repository at this point in the history
Signed-off-by: Igor Shishkin <me@teran.dev>
  • Loading branch information
teran authored Jul 21, 2024
1 parent c1b5cda commit c0e8f23
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 114 deletions.
27 changes: 13 additions & 14 deletions repositories/metadata/postgresql/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func (r *repository) CreateBLOB(ctx context.Context, checksum string, size uint64, mimeType string) error {
_, err := psql.
_, err := insertQuery(ctx, r.db, psql.
Insert("blobs").
Columns(
"checksum",
Expand All @@ -19,15 +19,12 @@ func (r *repository) CreateBLOB(ctx context.Context, checksum string, size uint6
checksum,
size,
mimeType,
).
RunWith(r.db).
ExecContext(ctx)

return errors.Wrap(err, "error executing SQL query")
))
return mapSQLErrors(err)
}

func (r *repository) GetBlobKeyByObject(ctx context.Context, container, version, key string) (string, error) {
row := psql.
row, err := selectQueryRow(ctx, r.db, psql.
Select("b.checksum AS checksum").
From("blobs b").
Join("objects o ON o.blob_id = b.id").
Expand All @@ -38,9 +35,10 @@ func (r *repository) GetBlobKeyByObject(ctx context.Context, container, version,
"v.name": version,
"o.key": key,
"v.is_published": true,
}).
RunWith(r.db).
QueryRowContext(ctx)
}))
if err != nil {
return "", mapSQLErrors(err)
}

var checksum string
if err := row.Scan(&checksum); err != nil {
Expand All @@ -51,15 +49,16 @@ func (r *repository) GetBlobKeyByObject(ctx context.Context, container, version,
}

func (r *repository) EnsureBlobKey(ctx context.Context, key string, size uint64) error {
row := psql.
row, err := selectQueryRow(ctx, r.db, psql.
Select("id").
From("blobs").
Where(sq.Eq{
"checksum": key,
"size": size,
}).
RunWith(r.db).
QueryRowContext(ctx)
}))
if err != nil {
return errors.Wrap(err, "error selecting BLOB")
}

var blobID uint
if err := row.Scan(&blobID); err != nil {
Expand Down
29 changes: 10 additions & 19 deletions repositories/metadata/postgresql/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,35 @@ import (
"context"

sq "github.com/Masterminds/squirrel"
"github.com/pkg/errors"
)

func (r *repository) CreateContainer(ctx context.Context, name string) error {
_, err := psql.
_, err := insertQuery(ctx, r.db, psql.
Insert("containers").
Columns(
"name",
).
Values(
name,
).
RunWith(r.db).
ExecContext(ctx)

return errors.Wrap(err, "error executing SQL query")
))
return mapSQLErrors(err)
}

func (r *repository) ListContainers(ctx context.Context) ([]string, error) {
rows, err := psql.
rows, err := selectQuery(ctx, r.db, psql.
Select("name").
From("containers").
OrderBy("name").
RunWith(r.db).
QueryContext(ctx)
OrderBy("name"))
if err != nil {
return nil, errors.Wrap(mapSQLErrors(err), "error executing SQL query")
return nil, mapSQLErrors(err)
}
defer rows.Close()

result := []string{}
for rows.Next() {
var r string
if err := rows.Scan(&r); err != nil {
return nil, errors.Wrap(err, "error decoding database result")
return nil, mapSQLErrors(err)
}

result = append(result, r)
Expand All @@ -48,11 +42,8 @@ func (r *repository) ListContainers(ctx context.Context) ([]string, error) {
}

func (r *repository) DeleteContainer(ctx context.Context, name string) error {
_, err := psql.
_, err := deleteQuery(ctx, r.db, psql.
Delete("containers").
Where(sq.Eq{"name": name}).
RunWith(r.db).
ExecContext(ctx)

return errors.Wrap(err, "error executing SQL query")
Where(sq.Eq{"name": name}))
return mapSQLErrors(err)
}
2 changes: 1 addition & 1 deletion repositories/metadata/postgresql/containers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func (s *postgreSQLRepositoryTestSuite) TestContainerOperations() {
err = s.repo.CreateContainer(s.ctx, "test-container9")
s.Require().Error(err)
s.Require().Equal(
`error executing SQL query: pq: duplicate key value violates unique constraint "containers_name_key"`,
`pq: duplicate key value violates unique constraint "containers_name_key"`,
err.Error(),
)

Expand Down
99 changes: 49 additions & 50 deletions repositories/metadata/postgresql/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,38 @@ func (r *repository) CreateObject(ctx context.Context, container, version, key,
}
defer tx.Rollback()

row := psql.
row, err := selectQueryRow(ctx, tx, psql.
Select("v.id as id").
From("containers c").
Join("versions v ON v.container_id = c.id").
Where(sq.Eq{
"c.name": container,
"v.name": version,
"is_published": false,
}).
RunWith(tx).
QueryRowContext(ctx)
}))
if err != nil {
return mapSQLErrors(err)
}

var versionID uint
if err := row.Scan(&versionID); err != nil {
return errors.Wrap(mapSQLErrors(err), "error looking up version")
return mapSQLErrors(err)
}

row = psql.
row, err = selectQueryRow(ctx, tx, psql.
Select("id").
From("blobs").
Where(sq.Eq{"checksum": casKey}).
RunWith(tx).
QueryRowContext(ctx)
Where(sq.Eq{"checksum": casKey}))
if err != nil {
return mapSQLErrors(err)
}

var blobID uint
if err := row.Scan(&blobID); err != nil {
return errors.Wrap(mapSQLErrors(err), "error looking up blob")
return mapSQLErrors(err)
}

_, err = psql.
_, err = insertQuery(ctx, tx, psql.
Insert("objects").
Columns(
"version_id",
Expand All @@ -54,11 +56,9 @@ func (r *repository) CreateObject(ctx context.Context, container, version, key,
versionID,
key,
blobID,
).
RunWith(tx).
ExecContext(ctx)
))
if err != nil {
return errors.Wrap(err, "error executing SQL query")
return mapSQLErrors(err)
}

if err := tx.Commit(); err != nil {
Expand All @@ -68,48 +68,48 @@ func (r *repository) CreateObject(ctx context.Context, container, version, key,
}

func (r *repository) ListObjects(ctx context.Context, container, version string, offset, limit uint64) ([]string, error) {
row := psql.
row, err := selectQueryRow(ctx, r.db, psql.
Select("id").
From("containers").
Where(sq.Eq{
"name": container,
}).
RunWith(r.db).
QueryRowContext(ctx)
}))
if err != nil {
return nil, mapSQLErrors(err)
}

var containerID uint
if err := row.Scan(&containerID); err != nil {
return nil, mapSQLErrors(err)
}

row = psql.
row, err = selectQueryRow(ctx, r.db, psql.
Select("id").
From("versions").
Where(sq.Eq{
"container_id": containerID,
"name": version,
}).
RunWith(r.db).
QueryRowContext(ctx)
}))
if err != nil {
return nil, mapSQLErrors(err)
}

var versionID uint
if err := row.Scan(&versionID); err != nil {
return nil, mapSQLErrors(err)
}

rows, err := psql.
rows, err := selectQuery(ctx, r.db, psql.
Select("key").
From("objects").
Where(sq.Eq{
"version_id": versionID,
}).
OrderBy("id").
Offset(offset).
Limit(limit).
RunWith(r.db).
QueryContext(ctx)
Limit(limit))
if err != nil {
return nil, errors.Wrap(err, "error executing SQL query")
return nil, mapSQLErrors(err)
}
defer rows.Close()

Expand All @@ -133,32 +133,31 @@ func (r *repository) DeleteObject(ctx context.Context, container, version, key s
}
defer tx.Rollback()

row := psql.
row, err := selectQueryRow(ctx, tx, psql.
Select("v.id").
From("versions v").
Join("containers c ON v.container_id = c.id").
Where(sq.Eq{
"c.name": container,
"v.name": version,
}).
RunWith(tx).
QueryRowContext(ctx)
}))
if err != nil {
return mapSQLErrors(err)
}

var versionID uint
if err := row.Scan(&versionID); err != nil {
return errors.Wrap(err, "error looking up version")
}

_, err = psql.
_, err = deleteQuery(ctx, tx, psql.
Delete("objects").
Where(sq.Eq{
"version_id": versionID,
"key": key,
}).
RunWith(tx).
ExecContext(ctx)
}))
if err != nil {
return errors.Wrap(err, "error executing SQL query")
return mapSQLErrors(err)
}

if err := tx.Commit(); err != nil {
Expand All @@ -174,45 +173,45 @@ func (r *repository) RemapObject(ctx context.Context, container, version, key, n
}
defer tx.Rollback()

row := psql.
row, err := selectQueryRow(ctx, tx, psql.
Select("v.id").
From("versions v").
Join("containers c ON v.container_id = c.id").
Where(sq.Eq{
"c.name": container,
"v.name": version,
}).
RunWith(tx).
QueryRowContext(ctx)
}))
if err != nil {
return mapSQLErrors(err)
}

var versionID uint
if err := row.Scan(&versionID); err != nil {
return errors.Wrap(err, "error looking up version")
}

row = psql.
row, err = selectQueryRow(ctx, tx, psql.
Select("id").
From("blobs").
Where(sq.Eq{"checksum": newCASKey}).
RunWith(tx).
QueryRowContext(ctx)
Where(sq.Eq{"checksum": newCASKey}))
if err != nil {
return mapSQLErrors(err)
}

var blobID uint
if err := row.Scan(&blobID); err != nil {
return errors.Wrap(err, "error looking up blob")
}

_, err = psql.
_, err = updateQuery(ctx, tx, psql.
Update("objects").
Set("blob_id", blobID).
Where(sq.Eq{
"version_id": versionID,
"key": key,
}).
RunWith(tx).
ExecContext(ctx)
}))
if err != nil {
return errors.Wrap(err, "error executing SQL query")
return mapSQLErrors(err)
}

if err := tx.Commit(); err != nil {
Expand Down
Loading

0 comments on commit c0e8f23

Please sign in to comment.