Skip to content

Commit

Permalink
Add full reader verification
Browse files Browse the repository at this point in the history
Before this change, the SOCI snapshotter would verify files on read.
After this change, SOCI will verify all files after the background
fetcher is done, whether their read or not.

Signed-off-by: Kern Walster <walster@amazon.com>
  • Loading branch information
Kern-- committed Jan 21, 2025
1 parent 4cfe4cc commit d0b0df2
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 9 deletions.
2 changes: 2 additions & 0 deletions fs/backgroundfetcher/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,12 @@ func (lr *sequentialLayerResolver) Resolve(ctx context.Context) (bool, error) {
}
if errors.Is(err, sm.ErrExceedMaxSpan) {
commonmetrics.MeasureLatencyInMilliseconds(commonmetrics.BackgroundFetch, lr.layerDigest, lr.base.start)
lr.SpanManager.MarkDownloaded()
return false, nil
}

commonmetrics.IncOperationCount(commonmetrics.BackgroundSpanFetchFailureCount, lr.layerDigest)
lr.SpanManager.MarkDownloaded()
return false, fmt.Errorf("error trying to fetch span with spanId = %d from layerDigest = %s: %w",
lr.nextSpanFetchID, lr.layerDigest.String(), err)
}
16 changes: 13 additions & 3 deletions fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/awslabs/soci-snapshotter/fs/layer"
commonmetrics "github.com/awslabs/soci-snapshotter/fs/metrics/common"
layermetrics "github.com/awslabs/soci-snapshotter/fs/metrics/layer"
"github.com/awslabs/soci-snapshotter/fs/reader"
"github.com/awslabs/soci-snapshotter/fs/remote"
"github.com/awslabs/soci-snapshotter/fs/source"
"github.com/awslabs/soci-snapshotter/idtools"
Expand Down Expand Up @@ -229,7 +230,7 @@ func NewFilesystem(ctx context.Context, root string, cfg config.FSConfig, opts .
var bgFetcher *bf.BackgroundFetcher

if !cfg.BackgroundFetchConfig.Disable {
log.G(context.Background()).WithFields(logrus.Fields{
log.G(ctx).WithFields(logrus.Fields{
"fetchPeriod": bgFetchPeriod,
"silencePeriod": bgSilencePeriod,
"maxQueueSize": bgMaxQueueSize,
Expand All @@ -246,10 +247,19 @@ func NewFilesystem(ctx context.Context, root string, cfg config.FSConfig, opts .
}
go bgFetcher.Run(context.Background())
} else {
log.G(context.Background()).Info("background fetch is disabled")
log.G(ctx).Info("background fetch is disabled")
}

var readerVerifier *reader.Verifier
if !cfg.DisableVerification {
log.G(ctx).Info("creating reader verifier")
readerVerifier = reader.NewVerifier(cfg.BackgroundFetchConfig.MaxQueueSize)
go readerVerifier.Run()
} else {
log.G(ctx).Info("reader verification is disabled")
}

r, err := layer.NewResolver(root, cfg, fsOpts.resolveHandlers, metadataStore, store, fsOpts.overlayOpaqueType, bgFetcher)
r, err := layer.NewResolver(root, cfg, fsOpts.resolveHandlers, metadataStore, store, fsOpts.overlayOpaqueType, bgFetcher, readerVerifier)
if err != nil {
return nil, fmt.Errorf("failed to setup resolver: %w", err)
}
Expand Down
10 changes: 9 additions & 1 deletion fs/layer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,12 @@ type Resolver struct {
artifactStore content.Storage
overlayOpaqueType OverlayOpaqueType
bgFetcher *backgroundfetcher.BackgroundFetcher
readerVerifier *reader.Verifier
}

// NewResolver returns a new layer resolver.
func NewResolver(root string, cfg config.FSConfig, resolveHandlers map[string]remote.Handler,
metadataStore metadata.Store, artifactStore content.Storage, overlayOpaqueType OverlayOpaqueType, bgFetcher *backgroundfetcher.BackgroundFetcher) (*Resolver, error) {
metadataStore metadata.Store, artifactStore content.Storage, overlayOpaqueType OverlayOpaqueType, bgFetcher *backgroundfetcher.BackgroundFetcher, readerVerifier *reader.Verifier) (*Resolver, error) {
resolveResultEntry := cfg.ResolveResultEntry
if resolveResultEntry == 0 {
resolveResultEntry = defaultResolveResultEntry
Expand Down Expand Up @@ -175,6 +176,7 @@ func NewResolver(root string, cfg config.FSConfig, resolveHandlers map[string]re
artifactStore: artifactStore,
overlayOpaqueType: overlayOpaqueType,
bgFetcher: bgFetcher,
readerVerifier: readerVerifier,
}, nil
}

Expand Down Expand Up @@ -334,6 +336,12 @@ func (r *Resolver) Resolve(ctx context.Context, hosts []docker.RegistryHost, ref
r.bgFetcher.Add(bgLayerResolver)
}
vr, err := reader.NewReader(meta, desc.Digest, spanManager, disableVerification)
if r.readerVerifier != nil {
err := r.readerVerifier.Add(vr)
if err != nil {
return nil, err
}
}
if err != nil {
return nil, fmt.Errorf("failed to read layer: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions fs/metrics/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ const (
FuseWhiteoutGetattrFailureCount = "fuse_whiteout_getattr_failure_count"
FuseUnknownFailureCount = "fuse_unknown_operation_failure_count"

FileVerificationFailureCount = "file_verification_failure_count"

// TODO this metric is not available now. This needs to go down to BlobReader where the actuall http call is issued
SynchronousBytesFetched = "synchronous_bytes_fetched"

Expand Down
45 changes: 44 additions & 1 deletion fs/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"errors"
"fmt"
"io"
"os"
"sync"
"sync/atomic"
"time"
Expand All @@ -57,6 +58,8 @@ import (
digest "github.com/opencontainers/go-digest"
)

var errReaderClosed = errors.New("reader is already closed")

type Reader interface {
OpenFile(id uint32) (io.ReaderAt, error)
Metadata() metadata.Reader
Expand All @@ -83,6 +86,7 @@ type reader struct {
lastReadTimeMu sync.Mutex

closed bool
closedC chan struct{}
closedMu sync.Mutex

disableVerification bool
Expand All @@ -92,6 +96,38 @@ func (gr *reader) Metadata() metadata.Reader {
return gr.r
}

func (gr *reader) Verify() error {
if gr.disableVerification {
return nil
}
queue := []uint32{gr.r.RootID()}
for len(queue) > 0 && !gr.closed {
current := queue[0]
queue = queue[1:]
gr.r.ForeachChild(current, func(_ string, id uint32, _ os.FileMode) bool {
queue = append(queue, id)
return true
})
if current == gr.r.RootID() {
// The RootID does not map to a real file in the layer, so we don't verify it.
continue
}
f, err := gr.openFile(current)
if err != nil {
if errors.Is(err, errReaderClosed) {
// we're in cleanup, this is fine
return nil
}
return err
}
err = f.Verify()
if err != nil {
return err
}
}
return nil
}

func (gr *reader) setLastReadTime(lastReadTime time.Time) {
gr.lastReadTimeMu.Lock()
gr.lastReadTime = lastReadTime
Expand All @@ -106,8 +142,12 @@ func (gr *reader) LastOnDemandReadTime() time.Time {
}

func (gr *reader) OpenFile(id uint32) (io.ReaderAt, error) {
return gr.openFile(id)
}

func (gr *reader) openFile(id uint32) (*file, error) {
if gr.isClosed() {
return nil, fmt.Errorf("reader is already closed")
return nil, errReaderClosed
}
var fr metadata.File
fr, err := gr.r.OpenFile(id)
Expand All @@ -128,6 +168,7 @@ func (gr *reader) Close() (retErr error) {
return nil
}
gr.closed = true
close(gr.closedC)
if err := gr.r.Close(); err != nil {
retErr = errors.Join(retErr, err)
}
Expand Down Expand Up @@ -202,6 +243,8 @@ func (sf *file) Verify() (retErr error) {
defer func() {
if retErr == nil {
sf.verified.Store(true)
} else {
commonmetrics.IncOperationCount(commonmetrics.FileVerificationFailureCount, sf.gr.layerSha)
}
}()

Expand Down
88 changes: 88 additions & 0 deletions fs/reader/verifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
Copyright The Soci Snapshotter Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package reader

import (
"errors"
"sync"

"github.com/containerd/log"
)

var ErrNotReader = errors.New("reader is not a *reader.reader")

type Verifier struct {
queue chan *reader

closedMu sync.Mutex
closed bool
closedC chan struct{}
}

func NewVerifier(maxQueueSize int) *Verifier {
return &Verifier{
queue: make(chan *reader, maxQueueSize),
closedMu: sync.Mutex{},
closed: false,
closedC: make(chan struct{}),
}
}

func (v *Verifier) Add(r Reader) error {
switch r := r.(type) {
case *reader:
go func() {
select {
case <-r.spanManager.DownloadedC:
v.queue <- r
case <-v.closedC:
case <-r.closedC:
}
}()
return nil
default:
return ErrNotReader
}
}

func (v *Verifier) Run() {
for {
select {
case r := <-v.queue:
if !r.isClosed() {
l := log.L.WithField("layer", r.layerSha)
err := r.Verify()
if err == nil {
l.Debug("verified reader")
} else {
l.WithError(err).Error("failed to verify reader")
}

}
case <-v.closedC:
}
}
}

func (v *Verifier) Close() {
v.closedMu.Lock()
if !v.closed {
v.closed = true
close(v.closedC)
}
v.closedMu.Unlock()
}
14 changes: 14 additions & 0 deletions fs/span-manager/span_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
"runtime"
"sync"

"github.com/awslabs/soci-snapshotter/cache"
"github.com/awslabs/soci-snapshotter/ztoc"
Expand Down Expand Up @@ -72,6 +73,9 @@ type SpanManager struct {
spans []*span
ztoc *ztoc.Ztoc
maxSpanVerificationFailureRetries int
downloaded bool
downloadedMu sync.Mutex
DownloadedC chan struct{}
}

type spanInfo struct {
Expand Down Expand Up @@ -117,6 +121,7 @@ func New(ztoc *ztoc.Ztoc, r *io.SectionReader, cache cache.BlobCache, retries in
spans: spans,
ztoc: ztoc,
maxSpanVerificationFailureRetries: retries,
DownloadedC: make(chan struct{}),
}
if m.maxSpanVerificationFailureRetries < 0 {
m.maxSpanVerificationFailureRetries = defaultSpanVerificationFailureRetries
Expand Down Expand Up @@ -314,6 +319,15 @@ func (m *SpanManager) getSpanContent(spanID compression.SpanID, offsetStart, off
return io.NopCloser(buf), nil
}

func (m *SpanManager) MarkDownloaded() {
m.downloadedMu.Lock()
if !m.downloaded {
m.downloaded = true
close(m.DownloadedC)
}
m.downloadedMu.Unlock()
}

// fetchAndCacheSpan fetches a span, uncompresses the span if `uncompress == true`,
// caches and returns the span content. The span state is set to `fetched/uncompressed`,
// depending on if `uncompress` is enabled.
Expand Down
4 changes: 0 additions & 4 deletions metadata/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,10 +617,6 @@ func (r *reader) OpenFile(id uint32) (File, error) {
return fmt.Errorf("failed to get file bucket %d: %w", id, err)
}
size, _ = binary.Varint(b.Get(bucketKeySize))
m, _ := binary.Uvarint(b.Get(bucketKeyMode))
if !os.FileMode(uint32(m)).IsRegular() {
return fmt.Errorf("%q is not a regular file", id)
}
metadataEntries, err := getMetadataBucket(tx, r.fsID)
if err != nil {
return fmt.Errorf("metadata bucket of %q not found for opening %d: %w", r.fsID, id, err)
Expand Down

0 comments on commit d0b0df2

Please sign in to comment.