Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NATS replica client #596

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.DS_Store
/dist
.vscode
/dist
58 changes: 53 additions & 5 deletions cmd/litestream/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ import (
"strings"
"time"

natsClient "github.com/nats-io/nats.go"

"filippo.io/age"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/abs"
"github.com/benbjohnson/litestream/file"
"github.com/benbjohnson/litestream/gcs"
"github.com/benbjohnson/litestream/nats"
"github.com/benbjohnson/litestream/s3"
"github.com/benbjohnson/litestream/sftp"
_ "github.com/mattn/go-sqlite3"
Expand Down Expand Up @@ -363,6 +366,12 @@ type ReplicaConfig struct {
Password string `yaml:"password"`
KeyPath string `yaml:"key-path"`

// NATS settings
JWT string `yaml:"jwt"`
Seed string `yaml:"seed"`
Comment on lines +370 to +371
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All auth methods should be supported. If an additional dependency is welcome, this package handles all the options as well as looking up the nats context profile. https://pkg.go.dev/github.com/nats-io/jsm.go/natscontext

ReplicaCount int `yaml:"replica"`
RetentionBytes int64 `yaml:"retention-bytes"`

// Encryption identities and recipients
Age struct {
Identities []string `yaml:"identities"`
Expand Down Expand Up @@ -413,26 +422,30 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re

// Build and set client on replica.
switch c.ReplicaType() {
case "file":
case file.ReplicaClientType:
if r.Client, err = newFileReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
case "s3":
case s3.ReplicaClientType:
if r.Client, err = newS3ReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
case "gcs":
case gcs.ReplicaClientType:
if r.Client, err = newGCSReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
case "abs":
case abs.ReplicaClientType:
if r.Client, err = newABSReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
case "sftp":
case sftp.ReplicaClientType:
if r.Client, err = newSFTPReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
case nats.ReplicaClientType:
if r.Client, err = newNATSReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unknown replica type in config: %q", c.Type)
}
Expand Down Expand Up @@ -666,6 +679,41 @@ func newSFTPReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_
return client, nil
}

// newNATSReplicaClientFromConfig returns a new instance of nats.ReplicaClient built from config.
func newNATSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *nats.ReplicaClient, err error) {
// Ensure URL & constituent parts are not both specified.
if c.URL != "" && c.Bucket != "" {
return nil, fmt.Errorf("cannot specify url & bucket for nats replica")
}

// Ensure required settings are set.
if c.Bucket == "" {
return nil, fmt.Errorf("bucket required for nats replica")
}

nc, err := natsClient.Connect(
c.URL,
natsClient.UserJWTAndSeed(c.JWT, c.Seed),
)
if err != nil {
return nil, fmt.Errorf("cannot connect to nats server: %w", err)
}

// Build replica.
client := nats.NewReplicaClient(nc)
client.BucketName = c.Bucket
client.BucketMaxBytes = c.RetentionBytes
if c.ReplicaCount > 0 {
client.BucketReplicas = c.ReplicaCount
}

if c.Retention != nil {
client.BucketTTL = *c.Retention
}

return client, nil
}

// applyLitestreamEnv copies "LITESTREAM" prefixed environment variables to
// their AWS counterparts as the "AWS" prefix can be confusing when using a
// non-AWS S3-compatible service.
Expand Down
3 changes: 3 additions & 0 deletions cmd/litestream/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/benbjohnson/litestream/abs"
"github.com/benbjohnson/litestream/file"
"github.com/benbjohnson/litestream/gcs"
"github.com/benbjohnson/litestream/nats"
"github.com/benbjohnson/litestream/s3"
"github.com/benbjohnson/litestream/sftp"
"github.com/mattn/go-shellwords"
Expand Down Expand Up @@ -121,6 +122,8 @@ func (c *ReplicateCommand) Run() (err error) {
slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path, "endpoint", client.Endpoint)
case *sftp.ReplicaClient:
slog.Info("replicating to", "host", client.Host, "user", client.User, "path", client.Path)
case *nats.ReplicaClient:
slog.Info("replicating to", "bucket", client.BucketName)
default:
slog.Info("replicating to")
}
Expand Down
37 changes: 27 additions & 10 deletions file/replica_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (lites
func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) {
filename, err := c.SnapshotPath(generation, index)
if err != nil {
return info, fmt.Errorf("cannot determine snapshot path: %w", err)
return info, litestream.ErrSnapshotPathNoGeneration
}

var fileInfo, dirInfo os.FileInfo
Expand Down Expand Up @@ -239,16 +239,23 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in
func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
filename, err := c.SnapshotPath(generation, index)
if err != nil {
return nil, fmt.Errorf("cannot determine snapshot path: %w", err)
return nil, litestream.ErrSnapshotDoesNotExist
}
return os.Open(filename)
f, err := os.Open(filename)
if err != nil {
if os.IsNotExist(err) {
return nil, litestream.ErrSnapshotDoesNotExist
}
return nil, err
}
return f, nil
}

// DeleteSnapshot deletes a snapshot with the given generation & index.
func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error {
filename, err := c.SnapshotPath(generation, index)
if err != nil {
return fmt.Errorf("cannot determine snapshot path: %w", err)
return litestream.ErrSnapshotDoesNotExist
}
if err := os.Remove(filename); err != nil && !os.IsNotExist(err) {
return err
Expand All @@ -260,19 +267,22 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i
func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) {
dir, err := c.WALDir(generation)
if err != nil {
return nil, fmt.Errorf("cannot determine wal path: %w", err)
return nil, litestream.ErrWALPathNoGeneration
}

f, err := os.Open(dir)
if os.IsNotExist(err) {
return litestream.NewWALSegmentInfoSliceIterator(nil), nil
} else if err != nil {
return nil, err
return nil, litestream.ErrWALPathNoGeneration
}
defer f.Close()

fis, err := f.Readdir(-1)
if err != nil {
if err == os.ErrNotExist {
return nil, litestream.ErrWALPathNoGeneration
}
return nil, err
}

Expand Down Expand Up @@ -301,7 +311,7 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (lit
func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) {
filename, err := c.WALSegmentPath(pos.Generation, pos.Index, pos.Offset)
if err != nil {
return info, fmt.Errorf("cannot determine wal segment path: %w", err)
return info, litestream.ErrWALPathNoGeneration
}

var fileInfo, dirInfo os.FileInfo
Expand Down Expand Up @@ -355,17 +365,24 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos,
func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) {
filename, err := c.WALSegmentPath(pos.Generation, pos.Index, pos.Offset)
if err != nil {
return nil, fmt.Errorf("cannot determine wal segment path: %w", err)
return nil, litestream.ErrWALSegmentPathNoGeneration
}
f, err := os.Open(filename)
if err != nil {
if os.IsNotExist(err) {
return nil, litestream.ErrSnapshotDoesNotExist
}
return nil, err
}
return os.Open(filename)
return f, nil
}

// DeleteWALSegments deletes WAL segments at the given positions.
func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Pos) error {
for _, pos := range a {
filename, err := c.WALSegmentPath(pos.Generation, pos.Index, pos.Offset)
if err != nil {
return fmt.Errorf("cannot determine wal segment path: %w", err)
return litestream.ErrWALSegmentPathNoGeneration
}
if err := os.Remove(filename); err != nil && !os.IsNotExist(err) {
return err
Expand Down
17 changes: 10 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ require (
github.com/aws/aws-sdk-go v1.49.5
github.com/mattn/go-shellwords v1.0.12
github.com/mattn/go-sqlite3 v1.14.19
github.com/nats-io/nats.go v1.36.0
github.com/pierrec/lz4/v4 v4.1.19
github.com/pkg/sftp v1.13.6
github.com/prometheus/client_golang v1.17.0
golang.org/x/crypto v0.17.0
golang.org/x/sync v0.5.0
golang.org/x/sys v0.15.0
github.com/valyala/bytebufferpool v1.0.0
golang.org/x/crypto v0.25.0
golang.org/x/sync v0.7.0
golang.org/x/sys v0.22.0
google.golang.org/api v0.154.0
gopkg.in/yaml.v2 v2.4.0
)
Expand All @@ -32,16 +34,17 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/mattn/go-ieproxy v0.0.11 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
Expand All @@ -51,9 +54,9 @@ require (
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/oauth2 v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/appengine v1.6.8 // indirect
Expand Down
Loading