Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(zetaclient)!: orchestrator V2 #3332

Merged
merged 28 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
04fa091
Minor fixes
swift1337 Dec 24, 2024
0c46e64
Add orchestrator V2. Move context updater to v2
swift1337 Dec 24, 2024
63961eb
Fix orchestrator_v2 test cases
swift1337 Dec 27, 2024
df21451
Fix flaky test cases during concurrent runs (spoiler: goroutines)
swift1337 Dec 27, 2024
cac2f61
Add V2 to start.go
swift1337 Dec 27, 2024
eff0517
chain sync skeleton
swift1337 Dec 30, 2024
7232f53
Move common btc stuff to common/ to fix import cycle
swift1337 Dec 30, 2024
e13d64c
Implement BTC observerSigner
swift1337 Jan 3, 2025
91bab84
Drop redundant code
swift1337 Jan 3, 2025
91dd276
Fix ticker concurrency bug
swift1337 Jan 3, 2025
991b751
Add scheduler.Tasks()
swift1337 Jan 3, 2025
22d8a51
Add v2 btc observer-signer 101 test cases. Drop redundant tests
swift1337 Jan 3, 2025
e81db51
Address PR comments
swift1337 Jan 6, 2025
c637cee
Add issue
swift1337 Jan 7, 2025
bc5dd8f
fix inbound debug cmd
swift1337 Jan 7, 2025
87820c3
Merge branch 'develop' into feat/btc-observer-signer
swift1337 Jan 7, 2025
ee5b997
Add tss graceful shutdown
swift1337 Jan 7, 2025
5e89631
Update changelog
swift1337 Jan 7, 2025
d2d98da
fix tss tests
swift1337 Jan 7, 2025
ffcd731
Fix IntervalUpdater
swift1337 Jan 8, 2025
a04244b
Mitigate errors when BTC node is disabled
swift1337 Jan 8, 2025
c91aeea
Implement pkg/fanout
swift1337 Jan 8, 2025
3b6cb6d
Apply fanout to block subscriber
swift1337 Jan 8, 2025
ca9c7f8
Merge branch 'develop' into feat/btc-observer-signer
swift1337 Jan 8, 2025
d7e5d55
Fix typo
swift1337 Jan 9, 2025
f35ea11
Minor btc signer improvements
swift1337 Jan 10, 2025
849bdde
Make V1.Stop() safe to call multiple times
swift1337 Jan 10, 2025
c1c0078
FIX DATA RACE
swift1337 Jan 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* [3170](https://github.com/zeta-chain/node/pull/3170) - revamp TSS package in zetaclient
* [3291](https://github.com/zeta-chain/node/pull/3291) - revamp zetaclient initialization (+ graceful shutdown)
* [3319](https://github.com/zeta-chain/node/pull/3319) - implement scheduler for zetaclient
* [3332](https://github.com/zeta-chain/node/pull/3332) - implement orchestrator V2. Move BTC observer-signer to V2

### Fixes

Expand Down
36 changes: 28 additions & 8 deletions cmd/zetaclientd/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ import (
"strconv"
"strings"

"cosmossdk.io/errors"
sdk "github.com/cosmos/cosmos-sdk/types"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/spf13/cobra"

"github.com/zeta-chain/node/pkg/coin"
"github.com/zeta-chain/node/testutil/sample"
"github.com/zeta-chain/node/zetaclient/chains/base"
btcobserver "github.com/zeta-chain/node/zetaclient/chains/bitcoin/observer"
btcrpc "github.com/zeta-chain/node/zetaclient/chains/bitcoin/rpc"
evmobserver "github.com/zeta-chain/node/zetaclient/chains/evm/observer"
"github.com/zeta-chain/node/zetaclient/config"
zctx "github.com/zeta-chain/node/zetaclient/context"
Expand Down Expand Up @@ -156,17 +157,36 @@ func InboundGetBallot(_ *cobra.Command, args []string) error {
}
fmt.Println("CoinType : ", coinType)
} else if chain.IsBitcoin() {
observer, ok := observers[chainID]
if !ok {
return fmt.Errorf("observer not found for btc chain %d", chainID)
bitcoinConfig, found := appContext.Config().GetBTCConfig(chain.ID())
if !found {
return fmt.Errorf("unable to find btc config")
}

btcObserver, ok := observer.(*btcobserver.Observer)
if !ok {
return fmt.Errorf("observer is not btc observer for chain %d", chainID)
rpcClient, err := btcrpc.NewRPCClient(bitcoinConfig)
if err != nil {
return errors.Wrap(err, "unable to create rpc client")
}

database, err := db.NewFromSqliteInMemory(true)
if err != nil {
return errors.Wrap(err, "unable to open database")
}

observer, err := btcobserver.NewObserver(
*chain.RawChain(),
rpcClient,
*chain.Params(),
client,
nil,
database,
baseLogger,
nil,
)
if err != nil {
return errors.Wrap(err, "unable to create btc observer")
}

ballotIdentifier, err = btcObserver.CheckReceiptForBtcTxHash(ctx, inboundHash, false)
ballotIdentifier, err = observer.CheckReceiptForBtcTxHash(ctx, inboundHash, false)
if err != nil {
return err
}
Expand Down
21 changes: 21 additions & 0 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/zeta-chain/node/pkg/constant"
"github.com/zeta-chain/node/pkg/graceful"
zetaos "github.com/zeta-chain/node/pkg/os"
"github.com/zeta-chain/node/pkg/scheduler"
"github.com/zeta-chain/node/zetaclient/chains/base"
"github.com/zeta-chain/node/zetaclient/config"
zctx "github.com/zeta-chain/node/zetaclient/context"
Expand Down Expand Up @@ -115,6 +116,8 @@ func Start(_ *cobra.Command, _ []string) error {
return errors.Wrap(err, "unable to setup TSS service")
}

graceful.AddStopper(tss.Stop)

// Starts various background TSS listeners.
// Shuts down zetaclientd if any is triggered.
maintenance.NewTSSListener(zetacoreClient, logger.Std).Listen(ctx, func() {
Expand Down Expand Up @@ -159,9 +162,27 @@ func Start(_ *cobra.Command, _ []string) error {
return errors.Wrap(err, "unable to create orchestrator")
}

taskScheduler := scheduler.New(logger.Std)
maestroV2Deps := &orchestrator.Dependencies{
Zetacore: zetacoreClient,
TSS: tss,
DBPath: dbPath,
Telemetry: telemetry,
}

maestroV2, err := orchestrator.NewV2(taskScheduler, maestroV2Deps, logger)
if err != nil {
return errors.Wrap(err, "unable to create orchestrator V2")
}

// Start orchestrator with all observers and signers
graceful.AddService(ctx, maestro)

// Start orchestrator V2
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
// V2 will co-exist with V1 until all types of chains will be refactored (BTC, EVM, SOL, TON).
// (currently it's only BTC)
graceful.AddService(ctx, maestroV2)

// Block current routine until a shutdown signal is received
graceful.WaitForShutdown()

Expand Down
2 changes: 1 addition & 1 deletion e2e/e2etests/test_bitcoin_deposit_and_call_revert.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/zeta-chain/node/e2e/runner"
"github.com/zeta-chain/node/e2e/utils"
"github.com/zeta-chain/node/testutil/sample"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin/common"
)

func TestBitcoinDepositAndCallRevert(r *runner.E2ERunner, args []string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/zeta-chain/node/pkg/constant"
"github.com/zeta-chain/node/testutil/sample"
crosschaintypes "github.com/zeta-chain/node/x/crosschain/types"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin/common"
)

// TestBitcoinDepositAndCallRevertWithDust sends a Bitcoin deposit that reverts with a dust amount in the revert outbound.
Expand Down
6 changes: 3 additions & 3 deletions e2e/e2etests/test_bitcoin_deposit_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/zeta-chain/node/e2e/utils"
testcontract "github.com/zeta-chain/node/testutil/contracts"
crosschaintypes "github.com/zeta-chain/node/x/crosschain/types"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin"
"github.com/zeta-chain/node/zetaclient/chains/bitcoin/common"
)

func TestBitcoinDepositAndCall(r *runner.E2ERunner, args []string) {
Expand All @@ -20,7 +20,7 @@ func TestBitcoinDepositAndCall(r *runner.E2ERunner, args []string) {
// Given amount to send
require.Len(r, args, 1)
amount := utils.ParseFloat(r, args[0])
amountTotal := amount + zetabitcoin.DefaultDepositorFee
amountTotal := amount + common.DefaultDepositorFee

// Given a list of UTXOs
utxos, err := r.ListDeployerUTXOs()
Expand All @@ -45,7 +45,7 @@ func TestBitcoinDepositAndCall(r *runner.E2ERunner, args []string) {
utils.RequireCCTXStatus(r, cctx, crosschaintypes.CctxStatus_OutboundMined)

// check if example contract has been called, 'bar' value should be set to amount
amountSats, err := zetabitcoin.GetSatoshis(amount)
amountSats, err := common.GetSatoshis(amount)
require.NoError(r, err)
utils.MustHaveCalledExampleContract(r, contract, big.NewInt(amountSats))
}
2 changes: 1 addition & 1 deletion e2e/e2etests/test_bitcoin_donation.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/zeta-chain/node/e2e/utils"
"github.com/zeta-chain/node/pkg/constant"
crosschaintypes "github.com/zeta-chain/node/x/crosschain/types"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin/common"
)

func TestBitcoinDonation(r *runner.E2ERunner, args []string) {
Expand Down
4 changes: 2 additions & 2 deletions e2e/e2etests/test_bitcoin_std_deposit.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/zeta-chain/node/e2e/utils"
"github.com/zeta-chain/node/pkg/memo"
crosschaintypes "github.com/zeta-chain/node/x/crosschain/types"
"github.com/zeta-chain/node/zetaclient/chains/bitcoin"
"github.com/zeta-chain/node/zetaclient/chains/bitcoin/common"
)

func TestBitcoinStdMemoDeposit(r *runner.E2ERunner, args []string) {
Expand Down Expand Up @@ -54,7 +54,7 @@ func TestBitcoinStdMemoDeposit(r *runner.E2ERunner, args []string) {

// the runner balance should be increased by the deposit amount
amountIncreased := new(big.Int).Sub(balanceAfter, balanceBefore)
amountSatoshis, err := bitcoin.GetSatoshis(amount)
amountSatoshis, err := common.GetSatoshis(amount)
require.NoError(r, err)
require.Positive(r, amountSatoshis)
// #nosec G115 always positive
Expand Down
2 changes: 1 addition & 1 deletion e2e/e2etests/test_bitcoin_std_deposit_and_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/zeta-chain/node/pkg/memo"
testcontract "github.com/zeta-chain/node/testutil/contracts"
crosschaintypes "github.com/zeta-chain/node/x/crosschain/types"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin/common"
)

func TestBitcoinStdMemoDepositAndCall(r *runner.E2ERunner, args []string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/zeta-chain/node/pkg/memo"
testcontract "github.com/zeta-chain/node/testutil/contracts"
crosschaintypes "github.com/zeta-chain/node/x/crosschain/types"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin"
"github.com/zeta-chain/node/zetaclient/chains/bitcoin/common"
)

func TestBitcoinStdMemoInscribedDepositAndCall(r *runner.E2ERunner, args []string) {
Expand Down Expand Up @@ -53,7 +53,7 @@ func TestBitcoinStdMemoInscribedDepositAndCall(r *runner.E2ERunner, args []strin
utils.RequireCCTXStatus(r, cctx, crosschaintypes.CctxStatus_OutboundMined)

// check if example contract has been called, 'bar' value should be set to correct amount
depositFeeSats, err := zetabitcoin.GetSatoshis(zetabitcoin.DefaultDepositorFee)
depositFeeSats, err := common.GetSatoshis(common.DefaultDepositorFee)
require.NoError(r, err)
receiveAmount := depositAmount - depositFeeSats
utils.MustHaveCalledExampleContract(r, contract, big.NewInt(receiveAmount))
Expand Down
12 changes: 6 additions & 6 deletions e2e/runner/bitcoin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/zeta-chain/node/pkg/constant"
"github.com/zeta-chain/node/pkg/memo"
crosschaintypes "github.com/zeta-chain/node/x/crosschain/types"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin"
zetabtc "github.com/zeta-chain/node/zetaclient/chains/bitcoin/common"
btcobserver "github.com/zeta-chain/node/zetaclient/chains/bitcoin/observer"
"github.com/zeta-chain/node/zetaclient/chains/bitcoin/signer"
)
Expand Down Expand Up @@ -100,7 +100,7 @@ func (r *E2ERunner) DepositBTCWithAmount(amount float64, memo *memo.InboundMemo)
r.Logger.Info("Now sending two txs to TSS address...")

// add depositor fee so that receiver gets the exact given 'amount' in ZetaChain
amount += zetabitcoin.DefaultDepositorFee
amount += zetabtc.DefaultDepositorFee

// deposit to TSS address
var txHash *chainhash.Hash
Expand Down Expand Up @@ -148,7 +148,7 @@ func (r *E2ERunner) DepositBTC(receiver common.Address) {
r.Logger.Info("Now sending two txs to TSS address and tester ZEVM address...")

// send initial BTC to the tester ZEVM address
amount := 1.15 + zetabitcoin.DefaultDepositorFee
amount := 1.15 + zetabtc.DefaultDepositorFee
txHash, err := r.DepositBTCWithLegacyMemo(amount, utxos[:2], receiver)
require.NoError(r, err)

Expand Down Expand Up @@ -241,7 +241,7 @@ func (r *E2ERunner) sendToAddrFromDeployerWithMemo(

// use static fee 0.0005 BTC to calculate change
feeSats := btcutil.Amount(0.0005 * btcutil.SatoshiPerBitcoin)
amountInt, err := zetabitcoin.GetSatoshis(amount)
amountInt, err := zetabtc.GetSatoshis(amount)
require.NoError(r, err)
amountSats := btcutil.Amount(amountInt)
change := inputSats - feeSats - amountSats
Expand Down Expand Up @@ -351,7 +351,7 @@ func (r *E2ERunner) InscribeToTSSFromDeployerWithMemo(

// parameters to build the reveal transaction
commitOutputIdx := uint32(0)
commitAmount, err := zetabitcoin.GetSatoshis(amount)
commitAmount, err := zetabtc.GetSatoshis(amount)
require.NoError(r, err)

// build the reveal transaction to spend above funds
Expand Down Expand Up @@ -412,7 +412,7 @@ func (r *E2ERunner) QueryOutboundReceiverAndAmount(txid string) (string, int64)
// parse receiver address from pkScript
txOutput := revertTx.MsgTx().TxOut[1]
pkScript := txOutput.PkScript
receiver, err := zetabitcoin.DecodeScriptP2WPKH(hex.EncodeToString(pkScript), r.BitcoinParams)
receiver, err := zetabtc.DecodeScriptP2WPKH(hex.EncodeToString(pkScript), r.BitcoinParams)
require.NoError(r, err)

return receiver, txOutput.Value
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ require (
github.com/google/gopacket v1.1.19 // indirect
github.com/google/orderedcode v0.0.1 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/google/uuid v1.6.0
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/gorilla/handlers v1.5.1 // indirect
Expand Down Expand Up @@ -249,8 +249,8 @@ require (
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/petermattis/goid v0.0.0-20230317030725-371a4b8eda08 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/client_model v0.4.0
github.com/prometheus/common v0.42.0
github.com/prometheus/procfs v0.9.0 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
Expand Down
9 changes: 9 additions & 0 deletions pkg/chains/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
"github.com/btcsuite/btcd/chaincfg"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/tonkeeper/tongo/ton"

"github.com/zeta-chain/node/zetaclient/logs"
)

// Validate checks whether the chain is valid
Expand Down Expand Up @@ -108,6 +110,13 @@
return chain.Consensus == Consensus_catchain_consensus
}

func (chain Chain) LogFields() map[string]any {
return map[string]any{
logs.FieldChain: chain.ChainId,
logs.FieldChainNetwork: chain.Network.String(),
}

Check warning on line 117 in pkg/chains/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/chains/chain.go#L113-L117

Added lines #L113 - L117 were not covered by tests
}

// DecodeAddressFromChainID decode the address string to bytes
// additionalChains is a list of additional chains to search from
// in practice, it is used in the protocol to dynamically support new chains without doing an upgrade
Expand Down
66 changes: 66 additions & 0 deletions pkg/fanout/fanout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Package fanout provides a fan-out pattern implementation.
// It allows one channel to stream data to multiple independent channels.
// Note that context handling is out of the scope of this package.
package fanout

import "sync"

const DefaultBuffer = 8

// FanOut is a fan-out pattern implementation.
// It is NOT a worker pool, so use it wisely.
type FanOut[T any] struct {
input <-chan T
outputs []chan T

// outputBuffer chan buffer size for outputs channels.
// This helps with writing to chan in case of slow consumers.
outputBuffer int

mu sync.RWMutex
}

// New constructs FanOut
func New[T any](source <-chan T, buf int) *FanOut[T] {
return &FanOut[T]{
input: source,
outputs: make([]chan T, 0),
outputBuffer: buf,
}
}

func (f *FanOut[T]) Add() <-chan T {
out := make(chan T, f.outputBuffer)

f.mu.Lock()
defer f.mu.Unlock()

f.outputs = append(f.outputs, out)

return out
}

// Start starts the fan-out process
func (f *FanOut[T]) Start() {
go func() {
// loop for new data
for data := range f.input {
f.mu.RLock()
for _, output := range f.outputs {
// note that this might spawn lots of goroutines.
// it is a naive approach, but should be more than enough for our use cases.
go func(output chan<- T) { output <- data }(output)
}
f.mu.RUnlock()
}

// at this point, the input was closed
f.mu.Lock()
defer f.mu.Unlock()
for _, out := range f.outputs {
close(out)
}

f.outputs = nil
}()
}
Loading
Loading