Merge pull request #90 from nyaruka/big_file_fix
🦬 Big file fixes
rowanseymour authored Sep 26, 2023
2 parents 13c73b1 + 7bbf668 commit 410c77e
Showing 6 changed files with 53 additions and 90 deletions.
64 changes: 7 additions & 57 deletions README.md
@@ -26,78 +26,28 @@ We recommend running it with no changes to the configuration and no parameters, using only
environment variables to configure it. You can use `% rp-archiver --help` to see a list of the
environment variables and parameters and for more details on each option.

### RapidPro

For use with RapidPro, you will want to configure these settings:
For use with RapidPro/TextIt, you will need to configure these settings:

* `ARCHIVER_DB`: URL describing how to connect to the database (default "postgres://temba:temba@localhost/temba?sslmode=disable")
* `ARCHIVER_TEMP_DIR`: The directory that temporary archives will be written to before upload (default "/tmp")
* `ARCHIVER_DELETE`: Whether to delete messages and runs after they are archived; we recommend setting this to true for large installations (default false)

For writing of archives, Archiver needs access to an S3 bucket, you can configure access to your bucket via:
For writing archives, Archiver needs access to a storage bucket on an S3-compatible service. For AWS we recommend
SSE-S3 encryption, as this is the only type that supports validation of upload ETags.

* `ARCHIVER_S3_REGION`: The region for your S3 bucket (ex: `eu-west-1`)
* `ARCHIVER_S3_BUCKET`: The name of your S3 bucket (ex: `dl-archiver-test`)
* `ARCHIVER_S3_ENDPOINT`: The S3 endpoint we will write archives to (default "https://s3.amazonaws.com")
* `ARCHIVER_AWS_ACCESS_KEY_ID`: The AWS access key id used to authenticate to AWS
* `ARCHIVER_AWS_SECRET_ACCESS_KEY`: The AWS secret access key used to authenticate to AWS
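
For background on the ETag validation mentioned above, the following is a minimal illustrative sketch (not part of the archiver) of how a local MD5 can be checked against the ETag S3 returns for a single-part, SSE-S3 encrypted upload. The file path is hypothetical.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io"
	"os"
)

func main() {
	// hypothetical local archive file
	f, err := os.Open("/tmp/archive.jsonl.gz")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	h := md5.New()
	if _, err := io.Copy(h, f); err != nil {
		panic(err)
	}

	// for a single-part upload with SSE-S3 encryption, the ETag returned by S3
	// (with its surrounding quotes trimmed) is the hex MD5 of the object, so it
	// should equal this value
	fmt.Println("local md5:", hex.EncodeToString(h.Sum(nil)))
}
```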

Recommended settings for error reporting:

* `ARCHIVER_SENTRY_DSN`: The DSN to use when logging errors to Sentry
If using a different encryption type or service that produces non-MD5 ETags:

### Reference
* `ARCHIVER_CHECK_S3_HASHES`: Can be set to `FALSE` to disable checking of upload hashes.

These are the configuration options that can be provided as parameters or environment variables. If using environment
variables, convert to uppercase, replace dashes with underscores and prefix the name with `ARCHIVER_`, e.g. `-log-level`
becomes `ARCHIVER_LOG_LEVEL`.
Recommended settings for error reporting:

```
-archive-messages
whether we should archive messages (default true)
-archive-runs
whether we should archive runs (default true)
-aws-access-key-id string
the access key id to use when authenticating S3 (default none)
-aws-secret-access-key string
the secret access key id to use when authenticating S3 (default none)
-db string
the connection string for our database (default "postgres://localhost/archiver_test?sslmode=disable")
-debug-conf
print where config values are coming from
-delete
whether to delete messages and runs from the db after archival (default false)
-help
print usage information
-keep-files
whether we should keep local archive files after upload (default false)
-librato-username
the Librato username for metrics reporting
-librato-token
the Librato token for metrics reporting
-log-level string
the log level, one of error, warn, info, debug (default "info")
-once
run archiving immediately and then exit
-retention-period int
the number of days to keep before archiving (default 90)
-s3-bucket string
the S3 bucket we will write archives to (default "dl-archiver-test")
-s3-disable-ssl
whether we disable SSL when accessing S3. Should always be set to False unless you're hosting an S3 compatible service within a secure internal network
-s3-endpoint string
the S3 endpoint we will write archives to (default "https://s3.amazonaws.com")
-s3-force-path-style
whether we force S3 path style. Should generally be left as False unless you're hosting an S3 compatible service
-s3-region string
the S3 region we will write archives to (default "us-east-1")
-sentry-dsn string
the sentry configuration to log errors to, if any
-temp-dir string
directory where temporary archive files are written (default "/tmp")
-upload-to-s3
whether we should upload archive to S3 (default true)
```
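
As a purely illustrative sketch of the flag-to-environment-variable naming rule described above (this is not the archiver's actual configuration loader), the mapping can be expressed as:

```go
package main

import (
	"fmt"
	"strings"
)

// envName maps a command line flag to its environment variable: drop the
// leading dash, replace dashes with underscores, uppercase, prefix ARCHIVER_
func envName(flag string) string {
	name := strings.TrimPrefix(flag, "-")
	name = strings.ReplaceAll(name, "-", "_")
	return "ARCHIVER_" + strings.ToUpper(name)
}

func main() {
	fmt.Println(envName("-log-level"))        // ARCHIVER_LOG_LEVEL
	fmt.Println(envName("-s3-bucket"))        // ARCHIVER_S3_BUCKET
	fmt.Println(envName("-retention-period")) // ARCHIVER_RETENTION_PERIOD
}
```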
* `ARCHIVER_SENTRY_DSN`: The DSN to use when logging errors to Sentry

## Development

4 changes: 0 additions & 4 deletions archives/archives.go
@@ -535,10 +535,6 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi
return errors.Wrapf(err, "error calculating archive hash")
}

if stat.Size() > 5e9 {
return fmt.Errorf("archive too large, must be smaller than 5 gigs, build dailies if possible")
}

archive.ArchiveFile = file.Name()
archive.Size = stat.Size()
archive.RecordCount = recordCount
14 changes: 8 additions & 6 deletions archives/config.go
@@ -17,9 +17,10 @@ type Config struct {
AWSAccessKeyID string `help:"the access key id to use when authenticating S3"`
AWSSecretAccessKey string `help:"the secret access key id to use when authenticating S3"`

TempDir string `help:"directory where temporary archive files are written"`
KeepFiles bool `help:"whether we should keep local archive files after upload (default false)"`
UploadToS3 bool `help:"whether we should upload archive to S3"`
TempDir string `help:"directory where temporary archive files are written"`
KeepFiles bool `help:"whether we should keep local archive files after upload (default false)"`
UploadToS3 bool `help:"whether we should upload archive to S3"`
CheckS3Hashes bool `help:"whether to check S3 hashes of uploaded archives before deleting records"`

ArchiveMessages bool `help:"whether we should archive messages"`
ArchiveRuns bool `help:"whether we should archive runs"`
@@ -50,9 +51,10 @@ func NewDefaultConfig() *Config {
AWSAccessKeyID: "",
AWSSecretAccessKey: "",

TempDir: "/tmp",
KeepFiles: false,
UploadToS3: true,
TempDir: "/tmp",
KeepFiles: false,
UploadToS3: true,
CheckS3Hashes: true,

ArchiveMessages: true,
ArchiveRuns: true,
14 changes: 9 additions & 5 deletions archives/messages.go
@@ -129,15 +129,19 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3
})
log.Info("deleting messages")

// first things first, make sure our file is present on S3
md5, err := GetS3FileETAG(outer, s3Client, archive.URL)
// first things first, make sure our file is correct on S3
s3Size, s3Hash, err := GetS3FileInfo(outer, s3Client, archive.URL)
if err != nil {
return err
}

// if our etag and archive md5 don't match, that's an error, return
if md5 != archive.Hash {
return fmt.Errorf("archive md5: %s and s3 etag: %s do not match", archive.Hash, md5)
if s3Size != archive.Size {
return fmt.Errorf("archive size: %d and s3 size: %d do not match", archive.Size, s3Size)
}

// if S3 hash is MD5 then check against archive hash
if config.CheckS3Hashes && archive.Size <= maxSingleUploadBytes && s3Hash != archive.Hash {
return fmt.Errorf("archive md5: %s and s3 etag: %s do not match", archive.Hash, s3Hash)
}

// ok, archive file looks good, let's build up our list of message ids, this may be big but we are int64s so shouldn't be too big
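
For context on why the hash comparison above is only applied to archives at or below the single-upload limit: for multipart uploads S3 derives the ETag from the MD5s of the individual parts rather than from the whole object, so it can never match the archive's own MD5. A minimal illustrative sketch (not the archiver's code) of that calculation:

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// multipartETag mimics how S3 builds the ETag for a multipart upload: the MD5
// of the concatenated binary MD5s of each part, followed by the part count.
func multipartETag(parts [][]byte) string {
	var concat []byte
	for _, p := range parts {
		sum := md5.Sum(p) // MD5 of each uploaded part
		concat = append(concat, sum[:]...)
	}
	final := md5.Sum(concat) // MD5 of the concatenated part digests
	return fmt.Sprintf("%s-%d", hex.EncodeToString(final[:]), len(parts))
}

func main() {
	parts := [][]byte{[]byte("part one"), []byte("part two")}
	// prints something like "….-2", which is never the plain MD5 of the object
	fmt.Println(multipartETag(parts))
}
```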
14 changes: 9 additions & 5 deletions archives/runs.go
@@ -124,15 +124,19 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie
})
log.Info("deleting runs")

// first things first, make sure our file is present on S3
md5, err := GetS3FileETAG(outer, s3Client, archive.URL)
// first things first, make sure our file is correct on S3
s3Size, s3Hash, err := GetS3FileInfo(outer, s3Client, archive.URL)
if err != nil {
return err
}

// if our etag and archive md5 don't match, that's an error, return
if md5 != archive.Hash {
return fmt.Errorf("archive md5: %s and s3 etag: %s do not match", archive.Hash, md5)
if s3Size != archive.Size {
return fmt.Errorf("archive size: %d and s3 size: %d do not match", archive.Size, s3Size)
}

// if S3 hash is MD5 then check against archive hash
if config.CheckS3Hashes && archive.Size <= maxSingleUploadBytes && s3Hash != archive.Hash {
return fmt.Errorf("archive md5: %s and s3 etag: %s do not match", archive.Hash, s3Hash)
}

// ok, archive file looks good, let's build up our list of run ids, this may be big but we are int64s so shouldn't be too big
33 changes: 20 additions & 13 deletions archives/s3.go
@@ -20,7 +20,13 @@ import (
"github.com/sirupsen/logrus"
)

var s3BucketURL = "https://%s.s3.amazonaws.com%s"
const s3BucketURL = "https://%s.s3.amazonaws.com%s"

// any file over this needs to be uploaded in chunks
const maxSingleUploadBytes = 5e9 // 5GB

// size of chunk to use when doing multi-part uploads
const chunkSizeBytes = 1e9 // 1GB

// NewS3Client creates a new s3 client from the passed in config, testing it as necessary
func NewS3Client(config *Config) (s3iface.S3API, error) {
@@ -81,7 +87,7 @@ func UploadToS3(ctx context.Context, s3Client s3iface.S3API, bucket string, path
md5 := base64.StdEncoding.EncodeToString(hashBytes)

// if this fits into a single part, upload that way
if archive.Size <= 5e9 {
if archive.Size <= maxSingleUploadBytes {
params := &s3.PutObjectInput{
Bucket: aws.String(bucket),
Body: f,
@@ -97,11 +103,11 @@
return err
}
} else {
// this file is bigger than 5 gigs, use an upload manager instead, it will take care of uploading in parts
// this file is bigger than the limit, use an upload manager instead, it will take care of uploading in parts
uploader := s3manager.NewUploaderWithClient(
s3Client,
func(u *s3manager.Uploader) {
u.PartSize = 1e9 // 1 gig per part
u.PartSize = chunkSizeBytes
},
)
params := &s3manager.UploadInput{
@@ -129,17 +135,17 @@ func withAcceptEncoding(e string) request.Option {
}
}

// GetS3FileETAG returns the ETAG hash for the passed in file
func GetS3FileETAG(ctx context.Context, s3Client s3iface.S3API, fileURL string) (string, error) {
// GetS3FileInfo returns the size and ETag for the passed in file
func GetS3FileInfo(ctx context.Context, s3Client s3iface.S3API, fileURL string) (int64, string, error) {
u, err := url.Parse(fileURL)
if err != nil {
return "", err
return 0, "", err
}

bucket := strings.Split(u.Host, ".")[0]
path := u.Path

output, err := s3Client.HeadObjectWithContext(
head, err := s3Client.HeadObjectWithContext(
ctx,
&s3.HeadObjectInput{
Bucket: aws.String(bucket),
@@ -148,16 +154,17 @@ func GetS3FileETAG(ctx context.Context, s3Client s3iface.S3API, fileURL string)
)

if err != nil {
return "", err
return 0, "", err
}

if output.ETag == nil {
return "", fmt.Errorf("no ETAG for object")
if head.ContentLength == nil || head.ETag == nil {
return 0, "", fmt.Errorf("no size or ETag returned for S3 object")
}

// the etag is quoted, so remove the quotes
etag := strings.Trim(*output.ETag, `"`)
return etag, nil
etag := strings.Trim(*head.ETag, `"`)

return *head.ContentLength, etag, nil
}

// GetS3File return an io.ReadCloser for the passed in bucket and path
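
A hedged usage sketch of the new helper, assuming the module import path `github.com/nyaruka/rp-archiver/archives` and a hypothetical archive URL; a default config with no credentials is used only for illustration:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/nyaruka/rp-archiver/archives"
)

func main() {
	cfg := archives.NewDefaultConfig()

	s3Client, err := archives.NewS3Client(cfg)
	if err != nil {
		log.Fatal(err)
	}

	// hypothetical archive URL in a virtual-hosted style bucket
	url := "https://dl-archiver-test.s3.amazonaws.com/org-1/messages_1.jsonl.gz"

	size, etag, err := archives.GetS3FileInfo(context.Background(), s3Client, url)
	if err != nil {
		log.Fatal(err)
	}

	// the returned size and ETag can then be compared against the locally built archive
	fmt.Printf("size=%d etag=%s\n", size, etag)
}
```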
