Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services: add default values for NeoFSBlockFetcher configuration #3742

Merged
merged 2 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cli/util/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/nspcc-dev/neo-go/cli/options"
"github.com/nspcc-dev/neo-go/cli/txctx"
vmcli "github.com/nspcc-dev/neo-go/cli/vm"
"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs"
"github.com/nspcc-dev/neo-go/pkg/vm"
"github.com/urfave/cli/v2"
)
Expand Down Expand Up @@ -74,7 +75,7 @@ func NewCommands() []*cli.Command {
&cli.UintFlag{
Name: "index-file-size",
Usage: "Size of index file",
Value: 128000,
Value: neofs.DefaultIndexFileSize,
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
},
&cli.UintFlag{
Name: "workers",
Expand All @@ -89,7 +90,7 @@ func NewCommands() []*cli.Command {
&cli.UintFlag{
Name: "retries",
Usage: "Maximum number of Neo/NeoFS node request retries",
Value: 5,
Value: neofs.MaxRetries,
Action: func(context *cli.Context, u uint) error {
if u < 1 {
return cli.Exit("retries should be greater than 0", 1)
Expand Down
70 changes: 20 additions & 50 deletions cli/util/upload_bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import (
"context"
"crypto/sha256"
"fmt"
"slices"
"strconv"
Expand All @@ -14,7 +13,7 @@
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/nspcc-dev/neofs-sdk-go/checksum"
Expand All @@ -30,35 +29,6 @@
"github.com/urfave/cli/v2"
)

const (
// Number of objects to search in a batch. We need to search with the EQ filter to
// avoid partially-completed SEARCH responses. If an EQ search hasn't found the object,
// the object will be uploaded one more time, which may lead to duplicate objects.
// We will have a risk of duplicates until #3645 is resolved (NeoFS-guaranteed
// search results).
searchBatchSize = 1
// Size of object ID.
oidSize = sha256.Size
)

// Constants related to retry mechanism.
const (
// Initial backoff duration.
initialBackoff = 500 * time.Millisecond
// Backoff multiplier.
backoffFactor = 2
// Maximum backoff duration.
maxBackoff = 20 * time.Second
)

// Constants related to NeoFS pool request timeouts.
// Such big values are used to avoid NeoFS pool timeouts during block search and upload.
const (
defaultDialTimeout = 10 * time.Minute
defaultStreamTimeout = 10 * time.Minute
defaultHealthcheckTimeout = 10 * time.Second
)

// poolWrapper wraps a NeoFS pool to adapt its Close method to return an error.
type poolWrapper struct {
*pool.Pool
Expand Down Expand Up @@ -103,9 +73,9 @@
signer := user.NewAutoIDSignerRFC6979(acc.PrivateKey().PrivateKey)

params := pool.DefaultOptions()
params.SetHealthcheckTimeout(defaultHealthcheckTimeout)
params.SetNodeDialTimeout(defaultDialTimeout)
params.SetNodeStreamTimeout(defaultStreamTimeout)
params.SetHealthcheckTimeout(neofs.DefaultHealthcheckTimeout)
params.SetNodeDialTimeout(neofs.DefaultDialTimeout)
params.SetNodeStreamTimeout(neofs.DefaultStreamTimeout)
p, err := pool.New(pool.NewFlatNodeParams(rpcNeoFS), signer, params)
if err != nil {
return cli.Exit(fmt.Sprintf("failed to create NeoFS pool: %v", err), 1)
Expand Down Expand Up @@ -166,15 +136,15 @@
// retry function with exponential backoff.
func retry(action func() error, maxRetries uint) error {
var err error
backoff := initialBackoff
backoff := neofs.InitialBackoff

Check warning on line 139 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L139

Added line #L139 was not covered by tests
for range maxRetries {
if err = action(); err == nil {
return nil // Success, no retry needed.
}
time.Sleep(backoff) // Backoff before retrying.
backoff *= time.Duration(backoffFactor)
if backoff > maxBackoff {
backoff = maxBackoff
backoff *= time.Duration(neofs.BackoffFactor)
if backoff > neofs.MaxBackoff {
backoff = neofs.MaxBackoff

Check warning on line 147 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L145-L147

Added lines #L145 - L147 were not covered by tests
}
}
return err // Return the last error after exhausting retries.
Expand All @@ -193,15 +163,15 @@
errCh = make(chan error)
doneCh = make(chan struct{})
wg sync.WaitGroup
emptyOID = make([]byte, oidSize)
emptyOID = make([]byte, neofs.OIDSize)

Check warning on line 166 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L166

Added line #L166 was not covered by tests
)
fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", indexFileStart, indexFileEnd-1)
wg.Add(int(numWorkers))
for i := range numWorkers {
go func(i uint) {
defer wg.Done()
for blockIndex := indexFileStart + i; blockIndex < indexFileEnd; blockIndex += numWorkers {
if slices.Compare(buf[blockIndex%indexFileSize*oidSize:blockIndex%indexFileSize*oidSize+oidSize], emptyOID) != 0 {
if slices.Compare(buf[blockIndex%indexFileSize*neofs.OIDSize:blockIndex%indexFileSize*neofs.OIDSize+neofs.OIDSize], emptyOID) != 0 {

Check warning on line 174 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L174

Added line #L174 was not covered by tests
if debug {
fmt.Fprintf(ctx.App.Writer, "Block %d is already uploaded\n", blockIndex)
}
Expand Down Expand Up @@ -263,7 +233,7 @@
}
return
}
resOid.Encode(buf[blockIndex%indexFileSize*oidSize:])
resOid.Encode(buf[blockIndex%indexFileSize*neofs.OIDSize:])

Check warning on line 236 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L236

Added line #L236 was not covered by tests
}
}(i)
}
Expand All @@ -281,9 +251,9 @@
fmt.Fprintf(ctx.App.Writer, "Successfully processed batch of blocks: from %d to %d\n", indexFileStart, indexFileEnd-1)

// Additional check for empty OIDs in the buffer.
for k := uint(0); k < (indexFileEnd-indexFileStart)*oidSize; k += oidSize {
if slices.Compare(buf[k:k+oidSize], emptyOID) == 0 {
return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", indexFileStart/indexFileSize, k/oidSize, indexFileStart/indexFileSize*indexFileSize+k/oidSize)
for k := uint(0); k < (indexFileEnd-indexFileStart)*neofs.OIDSize; k += neofs.OIDSize {
if slices.Compare(buf[k:k+neofs.OIDSize], emptyOID) == 0 {
return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", indexFileStart/indexFileSize, k/neofs.OIDSize, indexFileStart/indexFileSize*indexFileSize+k/neofs.OIDSize)

Check warning on line 256 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L254-L256

Added lines #L254 - L256 were not covered by tests
}
}
if indexFileEnd-indexFileStart == indexFileSize {
Expand All @@ -310,7 +280,7 @@
func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint) (uint, []byte, error) {
var (
// buf is used to store OIDs of the uploaded blocks.
buf = make([]byte, indexFileSize*oidSize)
buf = make([]byte, indexFileSize*neofs.OIDSize)

Check warning on line 283 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L283

Added line #L283 was not covered by tests
doneCh = make(chan struct{})
errCh = make(chan error)

Expand Down Expand Up @@ -377,7 +347,7 @@
}
pos := uint(blockIndex) % indexFileSize
if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok {
id.Encode(buf[pos*oidSize:])
id.Encode(buf[pos*neofs.OIDSize:])

Check warning on line 350 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L350

Added line #L350 was not covered by tests
}
}
}()
Expand All @@ -404,15 +374,15 @@
// endIndex. It returns a buffered channel of resulting object IDs and closes it once
// OID search is finished. Errors are sent to errCh in a non-blocking way.
func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
var res = make(chan oid.ID, 2*searchBatchSize)
var res = make(chan oid.ID, 2*neofs.DefaultSearchBatchSize)

Check warning on line 377 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L377

Added line #L377 was not covered by tests
go func() {
var wg sync.WaitGroup
defer close(res)

for i := startIndex; i < endIndex; i += searchBatchSize * maxParallelSearches {
for i := startIndex; i < endIndex; i += neofs.DefaultSearchBatchSize * maxParallelSearches {

Check warning on line 382 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L382

Added line #L382 was not covered by tests
for j := range maxParallelSearches {
start := i + j*searchBatchSize
end := start + searchBatchSize
start := i + j*neofs.DefaultSearchBatchSize
end := start + neofs.DefaultSearchBatchSize

Check warning on line 385 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L384-L385

Added lines #L384 - L385 were not covered by tests

if start >= endIndex {
break
Expand Down
14 changes: 0 additions & 14 deletions pkg/config/application_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,20 +206,6 @@ func TestNeoFSBlockFetcherValidation(t *testing.T) {
shouldFail: true,
errMsg: "BQueueSize (5) is lower than OIDBatchSize (10)",
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
},
{
cfg: NeoFSBlockFetcher{
InternalService: InternalService{Enabled: true},
Timeout: time.Second,
ContainerID: validContainerID,
Addresses: []string{"127.0.0.1"},
OIDBatchSize: 10,
BQueueSize: 20,
SkipIndexFilesSearch: false,
IndexFileSize: 0,
},
shouldFail: true,
errMsg: "IndexFileSize is not set",
},
}

for _, c := range cases {
Expand Down
5 changes: 1 addition & 4 deletions pkg/config/blockfetcher_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,11 @@ func (cfg *NeoFSBlockFetcher) Validate() error {
if err != nil {
return fmt.Errorf("invalid container ID: %w", err)
}
if cfg.BQueueSize < cfg.OIDBatchSize {
if cfg.BQueueSize > 0 && cfg.BQueueSize < cfg.OIDBatchSize {
return fmt.Errorf("BQueueSize (%d) is lower than OIDBatchSize (%d)", cfg.BQueueSize, cfg.OIDBatchSize)
}
if len(cfg.Addresses) == 0 {
return errors.New("addresses are not set")
}
if !cfg.SkipIndexFilesSearch && cfg.IndexFileSize == 0 {
return errors.New("IndexFileSize is not set")
}
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
3 changes: 3 additions & 0 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
}, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)

s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)
if s.NeoFSBlockFetcherCfg.BQueueSize <= 0 {
s.NeoFSBlockFetcherCfg.BQueueSize = blockfetcher.DefaultQueueCacheSize
}
s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
var err error
s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock,
Expand Down
79 changes: 26 additions & 53 deletions pkg/services/blockfetcher/blockfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import (
"context"
"crypto/sha256"
"errors"
"fmt"
"io"
Expand All @@ -16,7 +15,7 @@
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/block"
gio "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
Expand All @@ -29,40 +28,8 @@
)

const (
// oidSize is the size of the object ID in NeoFS.
oidSize = sha256.Size
// defaultTimeout is the default timeout for NeoFS requests.
defaultTimeout = 5 * time.Minute
// defaultOIDBatchSize is the default number of OIDs to search and fetch at once.
defaultOIDBatchSize = 8000
// defaultDownloaderWorkersCount is the default number of workers downloading blocks.
defaultDownloaderWorkersCount = 100
)

// Constants related to NeoFS pool request timeouts.
const (
// defaultDialTimeout is a default timeout used to establish connection with
// NeoFS storage nodes.
defaultDialTimeout = 30 * time.Second
// defaultStreamTimeout is a default timeout used for NeoFS streams processing.
// It has significantly large value to reliably avoid timeout problems with heavy
// SEARCH requests.
defaultStreamTimeout = 10 * time.Minute
// defaultHealthcheckTimeout is a timeout for request to NeoFS storage node to
// decide if it is alive.
defaultHealthcheckTimeout = 10 * time.Second
)

// Constants related to retry mechanism.
const (
// maxRetries is the maximum number of retries for a single operation.
maxRetries = 5
// initialBackoff is the initial backoff duration.
initialBackoff = 500 * time.Millisecond
// backoffFactor is the factor by which the backoff duration is multiplied.
backoffFactor = 2
// maxBackoff is the maximum backoff duration.
maxBackoff = 20 * time.Second
// DefaultQueueCacheSize is the default size of the queue cache.
DefaultQueueCacheSize = 16000
)

// Ledger is an interface to Blockchain sufficient for Service.
Expand Down Expand Up @@ -143,22 +110,28 @@
}
}
if cfg.Timeout <= 0 {
cfg.Timeout = defaultTimeout
cfg.Timeout = neofs.DefaultTimeout
}
if cfg.OIDBatchSize <= 0 {
cfg.OIDBatchSize = defaultOIDBatchSize
cfg.OIDBatchSize = cfg.BQueueSize / 2
}
if cfg.DownloaderWorkersCount <= 0 {
cfg.DownloaderWorkersCount = defaultDownloaderWorkersCount
cfg.DownloaderWorkersCount = neofs.DefaultDownloaderWorkersCount
}
if cfg.IndexFileSize <= 0 {
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
cfg.IndexFileSize = neofs.DefaultIndexFileSize
}
if cfg.BlockAttribute == "" {
cfg.BlockAttribute = neofs.DefaultBlockAttribute
}
if len(cfg.Addresses) == 0 {
return nil, errors.New("no addresses provided")
if cfg.IndexFileAttribute == "" {
cfg.IndexFileAttribute = neofs.DefaultIndexFileAttribute
}

params := pool.DefaultOptions()
params.SetHealthcheckTimeout(defaultHealthcheckTimeout)
params.SetNodeDialTimeout(defaultDialTimeout)
params.SetNodeStreamTimeout(defaultStreamTimeout)
params.SetHealthcheckTimeout(neofs.DefaultHealthcheckTimeout)
params.SetNodeDialTimeout(neofs.DefaultDialTimeout)
params.SetNodeStreamTimeout(neofs.DefaultStreamTimeout)
p, err := pool.New(pool.NewFlatNodeParams(cfg.Addresses), user.NewAutoIDSignerRFC6979(account.PrivateKey().PrivateKey), params)
if err != nil {
return nil, err
Expand Down Expand Up @@ -357,7 +330,7 @@
// streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel.
func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
defer rc.Close()
oidBytes := make([]byte, oidSize)
oidBytes := make([]byte, neofs.OIDSize)

Check warning on line 333 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L333

Added line #L333 was not covered by tests
oidsProcessed := 0

for {
Expand Down Expand Up @@ -397,7 +370,7 @@
func (bfs *Service) fetchOIDsBySearch() error {
startIndex := bfs.chain.BlockHeight()
// We need to search with the EQ filter to avoid partially-completed SEARCH responses.
batchSize := uint32(1)
batchSize := uint32(neofs.DefaultSearchBatchSize)

Check warning on line 373 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L373

Added line #L373 was not covered by tests

for {
select {
Expand Down Expand Up @@ -513,7 +486,7 @@
func (bfs *Service) retry(action func() error) error {
var (
err error
backoff = initialBackoff
backoff = neofs.InitialBackoff

Check warning on line 489 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L489

Added line #L489 was not covered by tests
timer = time.NewTimer(0)
)
defer func() {
Expand All @@ -525,11 +498,11 @@
}
}()

for i := range maxRetries {
for i := range neofs.MaxRetries {

Check warning on line 501 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L501

Added line #L501 was not covered by tests
if err = action(); err == nil {
return nil
}
if i == maxRetries-1 {
if i == neofs.MaxRetries-1 {

Check warning on line 505 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L505

Added line #L505 was not covered by tests
break
}
timer.Reset(backoff)
Expand All @@ -539,16 +512,16 @@
case <-bfs.ctx.Done():
return bfs.ctx.Err()
}
backoff *= time.Duration(backoffFactor)
if backoff > maxBackoff {
backoff = maxBackoff
backoff *= time.Duration(neofs.BackoffFactor)
if backoff > neofs.MaxBackoff {
backoff = neofs.MaxBackoff

Check warning on line 517 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L515-L517

Added lines #L515 - L517 were not covered by tests
}
}
return err
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
}

func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) {
u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.cfg.ContainerID, oid))
u, err := url.Parse(fmt.Sprintf("%s:%s/%s", neofs.URIScheme, bfs.cfg.ContainerID, oid))

Check warning on line 524 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L524

Added line #L524 was not covered by tests
if err != nil {
return nil, err
}
Expand Down
Loading
Loading