From e00334895f3f163666da7fadeb98069bd2921715 Mon Sep 17 00:00:00 2001 From: lordforever Date: Fri, 20 Dec 2024 05:53:10 +0530 Subject: [PATCH] buffer & nits --- contract-tools/xchain/xchain.go | 370 ++++++++++++++++++++++---------- src/Prover-Axelar.sol | 13 ++ 2 files changed, 269 insertions(+), 114 deletions(-) diff --git a/contract-tools/xchain/xchain.go b/contract-tools/xchain/xchain.go index 182b390..3102d49 100644 --- a/contract-tools/xchain/xchain.go +++ b/contract-tools/xchain/xchain.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "context" "encoding/hex" "encoding/json" @@ -9,6 +8,8 @@ import ( "io" "log" "math/big" + "math/bits" + "net/http" "os" "os/signal" @@ -18,6 +19,9 @@ import ( "sync" "time" + ethcrypto "github.com/ethereum/go-ethereum/crypto" + gocrypto "github.com/filecoin-project/go-crypto" + "golang.org/x/crypto/blake2b" "golang.org/x/sync/errgroup" "github.com/ethereum/go-ethereum" @@ -40,7 +44,6 @@ import ( fbig "github.com/filecoin-project/go-state-types/big" builtintypes "github.com/filecoin-project/go-state-types/builtin" "github.com/filecoin-project/go-state-types/builtin/v9/market" - "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/lotus/api/v0api" lotustypes "github.com/filecoin-project/lotus/chain/types" "github.com/google/uuid" @@ -70,11 +73,6 @@ func main() { Name: "daemon", Usage: "Start the xchain adapter daemon", Flags: []cli.Flag{ - &cli.BoolFlag{ - Name: "buffer-service", - Usage: "Run a buffer server", - Value: false, - }, &cli.BoolFlag{ Name: "aggregation-service", Usage: "Run an aggregation server", @@ -82,52 +80,53 @@ func main() { }, }, Action: func(cctx *cli.Context) error { - isBuffer := cctx.Bool("buffer-service") - isAgg := cctx.Bool("aggregation-service") - if !isBuffer && !isAgg { // default to running aggregator - isAgg = true + isAgg := cctx.Bool("aggregation-service") + if !isAgg { + isAgg = true // default to running aggregator } - + cfg, err := LoadConfig(cctx.String("config")) if err != nil { log.Fatal(err) } - + g, ctx := errgroup.WithContext(cctx.Context) + + // Start buffer service if using local buffer g.Go(func() error { - if !isBuffer { - return nil - } - path, err := homedir.Expand(cfg.BufferPath) - if err != nil { - return err - } - if err := os.MkdirAll(path, os.ModePerm); err != nil { - return err - } - - srv, err := NewBufferHTTPService(cfg.BufferPath) - if err != nil { - return &http.MaxBytesError{} - } - http.HandleFunc("/put", srv.PutHandler) - http.HandleFunc("/get", srv.GetHandler) - - fmt.Printf("Server starting on port %d\n", cfg.BufferPort) - server := &http.Server{ - Addr: fmt.Sprintf("0.0.0.0:%d", cfg.BufferPort), - Handler: nil, // http.DefaultServeMux - } - go func() { - if err := server.ListenAndServe(); err != http.ErrServerClosed { - log.Fatalf("Buffer HTTP server ListenAndServe: %v", err) + if cfg.BufferType == "local" { + path, err := homedir.Expand(cfg.BufferPath) + if err != nil { + return err } - }() - <-ctx.Done() - - // Context is cancelled, shut down the server - return server.Shutdown(context.Background()) + if err := os.MkdirAll(path, os.ModePerm); err != nil { + return err + } + + srv, err := NewBufferHTTPService(cfg.BufferPath) + if err != nil { + return err + } + http.HandleFunc("/put", srv.PutHandler) + http.HandleFunc("/get", srv.GetHandler) + + fmt.Printf("Local buffer server starting on port %d\n", cfg.BufferPort) + server := &http.Server{ + Addr: fmt.Sprintf("0.0.0.0:%d", cfg.BufferPort), + Handler: nil, + } + go func() { + if err := server.ListenAndServe(); err != http.ErrServerClosed { + log.Fatalf("Buffer HTTP server ListenAndServe: %v", err) + } + }() + <-ctx.Done() + return server.Shutdown(context.Background()) + } + return nil }) + + // Start aggregator service g.Go(func() error { if !isAgg { return nil @@ -238,7 +237,10 @@ type Config struct { TransferPort int ProviderAddr string LotusAPI string - TargetAggSize int + LighthouseApiKey string + AuthToken string + TargetAggSize int + BufferType string // "lighthouse" or "local" } // Mirror OnRamp.sol's `Offer` struct @@ -279,11 +281,14 @@ type aggregator struct { transferID int // ID of the next transfer transferAddr string // address to listen for transfer requests targetDealSize uint64 // how big aggregates should be + minDealSize uint64 // minimum deal size + lighthouseApiKey string // API key for lighthouse host host.Host // libp2p host for deal protocol to boost spDealAddr *peer.AddrInfo // address to reach boost (or other) deal v 1.2 provider spActorAddr address.Address // address of the storage provider actor lotusAPI v0api.FullNode // Lotus API for determining deal start epoch and collateral bounds cleanup func() // cleanup function to call on shutdown + bufferType string // Type of buffer to use } // Thank you @ribasushi @@ -351,7 +356,7 @@ func NewAggregator(ctx context.Context, cfg *Config) (*aggregator, error) { return nil, err } - lAPI, closer, err := NewLotusDaemonAPIClientV0(ctx, cfg.LotusAPI, 1, "") + lAPI, closer, err := NewLotusDaemonAPIClientV0(ctx, cfg.LotusAPI, 1, cfg.AuthToken) if err != nil { return nil, err } @@ -401,10 +406,12 @@ func NewAggregator(ctx context.Context, cfg *Config) (*aggregator, error) { spDealAddr: psPeerInfo, spActorAddr: providerAddr, lotusAPI: lAPI, + lighthouseApiKey: cfg.LighthouseApiKey, cleanup: func() { closer() fmt.Printf("done with lotus api closer\n") }, + bufferType: cfg.BufferType, }, nil } @@ -424,7 +431,10 @@ func (a *aggregator) run(ctx context.Context) error { } err := a.SubscribeQuery(ctx, query) - for err == nil || strings.Contains(err.Error(), "read tcp") { + // if err != nil { + // return err + // } + for err == nil || strings.Contains(err.Error(), "read udp") { if err != nil { log.Printf("ignoring mystery error: %s", err) } @@ -466,18 +476,6 @@ func (a *aggregator) run(ctx context.Context) error { } const ( - // PODSI aggregation uses 64 extra bytes per piece - // pieceOverhead = uint64(64) TODO uncomment this when we are smarter about determining threshold crossing - // Piece CID of small valid car (below) that must be prepended to the aggregation for deal acceptance - prefixCARCid = "baga6ea4seaqiklhpuei4wz7x3wwpvnul3sscfyrz2dpi722vgpwlolfky2dmwey" - // Hex of the prefix car file - prefixCAR = "3aa265726f6f747381d82a58250001701220b9ecb605f194801ee8a8355014e7e6e62966f94ccb6081" + - "631e82217872209dae6776657273696f6e014101551220704a26a32a76cf3ab66ffe41eb27adefefe9c93206960bb0" + - "147b9ed5e1e948b0576861744966487567684576657265747449494957617352696768743f5601701220b9ecb605f1" + - "94801ee8a8355014e7e6e62966f94ccb6081631e82217872209dae122c0a2401551220704a26a32a76cf3ab66ffe41" + - "eb27adefefe9c93206960bb0147b9ed5e1e948b012026576181d0a020801" - // Size of the padded prefix car in bytes - prefixCARSizePadded = uint64(256) // Data transfer port transferPort = 1728 // libp2p identifier for latest deal protocol @@ -491,12 +489,9 @@ const ( func (a *aggregator) runAggregate(ctx context.Context) error { // pieces being aggregated, flushed upon commitment // Invariant: the pieces in the pending queue can always make a valid aggregate w.r.t a.targetDealSize - pending := make([]DataReadyEvent, 0, 256) + // pending := make([]DataReadyEvent, 0, 256) + var pending []DataReadyEvent total := uint64(0) - prefixPiece := filabi.PieceInfo{ - Size: filabi.PaddedPieceSize(prefixCARSizePadded), - PieceCID: cid.MustParse(prefixCARCid), - } for { select { @@ -504,15 +499,17 @@ func (a *aggregator) runAggregate(ctx context.Context) error { fmt.Printf("ctx done shutting down aggregation") return nil case latestEvent := <-a.ch: - // Check if the offer is too big to fit in a valid aggregate on its own - // TODO: as referenced below there must be a better way when we introspect on the gory details of NewAggregate + // makes deal once the no. of files in aggregate cross a threshold currently set to 1 (index 0) + if len(pending) >= 0 { + // TODO: implement better way such that there it takes in files until the aggregate size reaches certain threshold latestPiece, err := latestEvent.Offer.Piece() + pending = append(pending, latestEvent) if err != nil { log.Printf("skipping offer %d, size %d not valid padded piece size ", latestEvent.OfferID, latestEvent.Offer.Size) continue } _, err = datasegment.NewAggregate(filabi.PaddedPieceSize(a.targetDealSize), []filabi.PieceInfo{ - prefixPiece, + // prefixPiece, latestPiece, }) if err != nil { @@ -521,14 +518,8 @@ func (a *aggregator) runAggregate(ctx context.Context) error { continue } // TODO: in production we'll maybe want to move data from buffer before we commit to storing it. - - // TODO: Unsorted greedy is a very naive knapsack strategy, production will want something better - // TODO: doing all the work of creating an aggregate for every new offer is quite wasteful - // there must be a cheaper way to do this, but for now it is the most expediant without learning - // all the gory edge cases in NewAggregate - // Turn offers into datasegment pieces - pieces := make([]filabi.PieceInfo, len(pending)+1) + pieces := make([]filabi.PieceInfo, len(pending)) for i, event := range pending { piece, err := event.Offer.Piece() if err != nil { @@ -537,18 +528,26 @@ func (a *aggregator) runAggregate(ctx context.Context) error { pieces[i] = piece } - pieces[len(pending)] = latestPiece - // aggregate - aggregatePieces := append([]filabi.PieceInfo{ - prefixPiece, - }, pieces...) - _, err = datasegment.NewAggregate(filabi.PaddedPieceSize(a.targetDealSize), aggregatePieces) - if err != nil { // we've overshot, lets commit to just pieces in pending + aggregatePieces := pieces + _, size, err := datasegment.ComputeDealPlacement(aggregatePieces) + if err != nil { + panic(err) + } + overallSize := filabi.PaddedPieceSize(size) + next := 1 << (64 - bits.LeadingZeros64(uint64(overallSize+256))) + if next < int(a.minDealSize) { + next = int(a.minDealSize) + } + dealSize := filabi.PaddedPieceSize(next) + a.targetDealSize = uint64(dealSize) + // _, err = datasegment.NewAggregate(dealSize, aggregatePieces) + // if err != nil { // we've overshot, lets commit to just pieces in pending total = 0 + // Remove the latest offer which took us over - pieces = pieces[:len(pieces)-1] - aggregatePieces = aggregatePieces[:len(aggregatePieces)-1] - agg, err := datasegment.NewAggregate(filabi.PaddedPieceSize(a.targetDealSize), aggregatePieces) + // pieces = pieces[:len(pieces)-1] + // aggregatePieces = aggregatePieces[:len(aggregatePieces)-1] + agg, err := datasegment.NewAggregate(dealSize, aggregatePieces) if err != nil { return fmt.Errorf("failed to create aggregate from pending, should not be reachable: %w", err) } @@ -601,9 +600,22 @@ func (a *aggregator) runAggregate(ctx context.Context) error { // Reset queue to empty, add the event that triggered aggregation pending = pending[:0] - pending = append(pending, latestEvent) + // pending = append(pending, latestEvent) } else { + latestPiece, err := latestEvent.Offer.Piece() + if err != nil { + log.Printf("skipping offer %d, size %d not valid padded piece size ", latestEvent.OfferID, latestEvent.Offer.Size) + continue + } + _, err = datasegment.NewAggregate(filabi.PaddedPieceSize(a.targetDealSize), []filabi.PieceInfo{ + // prefixPiece, + latestPiece, + }) + if err != nil { + log.Printf("skipping offer %d, size %d exceeds max PODSI packable size", latestEvent.OfferID, latestEvent.Offer.Size) + continue + } total += latestEvent.Offer.Size pending = append(pending, latestEvent) log.Printf("Offer %d added. %d offers pending aggregation with total size=%d\n", latestEvent.OfferID, len(pending), total) @@ -626,12 +638,31 @@ func (a *aggregator) sendDeal(ctx context.Context, aggCommp cid.Cid, transferID if len(x) == 0 { return fmt.Errorf("cannot make a deal with storage provider %s because it does not support protocol version 1.2.0", a.spDealAddr.ID) } + // make serving url as per buffer + var url string + if a.bufferType == "lighthouse" { + // Upload to Lighthouse + aggLocation := `/home/wsl-ubuntu/blockchain/filecoin/onramp-contracts/contract-tools/aggregate/` + aggCommp.String() + err = a.saveAggregateToFile(transferID, aggLocation) + if err != nil { + log.Fatalf("failed to save aggregate to file: %s", err) + } + + lhResp, err := UploadToLighthouse(aggLocation, a.lighthouseApiKey) + if err != nil { + log.Fatalf("failed to upload to lighthouse: %s", err) + } + url = fmt.Sprintf("https://gateway.lighthouse.storage/ipfs/%s", lhResp.Hash) + } else { + // Use local buffer + url = fmt.Sprintf("http://%s/?id=%d", a.transferAddr, transferID) + } // Construct deal dealUuid := uuid.New() log.Printf("making deal for commp %s, UUID=%s\n", aggCommp.String(), dealUuid) transferParams := boosttypes2.HttpRequest{ - URL: fmt.Sprintf("http://%s/?id=%d", a.transferAddr, transferID), + URL: url, } paramsBytes, err := json.Marshal(transferParams) if err != nil { @@ -666,37 +697,79 @@ func (a *aggregator) sendDeal(ctx context.Context, aggCommp cid.Cid, transferID return fmt.Errorf("failed to get chain ID: %w", err) } // Encode the chainID as uint256 - encodedChainID, err := encodeChainID(chainID) + // intEncodedChainID, err := encodeChainID(chainID) + // if err != nil { + // return fmt.Errorf("failed to encode chainID: %w", err) + // } + // fmt.Println(intEncodedChainID) + + clientAddr, err := a.lotusAPI.WalletDefaultAddress(ctx) + if err != nil { + return err + } + dummyBuf, err := cborutil.Dump("dummy") + if err != nil { + return err + } + b2sum := blake2b.Sum256(dummyBuf) + dummySig, err := a.lotusAPI.WalletSign(ctx, clientAddr, dummyBuf) + if err != nil { + return fmt.Errorf("wallet sign failed: %w", err) + } + + pubk, err := gocrypto.EcRecover(b2sum[:], dummySig.Data) if err != nil { - return fmt.Errorf("failed to encode chainID: %w", err) + return err + } + + pubkHash := ethcrypto.Keccak256(pubk[1:]) // Skip the first byte (0x04) which indicates uncompressed public key + ethAddress := common.BytesToAddress(pubkHash[12:]) // Take the last 20 bytes + encodedLabel, err := encodeChainIDAndAddress(chainID, ethAddress) + if err != nil { + return fmt.Errorf("failed to encode chainID and address: %w", err) } - dealLabel, err := market.NewLabelFromBytes(encodedChainID) + labelString := hex.EncodeToString(encodedLabel) + dealLabel, err := market.NewLabelFromString(labelString) if err != nil { return fmt.Errorf("failed to create deal label: %w", err) } - proposal := market.ClientDealProposal{ - Proposal: market.DealProposal{ - PieceCID: aggCommp, - PieceSize: filabi.PaddedPieceSize(a.targetDealSize), - VerifiedDeal: false, - Client: filClient, - Provider: a.spActorAddr, - StartEpoch: dealStart, - EndEpoch: dealEnd, - StoragePricePerEpoch: fbig.NewInt(0), - ProviderCollateral: providerCollateral, - Label: dealLabel, - }, + proposal := market.DealProposal{ + PieceCID: aggCommp, + PieceSize: filabi.PaddedPieceSize(a.targetDealSize), + VerifiedDeal: false, + Client: filClient, + Provider: a.spActorAddr, + StartEpoch: dealStart, + EndEpoch: dealEnd, + StoragePricePerEpoch: fbig.NewInt(0), + ProviderCollateral: providerCollateral, + Label: dealLabel, + } + + + buf, err := cborutil.Dump(&proposal) + if err != nil { + return err + } + fmt.Println("buf ",hex.EncodeToString(buf)) + log.Printf("about to sign with clientAddr: %s", clientAddr) + sig, err := a.lotusAPI.WalletSign(ctx, clientAddr, buf) + if err != nil { + return fmt.Errorf("wallet sign failed: %w", err) + } + clientProposal := market.ClientDealProposal{ + Proposal: proposal, // Signature is unchecked since client is smart contract - ClientSignature: crypto.Signature{ - Type: crypto.SigTypeBLS, - Data: []byte{0xc0, 0xff, 0xee}, - }, + ClientSignature: *sig, + // crypto.Signature{ + // Type: crypto.SigTypeBLS, + // Data: []byte{0xc0, 0xff, 0xee}, + // }, } dealParams := boosttypes.DealParams{ DealUUID: dealUuid, - ClientDealProposal: proposal, + ClientDealProposal: clientProposal, DealDataRoot: aggCommp, IsOffline: false, Transfer: transfer, @@ -803,14 +876,8 @@ func (a *aggregator) transferHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, "No data found", http.StatusNotFound) return } - // First write the CAR prefix to the response - prefixCARBytes, err := hex.DecodeString(prefixCAR) - if err != nil { - http.Error(w, "Failed to decode CAR prefix", http.StatusInternalServerError) - return - } - readers := []io.Reader{bytes.NewReader(prefixCARBytes)} + readers := []io.Reader{} // Fetch each sub piece from its buffer location and write to response for _, url := range transfer.locations { lazyReader := &lazyHTTPReader{url: url} @@ -828,6 +895,42 @@ func (a *aggregator) transferHandler(w http.ResponseWriter, r *http.Request) { } } +func (a *aggregator) saveAggregateToFile(id int, location string) error { + a.transferLk.RLock() + transfer, ok := a.transfers[id] + a.transferLk.RUnlock() + if !ok { + return fmt.Errorf("no data found for ID %d", id) + } + + + readers := []io.Reader{} + // Fetch each sub piece from its buffer location and add to readers + for _, url := range transfer.locations { + lazyReader := &lazyHTTPReader{url: url} + readers = append(readers, lazyReader) + defer lazyReader.Close() + } + aggReader, err := transfer.agg.AggregateObjectReader(readers) + if err != nil { + return fmt.Errorf("failed to create aggregate reader: %w", err) + } + + // Create the file at the specified location + file, err := os.Create(location) + if err != nil { + return fmt.Errorf("failed to create file: %w", err) + } + defer file.Close() + + // Copy the aggregated data to the file + _, err = io.Copy(file, aggReader) + if err != nil { + return fmt.Errorf("failed to write aggregate stream to file: %w", err) + } + + return nil +} type AggregateTransfer struct { locations []string agg *datasegment.Aggregate @@ -838,6 +941,7 @@ func (a *aggregator) SubscribeQuery(ctx context.Context, query ethereum.FilterQu log.Printf("Listening for data ready events on %s\n", a.onrampAddr.Hex()) sub, err := a.client.SubscribeFilterLogs(ctx, query, logs) if err != nil { + fmt.Println(err) return err } defer sub.Unsubscribe() @@ -1038,4 +1142,42 @@ func encodeChainID(chainID *big.Int) ([]byte, error) { return data, nil } +// encodeChainIDAndAddress encodes the chain ID and Ethereum address into bytes +func encodeChainIDAndAddress(chainID *big.Int, ethAddress common.Address) ([]byte, error) { + uint256Type, err := abi.NewType("uint256", "", nil) + if err != nil { + return nil, fmt.Errorf("failed to create uint256 type: %w", err) + } + + addressType, err := abi.NewType("address", "", nil) + if err != nil { + return nil, fmt.Errorf("failed to create address type: %w", err) + } + + + // Define the ABI encoding + arguments := abi.Arguments{ + {Type: uint256Type}, + {Type: addressType}, + } + + // Encode the chain ID and Ethereum address + encodedBytes, err := arguments.Pack(chainID, ethAddress) + if err != nil { + return nil, fmt.Errorf("error encoding arguments: %w", err) + } + + return encodedBytes, nil +} +// encodeChainIDAsString converts a *big.Int chain ID to its string representation +func encodeChainIDAsString(chainID *big.Int) (string, error) { + if chainID == nil { + return "", fmt.Errorf("chainID cannot be nil") + } + + // Convert the *big.Int to a string + chainIDStr := chainID.String() + + return chainIDStr, nil +} diff --git a/src/Prover-Axelar.sol b/src/Prover-Axelar.sol index 3d4d746..c5bf169 100644 --- a/src/Prover-Axelar.sol +++ b/src/Prover-Axelar.sol @@ -52,6 +52,8 @@ contract DealClient is AxelarExecutable { mapping(bytes => uint256) public providerGasFunds; // Funds set aside for calling oracle by provider mapping(uint256 => DestinationChain) public chainIdToDestinationChain; + event ReceivedDataCap(string received); + constructor( address _gateway, address _gasReceiver @@ -87,6 +89,15 @@ contract DealClient is AxelarExecutable { providerGasFunds[providerAddrData] += msg.value; } + function receiveDataCap(bytes memory) internal { + require( + msg.sender == DATACAP_ACTOR_ETH_ADDRESS, + "msg.sender needs to be datacap actor f07" + ); + emit ReceivedDataCap("DataCap Received!"); + // Add get datacap balance api and store datacap amount + } + // dealNotify is the callback from the market actor into the contract at the end // of PublishStorageDeals. This message holds the previously approved deal proposal // and the associated dealID. The dealID is stored as part of the contract state @@ -214,6 +225,8 @@ contract DealClient is AxelarExecutable { codec = Misc.CBOR_CODEC; } else if (method == MARKET_NOTIFY_DEAL_METHOD_NUM) { dealNotify(params); + } else if (method == DATACAP_RECEIVER_HOOK_METHOD_NUM) { + receiveDataCap(params); } else { revert("the filecoin method that was called is not handled"); }