diff --git a/.gitignore b/.gitignore index 41b70c9..de58d41 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ -/basin +/vaults /cover.out .vscode/* diff --git a/Makefile b/Makefile index 6e96c4d..f3c09d6 100644 --- a/Makefile +++ b/Makefile @@ -5,9 +5,14 @@ lint: # Build build: - go build -o basin cmd/basin/* + go build -o vaults cmd/vaults/* .PHONY: build +# Install +install: + go install ./cmd/vaults +.PHONY: install + # Test test: go test ./... -short -race -timeout 1m diff --git a/README.md b/README.md index 0ade378..6ad5fd9 100644 --- a/README.md +++ b/README.md @@ -1,47 +1,58 @@ -# basin-cli +# vaults-cli [![License](https://img.shields.io/github/license/tablelandnetwork/basin-cli.svg)](./LICENSE) [![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg)](https://github.com/RichardLitt/standard-readme) -> Continuously publish data from your database to the Tableland network. +> Continuously publish data from your database or file uploads to the Tableland Vaults network. -# Table of Contents +## Table of Contents -- [Install](#install) -- [Postgres Setup](#postgres-setup) - - [Self-hosted](#self-hosted) - - [Amazon RDS](#amazon-rds) +- [Background](#background) +- [Usage](#usage) + - [Install](#install) + - [Postgres Setup](#postgres-setup) - [Supabase](#supabase) -- [Create a publication](#create-a-publication) -- [Start replicating a publication](#start-replicating-a-publication) -- [Upload a Parquet file](#upload-a-parquet-file) -- [Listing Publications](#listing-publications) -- [Listing Deals](#listing-deals) -- [Running](#running) -- [Run tests](#run-tests) -- [Retrieving](#retrieving) + - [Create a vault](#create-a-vault) + - [Start replicating a database](#start-replicating-a-database) + - [Write a Parquet file](#write-a-parquet-file) + - [Listing Vaults](#listing-vaults) + - [Listing Events](#listing-events) + - [Retrieving](#retrieving) +- [Development](#development) + - [Running](#running) + - [Run tests](#run-tests) +- [Contributing](#contributing) +- [License](#license) -# Background +## Background -Tableland Basin is a secure and verifiable open data platform. The Basin CLI is a tool that allows you to continuously replicate a table or view from your database to the network. Currently, only PostgreSQL is supported. +Textile Vaults is a secure and verifiable open data platform. The Vaults CLI is a tool that allows you to continuously replicate a table or view from your database to the network (currently, only PostgreSQL is supported). Or, you can directly upload files to the vault (currently, parquet is only supported) -> 🚧 Basin is currently not in a production-ready state. Any data that is pushed to the network may be subject to deletion. 🚧 +> 🚧 Vaults is currently not in a production-ready state. Any data that is pushed to the network may be subject to deletion. 🚧 -# Usage +## Usage -## Install +### Install + +You can either install the CLI from the remote source: + +```bash +go install github.com/tablelandnetwork/basin-cli/cmd/vaults@latest +``` + +Or clone from source and run the Makefile `install` command: ```bash git clone https://github.com/tablelandnetwork/basin-cli.git cd basin-cli -go install ./cmd/basin +make install ``` -## Postgres Setup +### Postgres Setup -### Self-hosted +#### Self-hosted -- Make sure you have access to a superuser role. For example, you can create a new role such as `CREATE ROLE basin WITH PASSWORD NULL LOGIN SUPERUSER;`. +- Make sure you have access to a superuser role. For example, you can create a new role such as `CREATE ROLE vaults WITH PASSWORD NULL LOGIN SUPERUSER;`. - Check that your Postgres installation has the [wal2json](https://github.com/eulerto/wal2json) plugin installed. - Check if logical replication is enabled: @@ -53,7 +64,7 @@ go install ./cmd/basin - Restart the database in order for the new `wal_level` to take effect (be careful!). -### Amazon RDS +#### Amazon RDS - Make sure you have a user with the `rds_superuser` role, and use `psql` to connect to your database. @@ -69,7 +80,7 @@ go install ./cmd/basin WHERE name = 'rds.logical_replication'; ``` -- If it's on, go to [Create a publication](#create-a-publication) +- If it's on, go to [Create a vault](#create-a-vault) - If it's off, follow the next steps: - [Create a custom RDS parameter group](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_WorkingWithDBInstanceParamGroups.html#USER_WorkingWithParamGroups.Creating) - After creation, edit it and set the `rds.logical_replication` parameter to `1` @@ -83,97 +94,111 @@ go install ./cmd/basin - Log into the [Supabase](https://supabase.io/) dashboard and go to your project, or create a new one. - Check if logical replication is enabled. This should be the default setting, so you shouldn't have to change anything. You can do this in the `SQL Editor` section on the left hand side of the Supabase dashboard by running `SHOW wal_level;` query, which should log `logical`. - You can find the database connection information on the left hand side under `Project Settings` > `Database`. You will need the `Host`, `Port`, `Database`, `Username`, and `Password` to connect to your database. - - When you create a publication, the `--dburi` should follow this format: + - When you create a vault, the `--dburi` should follow this format: ```sh postgresql://postgres:[PASSWORD]@db.[PROJECT_ID].supabase.co:5432/postgres ``` -## Create a publication +### Create a vault -_Publications_ define the data you are pushing to Basin. +_Vaults_ define the place you push data into. -Basin uses public key authentication, so you will need an Ethereum style (ECDSA, secp256k1) wallet to create a new publication. You can use an existing wallet or set up a new one with `basin wallet create`. Your private key is only used locally for signing. +Vaults uses public key authentication, so you will need an Ethereum style (ECDSA, secp256k1) wallet to create a new vault. You can use an existing wallet or set up a new one with `vaults wallet create`. Your private key is only used locally for signing. ```bash -basin wallet create [FILENAME] +vaults account create [FILENAME] ``` A new private key will be written to `FILENAME`. -The name of a publication contains a `namespace` (e.g. `my_company`) and the name of an existing database relation (e.g. `my_table`), separated by a period (`.`). Use `basin publication create` to create a new publication. See `basin publication create --help` for more info. +The name of a vault contains a `namespace` (e.g. `my_company`) and the name of an existing database relation (e.g. `my_table`), separated by a period (`.`). Use `vaults create` to create a new vault. See `vaults create --help` for more info. ```bash -basin publication create --dburi [DBURI] --address [WALLET_ADDRESS] namespace.relation_name +vaults create --dburi [DBURI] --account [WALLET_ADDRESS] namespace.relation_name ``` -🚧 Basin currently only replicates `INSERT` statements, which means that it only replicates append-only data (e.g., log-style data). Row updates and deletes will be ignored. 🚧 +🚧 Vaults currently only replicates `INSERT` statements, which means that it only replicates append-only data (e.g., log-style data). Row updates and deletes will be ignored. 🚧 -## Start replicating a publication +### Start replicating a database -Use `basin publication start` to start a daemon that will continuously push changes to the underlying table/view to the network. See `basin publication start --help` for more info. +Use `vaults stream` to start a daemon that will continuously push changes to the underlying table/view to the network. See `vaults stream --help` for more info. ```bash -basin publication start --private-key [PRIVATE_KEY] namespace.relation_name +vaults stream --private-key [PRIVATE_KEY] namespace.relation_name ``` -## Upload a Parquet file +### Write a Parquet file -Before uploading a Parquet file, you need to [Create a publication](#create-a-publication), if not already created. You can omit the `--dburi` flag, in this case. +Before writing a Parquet file, you need to [Create a vault](#create-a-vault), if not already created. You can omit the `--dburi` flag, in this case. -Then, use `basin publication upload` to upload a Parquet file. +Then, use `vaults write` to write a Parquet file. ```bash -basin publication upload --name [namespace.relation_name] --private-key [PRIVATE_KEY] filepath +vaults write --vault [namespace.relation_name] --private-key [PRIVATE_KEY] filepath ``` -You can attach a timestamp to that file upload, e.g. +You can attach a timestamp to that file write, e.g. ```bash -basin publication upload --name [namespace.relation_name] --private-key [PRIVATE_KEY] --timestamp 1699984703 filepath +vaults write --vault [namespace.relation_name] --private-key [PRIVATE_KEY] --timestamp 1699984703 filepath # or use data format -basin publication upload --name [namespace.relation_name] --private-key [PRIVATE_KEY] --timestamp 2006-01-02 filepath +vaults write --vault [namespace.relation_name] --private-key [PRIVATE_KEY] --timestamp 2006-01-02 filepath # or use RFC3339 format -basin publication upload --name [namespace.relation_name] --private-key [PRIVATE_KEY] --timestamp 2006-01-02T15:04:05Z07:00 filepath +vaults write --vault [namespace.relation_name] --private-key [PRIVATE_KEY] --timestamp 2006-01-02T15:04:05Z07:00 filepath ``` If a timestamp is not provided, the CLI will assume the timestamp is the current client epoch in UTC. -## Listing Publications +### Listing Vaults -You can list the publications from an address by running: +You can list the vaults from an account by running: ```bash -basin publication list --address [ETH_ADDRESS] +vaults list --account [ETH_ADDRESS] ``` -## Listing Deals +### Listing Events -You can list deals of a given publication by running: +You can list events of a given vault by running: ```bash -basin publication deals --publication [PUBLICATION] --latest 5 +vaults events --vault [VAULT_NAME] --latest 5 ``` -Deals command accept `--before`,`--after` , and `--at` flags to filter deals by timestamp +Events command accept `--before`,`--after` , and `--at` flags to filter events by timestamp ```bash # examples -basin publication deals --publication demotest.data --at 1699569502 -basin publication deals --publication demotest.data --before 2023-11-09T19:38:23-03:00 -basin publication deals --publication demotest.data --after 2023-11-09 +vaults events --vault demotest.data --at 1699569502 +vaults events --vault demotest.data --before 2023-11-09T19:38:23-03:00 +vaults events --vault demotest.data --after 2023-11-09 ``` -## Retrieving +### Retrieving + +You can retrieve a file from a vault by running: ```bash -basin publication retrieve bafybeifr5njnrw67yyb2h2t7k6ukm3pml4fgphsxeurqcmgmeb7omc2vlq +vaults retrieve bafybeifr5njnrw67yyb2h2t7k6ukm3pml4fgphsxeurqcmgmeb7omc2vlq ``` -# Development +You can also specify where to save the file: -## Running +```bash +vaults retrieve --output /path/to/dir bafybeifr5njnrw67yyb2h2t7k6ukm3pml4fgphsxeurqcmgmeb7omc2vlq +``` + +Or stream the file to stdout the `-` value (note: the short form `-o` is for `--output`), and then pipe it to something like [`car extract`](https://github.com/ipld/go-car) to unpack the CAR file's contents: + +```bash +vaults retrieve -o - bafybeifr5njnrw67yyb2h2t7k6ukm3pml4fgphsxeurqcmgmeb7omc2vlq | car extract +``` + +## Development + +### Running You can make use of the scripts inside `scripts` to facilitate running the CLI locally without building. @@ -181,14 +206,14 @@ You can make use of the scripts inside `scripts` to facilitate running the CLI l # Starting the Provider Server PORT=8888 ./scripts/server.sh -# Create a wallet -./scripts/run.sh wallet create pk.out +# Create an account +./scripts/run.sh account create pk.out # Start replicating -./scripts/run.sh publication start --private-key [PRIVATE_KEY] namespace.relation_name +./scripts/run.sh vaults stream --private-key [PRIVATE_KEY] namespace.relation_name ``` -## Run tests +### Run tests ```bash make test @@ -196,14 +221,13 @@ make test Note: One of the tests requires Docker Engine to be running. - -# Contributing +## Contributing PRs accepted. Small note: If editing the README, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification. -# License +## License MIT AND Apache-2.0, © 2021-2023 Tableland Network Contributors diff --git a/cmd/basin/config.go b/cmd/basin/config.go deleted file mode 100644 index 63b4e61..0000000 --- a/cmd/basin/config.go +++ /dev/null @@ -1,75 +0,0 @@ -package main - -import ( - "fmt" - "os" - "path" - - "github.com/mitchellh/go-homedir" - "gopkg.in/yaml.v3" -) - -// DefaultProviderHost is the address of Basin Provider. -const DefaultProviderHost = "https://basin.tableland.xyz" - -// DefaultWindowSize is the number of seconds for which WAL updates -// are buffered before being sent to the provider. -const DefaultWindowSize = 3600 - -type config struct { - Publications map[string]publication `yaml:"publications"` -} - -type publication struct { - User string `yaml:"user"` - Password string `yaml:"password"` - Host string `yaml:"host"` - Port int `yaml:"port"` - Database string `yaml:"database"` - ProviderHost string `yaml:"provider_host"` - WindowSize int64 `yaml:"window_size"` -} - -func newConfig() *config { - return &config{ - Publications: make(map[string]publication), - } -} - -func loadConfig(path string) (*config, error) { - buf, err := os.ReadFile(path) - if err != nil { - return &config{}, err - } - - conf := newConfig() - if err := yaml.Unmarshal(buf, conf); err != nil { - return &config{}, err - } - - return conf, nil -} - -func defaultConfigLocation(dir string) (string, error) { - if dir == "" { - // the default directory is home - var err error - dir, err = homedir.Dir() - if err != nil { - return "", fmt.Errorf("home dir: %s", err) - } - - dir = path.Join(dir, ".basin") - } - - _, err := os.Stat(dir) - if os.IsNotExist(err) { - if err := os.Mkdir(dir, 0o755); err != nil { - return "", fmt.Errorf("mkdir: %s", err) - } - } else if err != nil { - return "", fmt.Errorf("is not exist: %s", err) - } - - return dir, nil -} diff --git a/cmd/basin/main.go b/cmd/basin/main.go deleted file mode 100644 index e00abe4..0000000 --- a/cmd/basin/main.go +++ /dev/null @@ -1,24 +0,0 @@ -package main - -import ( - "os" - - "github.com/urfave/cli/v2" - "golang.org/x/exp/slog" -) - -func main() { - cliApp := &cli.App{ - Name: "basin", - Usage: "Continuously publish data from your database to the Tableland network.", - Commands: []*cli.Command{ - newPublicationCommand(), - newWalletCommand(), - }, - } - - if err := cliApp.Run(os.Args); err != nil { - slog.Error(err.Error()) - os.Exit(1) - } -} diff --git a/cmd/basin/publication_test.go b/cmd/basin/publication_test.go deleted file mode 100644 index 655cdbf..0000000 --- a/cmd/basin/publication_test.go +++ /dev/null @@ -1,124 +0,0 @@ -package main - -import ( - "context" - "crypto/ecdsa" - "database/sql" - "encoding/hex" - "fmt" - "testing" - "time" - - "github.com/ethereum/go-ethereum/crypto" - _ "github.com/lib/pq" - "github.com/stretchr/testify/require" - "github.com/tablelandnetwork/basin-cli/test" - "github.com/urfave/cli/v2" -) - -func TestPublication(t *testing.T) { - // This is supposed to be the closest to an e2E test we get - // skipping this for now until I think of a way to assert the data that went to Basin Provider - t.Skip() - - t.Parallel() - - pool := test.GetDockerPool() - - db, resource, dburi := pool.RunPostgres() - - cliApp := &cli.App{ - Name: "basin", - Usage: "basin replicates your database as logs and store them in Filecoin", - Commands: []*cli.Command{ - newPublicationCommand(), - }, - } - - h, err := newHelper(cliApp, db, dburi, t.TempDir()) - require.NoError(t, err) - - h.CreateTable(t) - h.CreatePublication(t) - - go func() { - h.StartReplication(t) - }() - - time.Sleep(1 * time.Second) - - h.AddNewRecord(t) - h.AddNewRecord(t) - - pool.Purge(resource) -} - -type helper struct { - app *cli.App - db *sql.DB - dburi string - pk *ecdsa.PrivateKey - ns string - rel string - dir string -} - -func newHelper(app *cli.App, db *sql.DB, dburi string, dir string) (*helper, error) { - pk, err := crypto.GenerateKey() - if err != nil { - return nil, err - } - - return &helper{ - pk: pk, - ns: "n", - rel: "t", - dir: dir, - - app: app, - db: db, - dburi: dburi, - }, nil -} - -func (h *helper) CreateTable(t *testing.T) { - _, err := h.db.ExecContext( - context.Background(), - fmt.Sprintf("create table %s(id serial primary key, name text);", h.rel), - ) - require.NoError(t, err) -} - -func (h *helper) CreatePublication(t *testing.T) { - name := fmt.Sprintf("%s.%s", h.ns, h.rel) - err := h.app.Run([]string{ - "basin", - "publication", - "--dir", h.dir, - "create", - "--dburi", h.dburi, - "--address", crypto.PubkeyToAddress(h.pk.PublicKey).Hex(), - "--provider", "mock", - name, - }) - require.NoError(t, err) -} - -func (h *helper) StartReplication(t *testing.T) { - ctx := context.Background() - name := fmt.Sprintf("%s.%s", h.ns, h.rel) - err := h.app.RunContext(ctx, []string{ - "basin", - "publication", - "--dir", h.dir, - "start", - "--private-key", hex.EncodeToString(crypto.FromECDSA(h.pk)), - name, - }) - require.NoError(t, err) -} - -func (h *helper) AddNewRecord(t *testing.T) { - _, err := h.db.ExecContext(context.Background(), fmt.Sprintf("insert into %s (name) values ('foo');", h.rel)) - require.NoError(t, err) -} diff --git a/cmd/basin/wallet.go b/cmd/basin/wallet.go deleted file mode 100644 index da70df1..0000000 --- a/cmd/basin/wallet.go +++ /dev/null @@ -1,69 +0,0 @@ -package main - -import ( - "crypto/ecdsa" - "errors" - "fmt" - "os" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/crypto" - "github.com/urfave/cli/v2" -) - -func newWalletCommand() *cli.Command { - return &cli.Command{ - Name: "wallet", - Usage: "wallet commands", - Subcommands: []*cli.Command{ - { - Name: "create", - Usage: "creates a new wallet", - Action: func(cCtx *cli.Context) error { - filename := cCtx.Args().Get(0) - if filename == "" { - return errors.New("filename is empty") - } - - privateKey, err := crypto.GenerateKey() - if err != nil { - return fmt.Errorf("generate key: %s", err) - } - privateKeyBytes := crypto.FromECDSA(privateKey) - - if err := os.WriteFile(filename, []byte(hexutil.Encode(privateKeyBytes)[2:]), 0o644); err != nil { - return fmt.Errorf("writing to file %s: %s", filename, err) - } - pubk, _ := privateKey.Public().(*ecdsa.PublicKey) - publicKey := common.HexToAddress(crypto.PubkeyToAddress(*pubk).Hex()) - - fmt.Printf("Wallet address %s created\n", publicKey) - fmt.Printf("Private key saved in %s\n", filename) - return nil - }, - }, - { - Name: "pubkey", - Usage: "print the public key for a private key", - Action: func(cCtx *cli.Context) error { - filename := cCtx.Args().Get(0) - if filename == "" { - return errors.New("filename is empty") - } - - privateKey, err := crypto.LoadECDSA(filename) - if err != nil { - return fmt.Errorf("loading key: %s", err) - } - - pubk, _ := privateKey.Public().(*ecdsa.PublicKey) - publicKey := common.HexToAddress(crypto.PubkeyToAddress(*pubk).Hex()) - - fmt.Println(publicKey) - return nil - }, - }, - }, - } -} diff --git a/cmd/basin/publication.go b/cmd/vaults/commands.go similarity index 55% rename from cmd/basin/publication.go rename to cmd/vaults/commands.go index 6cfacfc..d2354fb 100644 --- a/cmd/basin/publication.go +++ b/cmd/vaults/commands.go @@ -2,9 +2,11 @@ package main import ( "context" + "crypto/ecdsa" "encoding/json" "errors" "fmt" + "io" "os" "path" "regexp" @@ -12,6 +14,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" "github.com/filecoin-project/lassie/pkg/lassie" "github.com/filecoin-project/lassie/pkg/storage" @@ -25,94 +28,87 @@ import ( "github.com/olekukonko/tablewriter" "github.com/schollz/progressbar/v3" "github.com/tablelandnetwork/basin-cli/internal/app" - "github.com/tablelandnetwork/basin-cli/pkg/basinprovider" "github.com/tablelandnetwork/basin-cli/pkg/pgrepl" + "github.com/tablelandnetwork/basin-cli/pkg/vaultsprovider" "github.com/urfave/cli/v2" "gopkg.in/yaml.v3" ) -var pubNameRx = regexp.MustCompile(`^([a-zA-Z_][a-zA-Z0-9_]*)[.]([a-zA-Z_][a-zA-Z0-9_]*$)`) +var vaultNameRx = regexp.MustCompile(`^([a-zA-Z_][a-zA-Z0-9_]*)[.]([a-zA-Z_][a-zA-Z0-9_]*$)`) -func newPublicationCommand() *cli.Command { - return &cli.Command{ - Name: "publication", - Usage: "publication commands", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "dir", - Usage: "The directory where config will be stored (default: $HOME)", - }, - }, - Subcommands: []*cli.Command{ - newPublicationCreateCommand(), - newPublicationStartCommand(), - newPublicationUploadCommand(), - newPublicationListCommand(), - newPublicationDealsCommand(), - newPublicationRetrieveCommand(), - }, - } -} - -func newPublicationCreateCommand() *cli.Command { - var owner, dburi, provider string +func newVaultCreateCommand() *cli.Command { + var address, dburi, provider string var winSize, cache int64 return &cli.Command{ - Name: "create", - Usage: "create a new publication", + Name: "create", + Usage: "Create a new vault", + ArgsUsage: "", + Description: "Create a vault for a given account's address as either database streaming \n" + + "or file uploading. Optionally, also set a cache duration for the data.\n\nEXAMPLE:\n\n" + + "vaults create --account 0x1234abcd --cache 10 my.vault", Flags: []cli.Flag{ &cli.StringFlag{ - Name: "address", + Name: "account", + Aliases: []string{"a"}, + Category: "REQUIRED:", Usage: "Ethereum wallet address", - Destination: &owner, + Destination: &address, Required: true, }, - &cli.StringFlag{ - Name: "dburi", - Usage: "PostgreSQL connection string", - Destination: &dburi, - }, &cli.StringFlag{ Name: "provider", - Usage: "The provider's address and port (e.g. localhost:8080)", + Aliases: []string{"p"}, + Category: "OPTIONAL:", + Usage: "The provider's address and port (e.g., localhost:8080)", + DefaultText: DefaultProviderHost, Destination: &provider, Value: DefaultProviderHost, }, - &cli.Int64Flag{ - Name: "window-size", - Usage: "Number of seconds for which WAL updates are buffered before being sent to the provider", - Destination: &winSize, - Value: DefaultWindowSize, - }, &cli.Int64Flag{ Name: "cache", + Category: "OPTIONAL:", Usage: "Time duration (in minutes) that the data will be available in the cache", + DefaultText: "0", Destination: &cache, Value: 0, }, + &cli.StringFlag{ + Name: "dburi", + Category: "OPTIONAL:", + Usage: "PostgreSQL connection string (e.g., postgresql://postgres:[PASSWORD]@[HOST]:[PORT]/postgres)", + Destination: &dburi, + }, + &cli.Int64Flag{ + Name: "window-size", + Category: "OPTIONAL:", + Usage: "Number of seconds for which WAL updates are buffered before being sent to the provider", + DefaultText: fmt.Sprintf("%d", DefaultWindowSize), + Destination: &winSize, + Value: DefaultWindowSize, + }, }, Action: func(cCtx *cli.Context) error { if cCtx.NArg() != 1 { - return errors.New("one argument should be provided") + return errors.New("must provide a vault name") } pub := cCtx.Args().First() - ns, rel, err := parsePublicationName(pub) + ns, rel, err := parseVaultName(pub) if err != nil { return err } - if !common.IsHexAddress(owner) { - return fmt.Errorf("%s is not a valid Ethereum wallet address", owner) + account, err := app.NewAccount(address) + if err != nil { + return fmt.Errorf("not a valid account: %s", err) } - pgConfig, err := pgconn.ParseConfig(dburi) if err != nil { return fmt.Errorf("parse config: %s", err) } - dir, err := defaultConfigLocation(cCtx.String("dir")) + dir, _, err := defaultConfigLocationV2(cCtx.String("dir")) if err != nil { return fmt.Errorf("default config location: %s", err) } @@ -125,12 +121,12 @@ func newPublicationCreateCommand() *cli.Command { _ = f.Close() }() - cfg, err := loadConfig(path.Join(dir, "config.yaml")) + cfg, err := loadConfigV2(path.Join(dir, "config.yaml")) if err != nil { return fmt.Errorf("load config: %s", err) } - cfg.Publications[pub] = publication{ + cfg.Vaults[pub] = vault{ Host: pgConfig.Host, Port: int(pgConfig.Port), User: pgConfig.User, @@ -144,13 +140,13 @@ func newPublicationCreateCommand() *cli.Command { return fmt.Errorf("encode: %s", err) } - exists, err := createPublication(cCtx.Context, dburi, ns, rel, provider, owner, cache) + exists, err := createVault(cCtx.Context, dburi, ns, rel, provider, account, cache) if err != nil { - return fmt.Errorf("failed to create publication: %s", err) + return fmt.Errorf("failed to create vault: %s", err) } if exists { - fmt.Printf("Publication %s.%s already exists.\n\n", ns, rel) + fmt.Printf("Vault %s.%s already exists.\n\n", ns, rel) return nil } @@ -158,21 +154,28 @@ func newPublicationCreateCommand() *cli.Command { return fmt.Errorf("mk db dir: %s", err) } - fmt.Printf("\033[32mPublication %s.%s created.\033[0m\n\n", ns, rel) + fmt.Printf("\033[32mVault %s.%s created.\033[0m\n\n", ns, rel) return nil }, } } -func newPublicationStartCommand() *cli.Command { +func newStreamCommand() *cli.Command { var privateKey string return &cli.Command{ - Name: "start", - Usage: "start a daemon process that replicates Postgres changes to Basin server", + Name: "stream", + Usage: "Starts a daemon process that streams Postgres changes to a vault", + ArgsUsage: "", + Description: "The daemon will continuously stream database changes (except deletions) \n" + + "to the vault, as long as the daemon is actively running.\n\n" + + "EXAMPLE:\n\nvaults stream --vault my.vault --private-key 0x1234abcd", + Flags: []cli.Flag{ &cli.StringFlag{ Name: "private-key", + Aliases: []string{"k"}, + Category: "REQUIRED:", Usage: "Ethereum wallet private key", Destination: &privateKey, Required: true, @@ -180,31 +183,31 @@ func newPublicationStartCommand() *cli.Command { }, Action: func(cCtx *cli.Context) error { if cCtx.NArg() != 1 { - return errors.New("one argument should be provided") + return errors.New("must provide a vault name") } - publication := cCtx.Args().First() - ns, rel, err := parsePublicationName(publication) + vault := cCtx.Args().First() + ns, rel, err := parseVaultName(vault) if err != nil { return err } - dir, err := defaultConfigLocation(cCtx.String("dir")) + dir, _, err := defaultConfigLocationV2(cCtx.String("dir")) if err != nil { return fmt.Errorf("default config location: %s", err) } - cfg, err := loadConfig(path.Join(dir, "config.yaml")) + cfg, err := loadConfigV2(path.Join(dir, "config.yaml")) if err != nil { return fmt.Errorf("load config: %s", err) } connString := fmt.Sprintf("postgres://%s:%s@%s:%d/%s", - cfg.Publications[publication].User, - cfg.Publications[publication].Password, - cfg.Publications[publication].Host, - cfg.Publications[publication].Port, - cfg.Publications[publication].Database, + cfg.Vaults[vault].User, + cfg.Vaults[vault].Password, + cfg.Vaults[vault].Host, + cfg.Vaults[vault].Port, + cfg.Vaults[vault].Database, ) r, err := pgrepl.New(connString, pgrepl.Publication(rel)) @@ -217,7 +220,7 @@ func newPublicationStartCommand() *cli.Command { return err } - bp := basinprovider.New(cfg.Publications[publication].ProviderHost) + bp := vaultsprovider.New(cfg.Vaults[vault].ProviderHost) pgxConn, err := pgx.Connect(cCtx.Context, connString) if err != nil { @@ -243,9 +246,9 @@ func newPublicationStartCommand() *cli.Command { } // Creates a new db manager when replication starts - dbDir := path.Join(dir, publication) - winSize := time.Duration(cfg.Publications[publication].WindowSize) * time.Second - uploader := app.NewBasinUploader(ns, rel, bp, privateKey) + dbDir := path.Join(dir, vault) + winSize := time.Duration(cfg.Vaults[vault].WindowSize) * time.Second + uploader := app.NewVaultsUploader(ns, rel, bp, privateKey) dbm := app.NewDBManager(dbDir, rel, cols, winSize, uploader) // Before starting replication, upload the remaining data @@ -253,8 +256,8 @@ func newPublicationStartCommand() *cli.Command { return fmt.Errorf("upload all: %s", err) } - basinStreamer := app.NewBasinStreamer(ns, r, dbm) - if err := basinStreamer.Run(cCtx.Context); err != nil { + vaultsStreamer := app.NewVaultsStreamer(ns, r, dbm) + if err := vaultsStreamer.Run(cCtx.Context); err != nil { return fmt.Errorf("run: %s", err) } @@ -263,37 +266,47 @@ func newPublicationStartCommand() *cli.Command { } } -func newPublicationUploadCommand() *cli.Command { - var privateKey, publicationName string +func newWriteCommand() *cli.Command { + var privateKey, vaultName string var timestamp string return &cli.Command{ - Name: "upload", - Usage: "upload a Parquet file", + Name: "write", + Usage: "Write a Parquet file", + ArgsUsage: "", + Description: "A Parquet file can be pushed directly to the vault, as an \n" + + "alternative to continuous Postgres data streaming.\n\n" + + "EXAMPLE:\n\nvaults write --vault my.vault --private-key 0x1234abcd /path/to/file.parquet", Flags: []cli.Flag{ &cli.StringFlag{ Name: "private-key", + Aliases: []string{"k"}, + Category: "REQUIRED:", Usage: "Ethereum wallet private key", Destination: &privateKey, Required: true, }, &cli.StringFlag{ - Name: "name", - Usage: "Publication name", - Destination: &publicationName, + Name: "vault", + Aliases: []string{"v"}, + Category: "REQUIRED:", + Usage: "Vault name", + Destination: &vaultName, Required: true, }, &cli.StringFlag{ Name: "timestamp", - Usage: "The time the file was created (default: current epoch in UTC)", + Category: "OPTIONAL:", + Usage: "The time the file was created", + DefaultText: "current epoch in UTC", Destination: ×tamp, }, }, Action: func(cCtx *cli.Context) error { if cCtx.NArg() != 1 { - return errors.New("one argument should be provided") + return errors.New("must provide a file path") } - ns, rel, err := parsePublicationName(publicationName) + ns, rel, err := parseVaultName(vaultName) if err != nil { return err } @@ -303,17 +316,17 @@ func newPublicationUploadCommand() *cli.Command { return err } - dir, err := defaultConfigLocation(cCtx.String("dir")) + dir, _, err := defaultConfigLocationV2(cCtx.String("dir")) if err != nil { return fmt.Errorf("default config location: %s", err) } - cfg, err := loadConfig(path.Join(dir, "config.yaml")) + cfg, err := loadConfigV2(path.Join(dir, "config.yaml")) if err != nil { return fmt.Errorf("load config: %s", err) } - bp := basinprovider.New(cfg.Publications[publicationName].ProviderHost) + bp := vaultsprovider.New(cfg.Vaults[vaultName].ProviderHost) filepath := cCtx.Args().First() @@ -332,7 +345,7 @@ func newPublicationUploadCommand() *cli.Command { bar := progressbar.DefaultBytes( fi.Size(), - "Uploading file...", + "Writing...", ) if timestamp == "" { @@ -344,8 +357,8 @@ func newPublicationUploadCommand() *cli.Command { return err } - basinStreamer := app.NewBasinUploader(ns, rel, bp, privateKey) - if err := basinStreamer.Upload(cCtx.Context, filepath, bar, ts, fi.Size()); err != nil { + vaultsStreamer := app.NewVaultsUploader(ns, rel, bp, privateKey) + if err := vaultsStreamer.Upload(cCtx.Context, filepath, bar, ts, fi.Size()); err != nil { return fmt.Errorf("upload: %s", err) } @@ -354,40 +367,66 @@ func newPublicationUploadCommand() *cli.Command { } } -func newPublicationListCommand() *cli.Command { - var owner, provider string +func newListCommand() *cli.Command { + var address, provider, format string return &cli.Command{ Name: "list", - Usage: "list publications of a given address", + Usage: "List vaults of a given account", + Description: "Listing vaults will show all vaults that have been created by the provided \n" + + "account's address and logged as either line delimited text or a json array.\n\n" + + "EXAMPLE:\n\nvaults list --account 0x1234abcd --format json", Flags: []cli.Flag{ &cli.StringFlag{ - Name: "address", + Name: "account", + Aliases: []string{"a"}, + Category: "REQUIRED:", Usage: "Ethereum wallet address", - Destination: &owner, + Destination: &address, Required: true, }, &cli.StringFlag{ Name: "provider", - Usage: "The provider's address and port (e.g. localhost:8080)", + Aliases: []string{"p"}, + Category: "OPTIONAL:", + Usage: "The provider's address and port (e.g., localhost:8080)", + DefaultText: DefaultProviderHost, Destination: &provider, Value: DefaultProviderHost, }, + &cli.StringFlag{ + Name: "format", + Category: "OPTIONAL:", + Usage: "The output format (text or json)", + DefaultText: "text", + Destination: &format, + Value: "text", + }, }, Action: func(cCtx *cli.Context) error { - account, err := app.NewAccount(owner) + account, err := app.NewAccount(address) if err != nil { - return fmt.Errorf("%s is not a valid Ethereum wallet address", owner) + return fmt.Errorf("%s is not a valid Ethereum wallet address", address) } - bp := basinprovider.New(provider) + bp := vaultsprovider.New(provider) vaults, err := bp.ListVaults(cCtx.Context, app.ListVaultsParams{Account: account}) if err != nil { - return fmt.Errorf("failed to list publications: %s", err) + return fmt.Errorf("failed to list vaults: %s", err) } - for _, vault := range vaults { - fmt.Printf("%s\n", vault) + if format == "text" { + for _, vault := range vaults { + fmt.Printf("%s\n", vault) + } + } else if format == "json" { + jsonData, err := json.Marshal(vaults) + if err != nil { + return fmt.Errorf("error serializing events to JSON") + } + fmt.Println(string(jsonData)) + } else { + return fmt.Errorf("invalid format: %s", format) } return nil @@ -395,75 +434,97 @@ func newPublicationListCommand() *cli.Command { } } -func newPublicationDealsCommand() *cli.Command { - var publication, provider, before, after, at, format string +func newListEventsCommand() *cli.Command { + var vault, provider, before, after, at, format string var limit, offset, latest int return &cli.Command{ - Name: "deals", - Usage: "list deals of a given publications", + Name: "events", + Usage: "List events of a given vault", + UsageText: "vaults events [command options]", + Description: "Vault events can be filtered by date ranges (unix, ISO 8601 date,\n" + + "or ISO 8601 date & time), returning the event metadata and \n" + + "corresponding CID.\n\n" + + "EXAMPLE:\n\nvaults events --vault my.vault \\\n" + + "--limit 10 --offset 3 \\\n--after 2023-09-01 --before 2023-12-01 \\\n" + + "--format json", Flags: []cli.Flag{ &cli.StringFlag{ - Name: "publication", - Usage: "Publication name", - Destination: &publication, + Name: "vault", + Aliases: []string{"v"}, + Category: "REQUIRED:", + Usage: "Vault name", + Destination: &vault, Required: true, }, &cli.StringFlag{ Name: "provider", - Usage: "The provider's address and port (e.g. localhost:8080)", + Aliases: []string{"p"}, + Category: "OPTIONAL:", + Usage: "The provider's address and port (e.g., localhost:8080)", + DefaultText: DefaultProviderHost, Destination: &provider, Value: DefaultProviderHost, }, &cli.IntFlag{ Name: "limit", + Category: "OPTIONAL:", Usage: "The number of deals to fetch", + DefaultText: "10", Destination: &limit, Value: 10, }, &cli.IntFlag{ Name: "latest", + Category: "OPTIONAL:", Usage: "The latest N deals to fetch", Destination: &latest, }, &cli.IntFlag{ Name: "offset", + Category: "OPTIONAL:", Usage: "The epoch to start from", + DefaultText: "0", Destination: &offset, Value: 0, }, &cli.StringFlag{ Name: "before", + Category: "OPTIONAL:", Usage: "Filter deals created before this timestamp", Destination: &before, Value: "", }, &cli.StringFlag{ Name: "after", + Category: "OPTIONAL:", Usage: "Filter deals created after this timestamp", Destination: &after, Value: "", }, &cli.StringFlag{ Name: "at", + Category: "OPTIONAL:", Usage: "Filter deals created at this timestamp", Destination: &at, Value: "", }, &cli.StringFlag{ Name: "format", + Category: "OPTIONAL:", Usage: "The output format (table or json)", + DefaultText: "table", Destination: &format, Value: "table", }, }, Action: func(cCtx *cli.Context) error { - ns, rel, err := parsePublicationName(publication) + ns, rel, err := parseVaultName(vault) if err != nil { return err } - bp := basinprovider.New(provider) + bp := vaultsprovider.New(provider) b, a, err := validateBeforeAndAfter(before, after, at) if err != nil { @@ -534,19 +595,35 @@ func newPublicationDealsCommand() *cli.Command { } } -func newPublicationRetrieveCommand() *cli.Command { +func newRetrieveCommand() *cli.Command { + var output string + return &cli.Command{ - Name: "retrieve", - Usage: "Retrieve files by CID", + Name: "retrieve", + Usage: "Retrieve an event by CID", + ArgsUsage: "", + Description: "Retrieving an event will download the event's CAR file into the \n" + + "current directory, a provided directory path, or to stdout.\n\n" + + "EXAMPLE:\n\nvaults retrieve --output /path/to/dir bafy...", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "output", + Aliases: []string{"o"}, + Category: "OPTIONAL:", + Usage: "Output directory path, or '-' for stdout", + DefaultText: "current directory", + Destination: &output, + }, + }, Action: func(cCtx *cli.Context) error { arg := cCtx.Args().Get(0) if arg == "" { - return errors.New("argument is empty") + return errors.New("must provide an event CID") } rootCid, err := cid.Parse(arg) if err != nil { - return errors.New("cid is invalid") + return errors.New("CID is invalid") } lassie, err := lassie.NewLassie(cCtx.Context) @@ -559,7 +636,37 @@ func newPublicationRetrieveCommand() *cli.Command { car.StoreIdentityCIDs(false), car.UseWholeCIDs(false), } - carWriter := deferred.NewDeferredCarWriterForPath(fmt.Sprintf("./%s.car", arg), []cid.Cid{rootCid}, carOpts...) + + var carWriter *deferred.DeferredCarWriter + var tmpFile *os.File + + if output == "-" { + // Create a temporary file only for writing to stdout case + tmpFile, err := os.CreateTemp("", fmt.Sprintf("%s.car", arg)) + if err != nil { + return fmt.Errorf("failed to create temporary file: %s", err) + } + defer func() { + _ = os.Remove(tmpFile.Name()) + }() + carWriter = deferred.NewDeferredCarWriterForPath(tmpFile.Name(), []cid.Cid{rootCid}, carOpts...) + } else { + // Write to the provided path or current directory + if output == "" { + output = "." // Default to current directory + } + // Ensure path is a valid directory + info, err := os.Stat(output) + if err != nil { + return fmt.Errorf("failed to access output directory: %s", err) + } + if !info.IsDir() { + return fmt.Errorf("output path is not a directory: %s", output) + } + carPath := path.Join(output, fmt.Sprintf("%s.car", arg)) + carWriter = deferred.NewDeferredCarWriterForPath(carPath, []cid.Cid{rootCid}, carOpts...) + } + defer func() { _ = carWriter.Close() }() @@ -579,15 +686,92 @@ func newPublicationRetrieveCommand() *cli.Command { return fmt.Errorf("failed to fetch: %s", err) } + // Write to stdout only if the output flag is set to '-' + if output == "-" && tmpFile != nil { + _, _ = tmpFile.Seek(0, io.SeekStart) + _, err = io.Copy(os.Stdout, tmpFile) + if err != nil { + return fmt.Errorf("failed to write to stdout: %s", err) + } + } + return nil }, } } -func parsePublicationName(name string) (ns string, rel string, err error) { - match := pubNameRx.FindStringSubmatch(name) +func newWalletCommand() *cli.Command { + return &cli.Command{ + Name: "account", + Usage: "Account management for an Ethereum-style wallet", + UsageText: "vaults account [arguments...]", + Subcommands: []*cli.Command{ + { + Name: "create", + Usage: "Creates a new account", + UsageText: "vaults account create ", + Description: "Create an Ethereum-style wallet (secp256k1 key pair) at a \n" + + "provided file path.\n\n" + + "EXAMPLE:\n\nvaults account create /path/to/file", + Action: func(cCtx *cli.Context) error { + filename := cCtx.Args().Get(0) + if filename == "" { + return errors.New("filename is empty") + } + + privateKey, err := crypto.GenerateKey() + if err != nil { + return fmt.Errorf("generate key: %s", err) + } + privateKeyBytes := crypto.FromECDSA(privateKey) + + if err := os.WriteFile(filename, []byte(hexutil.Encode(privateKeyBytes)[2:]), 0o644); err != nil { + return fmt.Errorf("writing to file %s: %s", filename, err) + } + pubk, _ := privateKey.Public().(*ecdsa.PublicKey) + publicKey := common.HexToAddress(crypto.PubkeyToAddress(*pubk).Hex()) + + fmt.Printf("Wallet address %s created\n", publicKey) + fmt.Printf("Private key saved in %s\n", filename) + return nil + }, + }, + { + Name: "address", + Usage: "Print the public key for an account's private key", + UsageText: "vaults account address ", + Description: "The result of the `vaults account create` command will write a private key to a file, \n" + + "and this lets you retrieve the public key value for use in other commands.\n\n" + + "EXAMPLE:\n\nvaults account address /path/to/file", + Action: func(cCtx *cli.Context) error { + filename := cCtx.Args().Get(0) + if filename == "" { + return errors.New("filename is empty") + } + + privateKey, err := crypto.LoadECDSA(filename) + if err != nil { + return fmt.Errorf("loading key: %s", err) + } + + pubk, _ := privateKey.Public().(*ecdsa.PublicKey) + publicKey := common.HexToAddress(crypto.PubkeyToAddress(*pubk).Hex()) + + fmt.Println(publicKey) + return nil + }, + }, + }, + } +} + +func parseVaultName(name string) (ns string, rel string, err error) { + match := vaultNameRx.FindStringSubmatch(name) if len(match) != 3 { - return "", "", errors.New("publication name must be of the form `namespace.relation_name` using only letters, numbers, and underscores (_), where `namespace` and `relation` do not start with a number") // nolint + return "", "", errors.New( + "vault name must be of the form `namespace.relation_name` using only letters, numbers, " + + "and underscores (_), where `namespace` and `relation` do not start with a number", + ) // nolint } ns = match[1] rel = match[2] @@ -651,21 +835,16 @@ func inspectTable(ctx context.Context, tx pgx.Tx, rel string) ([]app.Column, err return columns, nil } -func createPublication( +func createVault( ctx context.Context, dburi string, ns string, rel string, provider string, - owner string, + account *app.Account, cacheDuration int64, ) (exists bool, err error) { - account, err := app.NewAccount(owner) - if err != nil { - return false, fmt.Errorf("not a valid account: %s", err) - } - - bp := basinprovider.New(provider) + bp := vaultsprovider.New(provider) req := app.CreateVaultParams{ Account: account, Vault: app.Vault(fmt.Sprintf("%s.%s", ns, rel)), diff --git a/cmd/vaults/config.go b/cmd/vaults/config.go new file mode 100644 index 0000000..5c481a2 --- /dev/null +++ b/cmd/vaults/config.go @@ -0,0 +1,190 @@ +package main + +import ( + "errors" + "fmt" + "os" + "path" + + "github.com/mitchellh/go-homedir" + "golang.org/x/exp/slog" + "gopkg.in/yaml.v3" +) + +// DefaultProviderHost is the address of Vaults Provider. +const DefaultProviderHost = "https://basin.tableland.xyz" + +// DefaultWindowSize is the number of seconds for which WAL updates +// are buffered before being sent to the provider. +const DefaultWindowSize = 3600 + +type config struct { + Publications map[string]publication `yaml:"publications"` +} + +type configV2 struct { + Vaults map[string]vault `yaml:"vaults"` +} + +type publication struct { + User string `yaml:"user"` + Password string `yaml:"password"` + Host string `yaml:"host"` + Port int `yaml:"port"` + Database string `yaml:"database"` + ProviderHost string `yaml:"provider_host"` + WindowSize int64 `yaml:"window_size"` +} + +type vault struct { + User string `yaml:"user"` + Password string `yaml:"password"` + Host string `yaml:"host"` + Port int `yaml:"port"` + Database string `yaml:"database"` + ProviderHost string `yaml:"provider_host"` + WindowSize int64 `yaml:"window_size"` +} + +func newConfig() *config { + return &config{ + Publications: make(map[string]publication), + } +} + +func newConfigV2() *configV2 { + return &configV2{ + Vaults: make(map[string]vault), + } +} + +func loadConfig(path string) (*config, error) { + buf, err := os.ReadFile(path) + if err != nil { + return &config{}, err + } + + conf := newConfig() + if err := yaml.Unmarshal(buf, conf); err != nil { + return &config{}, err + } + + return conf, nil +} + +func loadConfigV2(path string) (*configV2, error) { + buf, err := os.ReadFile(path) + if err != nil { + return &configV2{}, err + } + + conf := newConfigV2() + if err := yaml.Unmarshal(buf, conf); err != nil { + return &configV2{}, err + } + + return conf, nil +} + +func defaultConfigLocation(dir string) (string, error) { + if dir == "" { + // the default directory is home + var err error + dir, err = homedir.Dir() + if err != nil { + return "", fmt.Errorf("home dir: %s", err) + } + + dir = path.Join(dir, ".basin") + } + + _, err := os.Stat(dir) + if os.IsNotExist(err) { + if err := os.Mkdir(dir, 0o755); err != nil { + return "", fmt.Errorf("mkdir: %s", err) + } + } else if err != nil { + return "", fmt.Errorf("is not exist: %s", err) + } + + return dir, nil +} + +func defaultConfigLocationV2(dir string) (string, bool, error) { + if dir == "" { + // the default directory is home + var err error + dir, err = homedir.Dir() + if err != nil { + return "", false, fmt.Errorf("home dir: %s", err) + } + + dir = path.Join(dir, ".vaults") + } + + _, err := os.Stat(dir) + doesNotExist := os.IsNotExist(err) + if doesNotExist { + if err := os.Mkdir(dir, 0o755); err != nil { + return "", doesNotExist, fmt.Errorf("mkdir: %s", err) + } + } else if err != nil { + return "", doesNotExist, fmt.Errorf("is not exist: %s", err) + } + + return dir, !doesNotExist, nil +} + +func migrateConfigV1ToV2() { + dirV1, err := defaultConfigLocation("") + if err != nil { + slog.Error(err.Error()) + os.Exit(1) + } + + dirV2, exists, err := defaultConfigLocationV2("") + if err != nil { + slog.Error(err.Error()) + os.Exit(1) + } + + // create v2 config.yaml if necessary + f, err := os.OpenFile(path.Join(dirV2, "config.yaml"), os.O_RDWR|os.O_CREATE, 0o666) + if err != nil { + slog.Error(err.Error()) + os.Exit(1) + } + defer func() { + _ = f.Close() + }() + + if exists { + return + } + + // .basin/config.yaml does not exist, there's nothing to migrate + if _, err := os.Stat(path.Join(dirV1, "config.yaml")); errors.Is(err, os.ErrNotExist) { + return + } + + cfgV1, err := loadConfig(path.Join(dirV1, "config.yaml")) + if err != nil { + slog.Error(err.Error()) + os.Exit(1) + } + + cfgV2, err := loadConfigV2(path.Join(dirV2, "config.yaml")) + if err != nil { + slog.Error(err.Error()) + os.Exit(1) + } + + for name, item := range cfgV1.Publications { + cfgV2.Vaults[name] = vault(item) + } + + if err := yaml.NewEncoder(f).Encode(cfgV2); err != nil { + slog.Error(err.Error()) + os.Exit(1) + } +} diff --git a/cmd/vaults/main.go b/cmd/vaults/main.go new file mode 100644 index 0000000..e4dc078 --- /dev/null +++ b/cmd/vaults/main.go @@ -0,0 +1,48 @@ +package main + +import ( + "fmt" + "os" + + "github.com/urfave/cli/v2" + "golang.org/x/exp/slog" +) + +func init() { + // Enforce uppercase version shorthand flag + cli.VersionFlag = &cli.BoolFlag{ + Name: "version", + Aliases: []string{"V"}, + Usage: "show version", + } + cli.VersionPrinter = func(c *cli.Context) { + fmt.Printf("%s\n", c.App.Version) + } +} + +var version = "dev" + +func main() { + // migrate v1 config to v2 config + migrateConfigV1ToV2() + + cliApp := &cli.App{ + Name: "vaults", + Usage: "Continuously publish data from your database or file uploads to the Textile Vaults network.", + Version: version, + Commands: []*cli.Command{ + newVaultCreateCommand(), + newStreamCommand(), + newWriteCommand(), + newListCommand(), + newListEventsCommand(), + newRetrieveCommand(), + newWalletCommand(), + }, + } + + if err := cliApp.Run(os.Args); err != nil { + slog.Error(err.Error()) + os.Exit(1) + } +} diff --git a/goreleaser.yml b/goreleaser.yml index 8131285..67f57b7 100644 --- a/goreleaser.yml +++ b/goreleaser.yml @@ -6,12 +6,12 @@ before: env: - CGO_ENABLED=1 -project_name: basin +project_name: vaults builds: -- id: basin-darwin-amd64 - binary: basin - main: ./cmd/basin +- id: vaults-darwin-amd64 + binary: vaults + main: ./cmd/vaults goarch: - amd64 goos: @@ -23,9 +23,9 @@ builds: - -trimpath ldflags: - -s -w -X main.version={{.Version}} -X main.commit={{.FullCommit}} -X main.date={{.CommitDate}} -- id: basin-darwin-arm64 - binary: basin - main: ./cmd/basin +- id: vaults-darwin-arm64 + binary: vaults + main: ./cmd/vaults goarch: - arm64 goos: @@ -37,9 +37,9 @@ builds: - -trimpath ldflags: - -s -w -X main.version={{.Version}} -X main.commit={{.FullCommit}} -X main.date={{.CommitDate}} -- id: basin-linux-amd64 - binary: basin - main: ./cmd/basin +- id: vaults-linux-amd64 + binary: vaults + main: ./cmd/vaults goarch: - amd64 goos: @@ -51,9 +51,9 @@ builds: - -trimpath ldflags: - -s -w -X main.version={{.Version}} -X main.commit={{.FullCommit}} -X main.date={{.CommitDate}} -- id: basin-linux-arm64 - binary: basin - main: ./cmd/basin +- id: vaults-linux-arm64 + binary: vaults + main: ./cmd/vaults goarch: - arm64 goos: @@ -67,15 +67,15 @@ builds: - -s -w -X main.version={{.Version}} -X main.commit={{.FullCommit}} -X main.date={{.CommitDate}} archives: - - id: basin-archive + - id: vaults-archive format: tar.gz files: - none* builds: - - basin-darwin-amd64 - - basin-darwin-arm64 - - basin-linux-amd64 - - basin-linux-arm64 + - vaults-darwin-amd64 + - vaults-darwin-arm64 + - vaults-linux-amd64 + - vaults-linux-arm64 name_template: "{{ .ProjectName }}-{{ .Os }}-{{ .Arch }}" checksum: diff --git a/internal/app/basin_provider.go b/internal/app/basin_provider.go index 8dec2b4..dd6b6f5 100644 --- a/internal/app/basin_provider.go +++ b/internal/app/basin_provider.go @@ -5,8 +5,8 @@ import ( "io" ) -// BasinProvider ... -type BasinProvider interface { +// VaultsProvider defines Vaults API. +type VaultsProvider interface { CreateVault(context.Context, CreateVaultParams) error ListVaults(context.Context, ListVaultsParams) ([]Vault, error) ListVaultEvents(context.Context, ListVaultEventsParams) ([]EventInfo, error) diff --git a/internal/app/db.go b/internal/app/db.go index 88242cf..504509f 100644 --- a/internal/app/db.go +++ b/internal/app/db.go @@ -32,11 +32,11 @@ type DBManager struct { createdAT time.Time cols []Column winSize time.Duration - uploader *BasinUploader + uploader *VaultsUploader } // NewDBManager creates a new DBManager. -func NewDBManager(dbDir, table string, cols []Column, winSize time.Duration, uploader *BasinUploader) *DBManager { +func NewDBManager(dbDir, table string, cols []Column, winSize time.Duration, uploader *VaultsUploader) *DBManager { return &DBManager{ dbDir: dbDir, table: table, diff --git a/internal/app/streamer.go b/internal/app/streamer.go index ef8fbab..b7bdf13 100644 --- a/internal/app/streamer.go +++ b/internal/app/streamer.go @@ -19,24 +19,24 @@ type Replicator interface { Shutdown() } -// BasinStreamer contains logic of streaming Postgres changes to Basin Provider. -type BasinStreamer struct { +// VaultsStreamer contains logic of streaming Postgres changes to Vaults Provider. +type VaultsStreamer struct { namespace string replicator Replicator dbMngr *DBManager } -// NewBasinStreamer creates new streamer. -func NewBasinStreamer(ns string, r Replicator, dbm *DBManager) *BasinStreamer { - return &BasinStreamer{ +// NewVaultsStreamer creates new streamer. +func NewVaultsStreamer(ns string, r Replicator, dbm *DBManager) *VaultsStreamer { + return &VaultsStreamer{ namespace: ns, replicator: r, dbMngr: dbm, } } -// Run runs the BasinStreamer logic. -func (b *BasinStreamer) Run(ctx context.Context) error { +// Run runs the VaultsStreamer logic. +func (b *VaultsStreamer) Run(ctx context.Context) error { // Open a local DB for replaying txs if err := b.dbMngr.NewDB(ctx); err != nil { return err diff --git a/internal/app/streamer_test.go b/internal/app/streamer_test.go index 281180e..fa09b5c 100644 --- a/internal/app/streamer_test.go +++ b/internal/app/streamer_test.go @@ -29,7 +29,7 @@ var cols = []Column{ // Test when window threshold is crossed before // second Tx is received: . -func TestBasinStreamerOne(t *testing.T) { +func TestVaultsStreamerOne(t *testing.T) { // used for testing privateKey, err := crypto.HexToECDSA(pk) require.NoError(t, err) @@ -38,15 +38,15 @@ func TestBasinStreamerOne(t *testing.T) { feed := make(chan *pgrepl.Tx) testDBDir := t.TempDir() winSize := 3 * time.Second - providerMock := &basinProviderMock{ + providerMock := &vaultsProviderMock{ owner: make(map[string]string), uploaderInputs: make(chan *os.File), } - uploader := NewBasinUploader(testNS, testTable, providerMock, privateKey) + uploader := NewVaultsUploader(testNS, testTable, providerMock, privateKey) dbm := NewDBManager( testDBDir, testTable, cols, winSize, uploader) - streamer := NewBasinStreamer(testNS, &replicatorMock{feed: feed}, dbm) + streamer := NewVaultsStreamer(testNS, &replicatorMock{feed: feed}, dbm) go func() { // start listening to WAL records in a separate goroutine err = streamer.Run(context.Background()) @@ -105,7 +105,7 @@ func TestBasinStreamerOne(t *testing.T) { dbm.db = nil dbm.dbFname = "" dbm.createdAT = time.Time{} - uploader.provider = &basinProviderMock{ + uploader.provider = &vaultsProviderMock{ owner: make(map[string]string), uploaderInputs: ch2, } @@ -132,7 +132,7 @@ func TestBasinStreamerOne(t *testing.T) { // Test when window threshold is crossed after // second Tx is received: . -func TestBasinStreamerTwo(t *testing.T) { +func TestVaultsStreamerTwo(t *testing.T) { privateKey, err := crypto.HexToECDSA(pk) require.NoError(t, err) @@ -140,14 +140,14 @@ func TestBasinStreamerTwo(t *testing.T) { feed := make(chan *pgrepl.Tx) testDBDir := t.TempDir() winSize := 3 * time.Second - providerMock := &basinProviderMock{ + providerMock := &vaultsProviderMock{ owner: make(map[string]string), uploaderInputs: make(chan *os.File), } - uploader := NewBasinUploader(testNS, testTable, providerMock, privateKey) + uploader := NewVaultsUploader(testNS, testTable, providerMock, privateKey) dbm := NewDBManager( testDBDir, testTable, cols, winSize, uploader) - streamer := NewBasinStreamer(testNS, &replicatorMock{feed: feed}, dbm) + streamer := NewVaultsStreamer(testNS, &replicatorMock{feed: feed}, dbm) go func() { // start listening to WAL records in a separate goroutine err = streamer.Run(context.Background()) @@ -234,29 +234,29 @@ func (rm *replicatorMock) Shutdown() { close(rm.feed) } -type basinProviderMock struct { +type vaultsProviderMock struct { owner map[string]string uploaderInputs chan *os.File } -func (bp *basinProviderMock) CreateVault( +func (bp *vaultsProviderMock) CreateVault( _ context.Context, params CreateVaultParams, ) error { bp.owner[string(params.Vault)] = params.Account.Hex() return nil } -func (bp *basinProviderMock) ListVaults(_ context.Context, _ ListVaultsParams) ([]Vault, error) { +func (bp *vaultsProviderMock) ListVaults(_ context.Context, _ ListVaultsParams) ([]Vault, error) { return []Vault{}, nil } -func (bp *basinProviderMock) ListVaultEvents( +func (bp *vaultsProviderMock) ListVaultEvents( context.Context, ListVaultEventsParams, ) ([]EventInfo, error) { return []EventInfo{}, nil } -func (bp *basinProviderMock) WriteVaultEvent( +func (bp *vaultsProviderMock) WriteVaultEvent( _ context.Context, params WriteVaultEventParams, ) error { file := params.Content.(*os.File) diff --git a/internal/app/uploader.go b/internal/app/uploader.go index 8b573c4..13facc0 100644 --- a/internal/app/uploader.go +++ b/internal/app/uploader.go @@ -15,19 +15,19 @@ import ( "golang.org/x/crypto/sha3" ) -// BasinUploader contains logic of uploading Parquet files to Basin Provider. -type BasinUploader struct { +// VaultsUploader contains logic of uploading Parquet files to Vaults Provider. +type VaultsUploader struct { namespace string relation string privateKey *ecdsa.PrivateKey - provider BasinProvider + provider VaultsProvider } -// NewBasinUploader creates new uploader. -func NewBasinUploader( - ns string, rel string, bp BasinProvider, pk *ecdsa.PrivateKey, -) *BasinUploader { - return &BasinUploader{ +// NewVaultsUploader creates new uploader. +func NewVaultsUploader( + ns string, rel string, bp VaultsProvider, pk *ecdsa.PrivateKey, +) *VaultsUploader { + return &VaultsUploader{ namespace: ns, relation: rel, provider: bp, @@ -36,7 +36,7 @@ func NewBasinUploader( } // Upload sends file to provider for upload. -func (bu *BasinUploader) Upload( +func (bu *VaultsUploader) Upload( ctx context.Context, filepath string, progress io.Writer, ts Timestamp, sz int64, ) error { f, err := os.Open(filepath) diff --git a/pkg/basinprovider/provider.go b/pkg/vaultsprovider/provider.go similarity index 86% rename from pkg/basinprovider/provider.go rename to pkg/vaultsprovider/provider.go index 79034fa..4d50bb3 100644 --- a/pkg/basinprovider/provider.go +++ b/pkg/vaultsprovider/provider.go @@ -1,4 +1,4 @@ -package basinprovider +package vaultsprovider import ( "context" @@ -14,28 +14,28 @@ import ( "github.com/tablelandnetwork/basin-cli/internal/app" ) -// BasinProvider implements the app.BasinProvider interface. -type BasinProvider struct { +// VaultsProvider implements the app.VaultsProvider interface. +type VaultsProvider struct { provider string client *http.Client } -var _ app.BasinProvider = (*BasinProvider)(nil) +var _ app.VaultsProvider = (*VaultsProvider)(nil) -// New creates a new BasinProvider. -func New(provider string) *BasinProvider { +// New creates a new VaultsProvider. +func New(provider string) *VaultsProvider { client := &http.Client{ Timeout: 10 * time.Second, } - return &BasinProvider{ + return &VaultsProvider{ provider: provider, client: client, } } // CreateVault creates a vault. -func (bp *BasinProvider) CreateVault(ctx context.Context, params app.CreateVaultParams) error { +func (bp *VaultsProvider) CreateVault(ctx context.Context, params app.CreateVaultParams) error { form := url.Values{} form.Add("account", params.Account.Hex()) form.Add("cache", fmt.Sprint(params.CacheDuration)) @@ -63,7 +63,7 @@ func (bp *BasinProvider) CreateVault(ctx context.Context, params app.CreateVault } // ListVaults lists all vaults from a given account. -func (bp *BasinProvider) ListVaults( +func (bp *VaultsProvider) ListVaults( ctx context.Context, params app.ListVaultsParams, ) ([]app.Vault, error) { req, err := http.NewRequestWithContext( @@ -88,7 +88,7 @@ func (bp *BasinProvider) ListVaults( } // ListVaultEvents lists all events from a given vault. -func (bp *BasinProvider) ListVaultEvents( +func (bp *VaultsProvider) ListVaultEvents( ctx context.Context, params app.ListVaultEventsParams, ) ([]app.EventInfo, error) { req, err := http.NewRequestWithContext( @@ -120,7 +120,7 @@ func (bp *BasinProvider) ListVaultEvents( } // WriteVaultEvent write an event. -func (bp *BasinProvider) WriteVaultEvent(ctx context.Context, params app.WriteVaultEventParams) error { +func (bp *VaultsProvider) WriteVaultEvent(ctx context.Context, params app.WriteVaultEventParams) error { req, err := http.NewRequestWithContext( ctx, http.MethodPost, diff --git a/scripts/run.sh b/scripts/run.sh index afa075a..1653a64 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -1,3 +1,3 @@ #!/bin/sh -go run $( ls -1 cmd/basin/*.go | grep -v _test.go) $@ \ No newline at end of file +go run $( ls -1 cmd/vaults/*.go | grep -v _test.go) $@ \ No newline at end of file diff --git a/test/postgres.go b/test/postgres.go index 08f4a5b..0039537 100644 --- a/test/postgres.go +++ b/test/postgres.go @@ -55,7 +55,7 @@ func (dp *DockerPool) RunPostgres() (db *sql.DB, resource *dockertest.Resource, Env: []string{ "POSTGRES_PASSWORD=secret", "POSTGRES_USER=admin", - "POSTGRES_DB=basin", + "POSTGRES_DB=vaults", "listen_addresses = '*'", }, }, func(config *docker.HostConfig) { @@ -68,7 +68,7 @@ func (dp *DockerPool) RunPostgres() (db *sql.DB, resource *dockertest.Resource, _ = resource.Expire(120) // Tell docker to hard kill the container in 120 seconds - uri = fmt.Sprintf("postgres://admin:secret@%s/basin?sslmode=disable", resource.GetHostPort("5432/tcp")) + uri = fmt.Sprintf("postgres://admin:secret@%s/vaults?sslmode=disable", resource.GetHostPort("5432/tcp")) db, err = sql.Open("postgres", uri) if err != nil { log.Fatalf("Could not open the database: %s", err)