Skip to content

Commit

Permalink
removes RPC and make use of HTTP API
Browse files Browse the repository at this point in the history
Signed-off-by: Bruno Calza <brunoangelicalza@gmail.com>
  • Loading branch information
brunocalza committed Dec 13, 2023
1 parent ce5595a commit 85c4f43
Show file tree
Hide file tree
Showing 21 changed files with 291 additions and 4,495 deletions.
5 changes: 0 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,3 @@ build:
test:
go test ./... -short -race -timeout 1m
.PHONY: test

generate:
capnp compile -I ../go-capnp/std -ogo pkg/capnp/definitions.capnp
capnp compile -I ../go-capnp/std -ogo pkg/basinprovider/provider.capnp
.PHONY: generate
6 changes: 0 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
- [Listing Deals](#listing-deals)
- [Running](#running)
- [Run tests](#run-tests)
- [Generate Cap'N Proto code](#generate-capn-proto-code)
- [Retrieving](#retrieving)

# Background
Expand Down Expand Up @@ -197,11 +196,6 @@ make test

Note: One of the tests requires Docker Engine to be running.

## Generate Cap'N Proto code

```bash
make generate
```

# Contributing

Expand Down
2 changes: 1 addition & 1 deletion cmd/basin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

// DefaultProviderHost is the address of Basin Provider.
const DefaultProviderHost = "basin.tableland.xyz:3000"
const DefaultProviderHost = "https://basin.tableland.xyz"

// DefaultWindowSize is the number of seconds for which WAL updates
// are buffered before being sent to the provider.
Expand Down
169 changes: 50 additions & 119 deletions cmd/basin/publication.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"strings"
"time"

"capnproto.org/go/capnp/v3"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/filecoin-project/lassie/pkg/lassie"
Expand All @@ -27,7 +26,6 @@ import (
"github.com/schollz/progressbar/v3"
"github.com/tablelandnetwork/basin-cli/internal/app"
"github.com/tablelandnetwork/basin-cli/pkg/basinprovider"
basincapnp "github.com/tablelandnetwork/basin-cli/pkg/capnp"
"github.com/tablelandnetwork/basin-cli/pkg/pgrepl"
"github.com/urfave/cli/v2"
"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -58,7 +56,6 @@ func newPublicationCommand() *cli.Command {

func newPublicationCreateCommand() *cli.Command {
var owner, dburi, provider string
var secure bool
var winSize, cache int64

return &cli.Command{
Expand All @@ -82,12 +79,6 @@ func newPublicationCreateCommand() *cli.Command {
Destination: &provider,
Value: DefaultProviderHost,
},
&cli.BoolFlag{
Name: "secure",
Usage: "Uses TLS connection",
Destination: &secure,
Value: true,
},
&cli.Int64Flag{
Name: "window-size",
Usage: "Number of seconds for which WAL updates are buffered before being sent to the provider",
Expand Down Expand Up @@ -153,7 +144,7 @@ func newPublicationCreateCommand() *cli.Command {
return fmt.Errorf("encode: %s", err)
}

exists, err := createPublication(cCtx.Context, dburi, ns, rel, provider, owner, secure, cache)
exists, err := createPublication(cCtx.Context, dburi, ns, rel, provider, owner, cache)
if err != nil {
return fmt.Errorf("failed to create publication: %s", err)
}
Expand All @@ -175,7 +166,6 @@ func newPublicationCreateCommand() *cli.Command {

func newPublicationStartCommand() *cli.Command {
var privateKey string
var secure bool

return &cli.Command{
Name: "start",
Expand All @@ -187,12 +177,6 @@ func newPublicationStartCommand() *cli.Command {
Destination: &privateKey,
Required: true,
},
&cli.BoolFlag{
Name: "secure",
Usage: "Uses TLS connection",
Destination: &secure,
Value: true,
},
},
Action: func(cCtx *cli.Context) error {
if cCtx.NArg() != 1 {
Expand Down Expand Up @@ -233,11 +217,7 @@ func newPublicationStartCommand() *cli.Command {
return err
}

bp, err := basinprovider.New(cCtx.Context, cfg.Publications[publication].ProviderHost, secure)
if err != nil {
return err
}
defer bp.Close()
bp := basinprovider.New(cfg.Publications[publication].ProviderHost)

pgxConn, err := pgx.Connect(cCtx.Context, connString)
if err != nil {
Expand Down Expand Up @@ -285,7 +265,6 @@ func newPublicationStartCommand() *cli.Command {

func newPublicationUploadCommand() *cli.Command {
var privateKey, publicationName string
var secure bool
var timestamp string

return &cli.Command{
Expand All @@ -304,12 +283,6 @@ func newPublicationUploadCommand() *cli.Command {
Destination: &publicationName,
Required: true,
},
&cli.BoolFlag{
Name: "secure",
Usage: "Uses TLS connection",
Destination: &secure,
Value: true,
},
&cli.StringFlag{
Name: "timestamp",
Usage: "The time the file was created (default: current epoch in UTC)",
Expand Down Expand Up @@ -340,11 +313,7 @@ func newPublicationUploadCommand() *cli.Command {
return fmt.Errorf("load config: %s", err)
}

bp, err := basinprovider.New(cCtx.Context, cfg.Publications[publicationName].ProviderHost, secure)
if err != nil {
return err
}
defer bp.Close()
bp := basinprovider.New(cfg.Publications[publicationName].ProviderHost)

filepath := cCtx.Args().First()

Expand Down Expand Up @@ -376,7 +345,7 @@ func newPublicationUploadCommand() *cli.Command {
}

basinStreamer := app.NewBasinUploader(ns, rel, bp, privateKey)
if err := basinStreamer.Upload(cCtx.Context, filepath, bar, ts); err != nil {
if err := basinStreamer.Upload(cCtx.Context, filepath, bar, ts, fi.Size()); err != nil {
return fmt.Errorf("upload: %s", err)
}

Expand All @@ -387,7 +356,6 @@ func newPublicationUploadCommand() *cli.Command {

func newPublicationListCommand() *cli.Command {
var owner, provider string
var secure bool

return &cli.Command{
Name: "list",
Expand All @@ -405,31 +373,21 @@ func newPublicationListCommand() *cli.Command {
Destination: &provider,
Value: DefaultProviderHost,
},
&cli.BoolFlag{
Name: "secure",
Usage: "Uses TLS connection",
Destination: &secure,
Value: true,
},
},
Action: func(cCtx *cli.Context) error {
if !common.IsHexAddress(owner) {
return fmt.Errorf("%s is not a valid Ethereum wallet address", owner)
}

bp, err := basinprovider.New(cCtx.Context, provider, secure)
account, err := app.NewAccount(owner)
if err != nil {
return fmt.Errorf("new basin provider: %s", err)
return fmt.Errorf("%s is not a valid Ethereum wallet address", owner)
}
defer bp.Close()

publications, err := bp.List(cCtx.Context, common.HexToAddress(owner))
bp := basinprovider.New(provider)
vaults, err := bp.ListVaults(cCtx.Context, app.ListVaultsParams{Account: account})
if err != nil {
return fmt.Errorf("failed to list publications: %s", err)
}

for _, pub := range publications {
fmt.Printf("%s\n", pub)
for _, vault := range vaults {
fmt.Printf("%s\n", vault)
}

return nil
Expand All @@ -439,9 +397,7 @@ func newPublicationListCommand() *cli.Command {

func newPublicationDealsCommand() *cli.Command {
var publication, provider, before, after, at, format string
var limit, latest int
var offset int64
var secure bool
var limit, offset, latest int

return &cli.Command{
Name: "deals",
Expand Down Expand Up @@ -470,18 +426,12 @@ func newPublicationDealsCommand() *cli.Command {
Usage: "The latest N deals to fetch",
Destination: &latest,
},
&cli.Int64Flag{
&cli.IntFlag{
Name: "offset",
Usage: "The epoch to start from",
Destination: &offset,
Value: 0,
},
&cli.BoolFlag{
Name: "secure",
Usage: "Uses TLS connection",
Destination: &secure,
Value: true,
},
&cli.StringFlag{
Name: "before",
Usage: "Filter deals created before this timestamp",
Expand Down Expand Up @@ -513,22 +463,21 @@ func newPublicationDealsCommand() *cli.Command {
return err
}

bp, err := basinprovider.New(cCtx.Context, provider, secure)
if err != nil {
return fmt.Errorf("new basin provider: %s", err)
}
defer bp.Close()
bp := basinprovider.New(provider)

b, a, err := validateBeforeAndAfter(before, after, at)
if err != nil {
return err
}

var deals []app.DealInfo
var req app.ListVaultEventsParams
if latest > 0 {
deals, err = bp.LatestDeals(cCtx.Context, ns, rel, uint32(latest), b, a)
if err != nil {
return fmt.Errorf("failed to fetch deals: %s", err)
req = app.ListVaultEventsParams{
Vault: app.Vault(fmt.Sprintf("%s.%s", ns, rel)),
Limit: uint32(latest),
Offset: 0,
Before: b,
After: a,
}
} else {
if offset < 0 {
Expand All @@ -539,34 +488,42 @@ func newPublicationDealsCommand() *cli.Command {
return errors.New("limit has to be greater than 0")
}

deals, err = bp.Deals(cCtx.Context, ns, rel, uint32(limit), uint64(offset), b, a)
if err != nil {
return fmt.Errorf("failed to fetch deals: %s", err)
req = app.ListVaultEventsParams{
Vault: app.Vault(fmt.Sprintf("%s.%s", ns, rel)),
Limit: uint32(limit),
Offset: uint32(offset),
Before: b,
After: a,
}
}

events, err := bp.ListVaultEvents(cCtx.Context, req)
if err != nil {
return fmt.Errorf("failed to fetch deals: %s", err)
}

if format == "table" {
table := tablewriter.NewWriter(os.Stdout)
table.SetHeader([]string{"CID", "Size", "Timestamp", "Archived", "Cache Expiry"})

for _, deal := range deals {
for _, event := range events {
isArchived := "N"
if deal.IsArchived {
if event.IsArchived {
isArchived = "Y"
}
timestamp := "(null)"
if deal.Timestamp > 0 {
timestamp = time.Unix(deal.Timestamp, 0).Format(time.RFC3339)
if event.Timestamp > 0 {
timestamp = time.Unix(event.Timestamp, 0).Format(time.RFC3339)
}
table.Append([]string{
deal.CID, fmt.Sprintf("%d", deal.Size), timestamp, isArchived, deal.CacheExpiry,
event.CID, fmt.Sprintf("%d", event.Size), timestamp, isArchived, event.CacheExpiry,
})
}
table.Render()
} else if format == "json" {
jsonData, err := json.Marshal(deals)
jsonData, err := json.Marshal(events)
if err != nil {
return fmt.Errorf("error serializing deals to JSON")
return fmt.Errorf("error serializing events to JSON")
}
fmt.Println(string(jsonData))
} else {
Expand Down Expand Up @@ -701,19 +658,23 @@ func createPublication(
rel string,
provider string,
owner string,
secure bool,
cacheDuration int64,
) (exists bool, err error) {
bp, err := basinprovider.New(ctx, provider, secure)
account, err := app.NewAccount(owner)
if err != nil {
return false, err
return false, fmt.Errorf("not a valid account: %s", err)
}

bp := basinprovider.New(provider)
req := app.CreateVaultParams{
Account: account,
Vault: app.Vault(fmt.Sprintf("%s.%s", ns, rel)),
CacheDuration: app.CacheDuration(cacheDuration),
}
defer bp.Close()

if dburi == "" {
exists, err := bp.Create(ctx, ns, rel, basincapnp.Schema{}, common.HexToAddress(owner), cacheDuration)
if err != nil {
return false, fmt.Errorf("create call: %s", err)
if err := bp.CreateVault(ctx, req); err != nil {
return false, fmt.Errorf("create vault: %s", err)
}

return exists, nil
Expand All @@ -737,36 +698,6 @@ func createPublication(
}
}()

columns, err := inspectTable(ctx, tx, rel)
if err != nil {
return false, fmt.Errorf("failed to inspect table: %s", err)
}

_, seg, err := capnp.NewMessage(capnp.SingleSegment(nil))
if err != nil {
return false, fmt.Errorf("capnp new message: %s", err)
}

capnpSchema, err := basincapnp.NewRootSchema(seg)
if err != nil {
return false, fmt.Errorf("capnp new tx: %s", err)
}

columnsList, err := basincapnp.NewSchema_Column_List(seg, int32(len(columns)))
if err != nil {
return false, fmt.Errorf("capnp new columns list: %s", err)
}

for i, col := range columns {
column := columnsList.At(i)

_ = column.SetName(col.Name)
_ = column.SetType(col.Typ)
column.SetIsNullable(col.IsNull)
column.SetIsPartOfPrimaryKey(col.IsPrimary)
}
_ = capnpSchema.SetColumns(columnsList)

if _, err := tx.Exec(
ctx, fmt.Sprintf("CREATE PUBLICATION %s FOR TABLE %s", pgrepl.Publication(rel).FullName(), rel),
); err != nil {
Expand All @@ -776,7 +707,7 @@ func createPublication(
return false, fmt.Errorf("failed to create publication: %s", err)
}

if _, err := bp.Create(ctx, ns, rel, capnpSchema, common.HexToAddress(owner), cacheDuration); err != nil {
if err := bp.CreateVault(ctx, req); err != nil {
return false, fmt.Errorf("create call: %s", err)
}

Expand Down
Loading

0 comments on commit 85c4f43

Please sign in to comment.