diff --git a/pkg/producer/producer.go b/pkg/producer/producer.go index 4200643..ce525a5 100644 --- a/pkg/producer/producer.go +++ b/pkg/producer/producer.go @@ -25,7 +25,7 @@ import ( ) // TXPOLLFREQUENCY how often to foll for transactions -const TXPOLLFREQUENCY = 10 * time.Minute +const TXPOLLFREQUENCY = 30 * time.Minute const backfillEvery = time.Hour * 12 type DripProgramProducer struct { @@ -36,6 +36,7 @@ type DripProgramProducer struct { cancel context.CancelFunc environment config.Environment txBackfillStartSlot uint64 + shouldBackfillDripAccounts bool } func Server( @@ -55,6 +56,7 @@ func Server( cancel: cancel, environment: appConfig.GetEnvironment(), txBackfillStartSlot: 145626971, + shouldBackfillDripAccounts: appConfig.GetShouldBackfillDripAccounts(), } lifecycle.Append(fx.Hook{ OnStart: func(_ context.Context) error { @@ -80,18 +82,20 @@ func (d *DripProgramProducer) start(ctx context.Context) error { } // In staging, we manually backfill tokenswaps and whirlpools so that we can limit the # of rows in the DB - if config.IsProductionEnvironment(d.environment) { - // Track token_swap program accounts - //if err := d.client.ProgramSubscribe(ctx, tokenswap.ProgramID.String(), d.processor.AddItemToAccountUpdateQueueCallback(ctx, tokenswap.ProgramID.String())); err != nil { - // return err - //} + //if config.IsProductionEnvironment(d.environment) { + // Track token_swap program accounts + //if err := d.client.ProgramSubscribe(ctx, tokenswap.ProgramID.String(), d.processor.AddItemToAccountUpdateQueueCallback(ctx, tokenswap.ProgramID.String())); err != nil { + // return err + //} - // Track orca_whirlpool program accounts - if err := d.client.ProgramSubscribe(ctx, whirlpool.ProgramID.String(), d.AddItemToAccountUpdateQueueCallback(ctx, whirlpool.ProgramID.String())); err != nil { - return err - } + // Track orca_whirlpool program accounts + //if err := d.client.ProgramSubscribe(ctx, whirlpool.ProgramID.String(), d.AddItemToAccountUpdateQueueCallback(ctx, whirlpool.ProgramID.String())); err != nil { + // return err + //} + //} + if d.shouldBackfillDripAccounts { + go d.backfillAccounts(ctx) } - go d.backfillAccounts(ctx) go d.pollTransactions(ctx) return nil } @@ -161,7 +165,7 @@ func (d *DripProgramProducer) pollTransactions(ctx context.Context) { ticker := time.NewTicker(TXPOLLFREQUENCY) for { if err := d.processFromLastCheckpointSlot(ctx); err != nil { - logrus.WithError(err).Error("failed to produce block with retry, skipping...") + logrus.WithError(err).Error("failed to processFromLastCheckpointSlot, retrying in the next tick...") } select { case <-ticker.C: @@ -182,47 +186,48 @@ func (d *DripProgramProducer) processFromLastCheckpointSlot(ctx context.Context) untilSignature = solana.MustSignatureFromBase58(checkpoint.Signature) } log := logrus.WithField("untilSignature", untilSignature.String()) - log.Info("starting processing") + log.WithField("programId", drip.ProgramID.String()).Info("starting processing") defer func() { log.Info("done processing") }() // do while loop until txSignatures is not empty + total := 0 for { + log = log.WithField("beforeSignature", beforeSignature.String()) txSignatures, err := d.client.GetSignaturesForAddress(ctx, drip.ProgramID.String(), untilSignature, beforeSignature, nil) if err != nil { log.WithError(err).Error("failed to GetSignaturesForAddress") return err } + total += len(txSignatures) log = log.WithField("len(txSignatures)", len(txSignatures)) log.Info("got signatures") - for i := range lo.Reverse(txSignatures) { + // insert oldest to newest + txSignatures = lo.Reverse(txSignatures) + for i := range txSignatures { txSignature := txSignatures[i] + txPushLog := log.WithField("transactionSignature", txSignature.Signature.String()) tx, err := d.client.GetTransaction(ctx, txSignature.Signature.String()) if err != nil { - log.WithError(err).Error("failed to GetTransaction") + txPushLog.WithError(err).Error("failed to GetTransaction") return err } - log.WithField("transactionSignature", txSignature.Signature.String()).Info("pushing tx to queue...") if err := d.AddItemToTransactionUpdate(ctx, txSignature.Signature.String(), *tx); err != nil { - log.WithError(err).Error("failed to insert data into queue...") + txPushLog.WithError(err).Error("failed to insert data into queue...") return err - } else { - log.WithField("transactionSignature", txSignature).Info("pushed tx to queue...") } - log.Info("updating checkpoint...") if err := d.txProcessingCheckpointRepo.UpsertTransactionProcessingCheckpoint(ctx, txSignature.Slot, txSignature.Signature.String()); err != nil { - log.WithError(err).Error("failed to insert metadata...") + txPushLog.WithError(err).Error("failed to insert metadata...") return err } } if len(txSignatures) > 0 { beforeSignature = txSignatures[0].Signature - untilSignature = txSignatures[len(txSignatures)-1].Signature } else { - break + log = log.WithField("totalSignatures", total) + return nil } } - return nil } func (d *DripProgramProducer) AddItemToAccountUpdateQueueCallback(ctx context.Context, programId string) func(string, []byte) error { diff --git a/pkg/service/clients/solana/client.go b/pkg/service/clients/solana/client.go index 24fa7bc..eb8f7ec 100644 --- a/pkg/service/clients/solana/client.go +++ b/pkg/service/clients/solana/client.go @@ -9,18 +9,16 @@ import ( "github.com/AlekSi/pointer" - api "github.com/dcaf-labs/solana-go-retryable-http-client" - - "github.com/gagliardetto/solana-go/rpc/jsonrpc" - "github.com/dcaf-labs/drip/pkg/service/config" "github.com/dcaf-labs/drip/pkg/service/utils" + api "github.com/dcaf-labs/solana-go-retryable-http-client" bin "github.com/gagliardetto/binary" token_metadata "github.com/gagliardetto/metaplex-go/clients/token-metadata" "github.com/gagliardetto/solana-go" associatedtokenaccount "github.com/gagliardetto/solana-go/programs/associated-token-account" "github.com/gagliardetto/solana-go/programs/token" "github.com/gagliardetto/solana-go/rpc" + "github.com/gagliardetto/solana-go/rpc/jsonrpc" "github.com/gagliardetto/solana-go/rpc/ws" "github.com/mr-tron/base58" "github.com/sirupsen/logrus" @@ -68,7 +66,13 @@ type impl struct { client *rpc.Client } -func (s impl) GetSignaturesForAddress(ctx context.Context, pubkey string, until solana.Signature, before solana.Signature, minSlot *uint64) ([]*rpc.TransactionSignature, error) { +func (s impl) GetSignaturesForAddress( + ctx context.Context, + pubkey string, + until solana.Signature, + before solana.Signature, + minSlot *uint64, +) ([]*rpc.TransactionSignature, error) { return s.client.GetSignaturesForAddressWithOpts(ctx, solana.MustPublicKeyFromBase58(pubkey), &rpc.GetSignaturesForAddressOpts{ Until: until, Before: before, diff --git a/pkg/service/config/config.go b/pkg/service/config/config.go index 00f6a5e..c6da0ed 100644 --- a/pkg/service/config/config.go +++ b/pkg/service/config/config.go @@ -45,20 +45,22 @@ type AppConfig interface { GetDiscordWebhookID() string GetDiscordWebhookAccessToken() string GetShouldByPassAdminAuth() bool + GetShouldBackfillDripAccounts() bool } type appConfig struct { - Environment Environment `yaml:"environment" env:"ENV" env-default:"STAGING"` - SolanaRPCURL string `yaml:"solanaRpcUrl" env:"SOLANARPCURL" env-default:"https://wiser-icy-bush.solana-devnet.discover.quiknode.pro/7288cc56d980336f6fc0508eb1aa73e44fd2efcd/"` - SolanaWSURL string `yaml:"solanaWsUrl" env:"SOLANAWSURL" env-default:"wss://wiser-icy-bush.solana-devnet.discover.quiknode.pro/7288cc56d980336f6fc0508eb1aa73e44fd2efcd/"` - Network Network `yaml:"network" env:"NETWORK" env-default:"DEVNET"` - DripProgramID string `yaml:"dripProgramID" env:"DRIP_PROGRAM_ID" env-default:"dripTrkvSyQKvkyWg7oi4jmeEGMA5scSYowHArJ9Vwk"` - GoogleClientID string `yaml:"googleClientID" env:"GOOGLE_CLIENT_ID" env-default:"540992596258-sa2h4lmtelo44tonpu9htsauk5uabdon.apps.googleusercontent.com"` - Wallet string `yaml:"wallet" env:"DRIP_BACKEND_WALLET"` - Port int `yaml:"port" env:"PORT"` - DiscordWebhookID string `yaml:"discordWebhookID" env:"DISCORD_WEBHOOK_ID"` - DiscordWebhookAccessToken string `yaml:"discordWebhookAccessToken" env:"DISCORD_ACCESS_TOKEN"` - ShouldByPassAdminAuth bool `yaml:"shouldBypassAdminAuth" env:"SHOULD_BYPASS_ADMIN_AUTH" env-default:"false"` + Environment Environment `yaml:"environment" env:"ENV" env-default:"STAGING"` + SolanaRPCURL string `yaml:"solanaRpcUrl" env:"SOLANARPCURL" env-default:"https://wiser-icy-bush.solana-devnet.discover.quiknode.pro/7288cc56d980336f6fc0508eb1aa73e44fd2efcd/"` + SolanaWSURL string `yaml:"solanaWsUrl" env:"SOLANAWSURL" env-default:"wss://wiser-icy-bush.solana-devnet.discover.quiknode.pro/7288cc56d980336f6fc0508eb1aa73e44fd2efcd/"` + Network Network `yaml:"network" env:"NETWORK" env-default:"DEVNET"` + DripProgramID string `yaml:"dripProgramID" env:"DRIP_PROGRAM_ID" env-default:"dripTrkvSyQKvkyWg7oi4jmeEGMA5scSYowHArJ9Vwk"` + GoogleClientID string `yaml:"googleClientID" env:"GOOGLE_CLIENT_ID" env-default:"540992596258-sa2h4lmtelo44tonpu9htsauk5uabdon.apps.googleusercontent.com"` + Wallet string `yaml:"wallet" env:"DRIP_BACKEND_WALLET"` + Port int `yaml:"port" env:"PORT"` + DiscordWebhookID string `yaml:"discordWebhookID" env:"DISCORD_WEBHOOK_ID"` + DiscordWebhookAccessToken string `yaml:"discordWebhookAccessToken" env:"DISCORD_ACCESS_TOKEN"` + ShouldByPassAdminAuth bool `yaml:"shouldBypassAdminAuth" env:"SHOULD_BYPASS_ADMIN_AUTH" env-default:"false"` + ShouldBackfillDripAccounts bool `yaml:"shouldBackfillDripAccounts" env:"SHOULD_BACKFILL_DRIP_ACCOUNTS" env-default:"true"` } func (a appConfig) GetShouldByPassAdminAuth() bool { @@ -69,6 +71,10 @@ func (a appConfig) GetEnvironment() Environment { return a.Environment } +func (a appConfig) GetShouldBackfillDripAccounts() bool { + return a.ShouldBackfillDripAccounts +} + func (a appConfig) GetSolanaRPCURL() string { return a.SolanaRPCURL } @@ -182,6 +188,7 @@ func NewAppConfig() (AppConfig, error) { log. WithField("programID", config.DripProgramID). WithField("ShouldByPassAdminAuth", config.ShouldByPassAdminAuth). + WithField("shouldBackfillDripAccounts", config.ShouldBackfillDripAccounts). Info("set programID") return config, nil } diff --git a/pkg/service/config/mock.go b/pkg/service/config/mock.go index ca576db..de3c9c5 100644 --- a/pkg/service/config/mock.go +++ b/pkg/service/config/mock.go @@ -131,6 +131,20 @@ func (mr *MockAppConfigMockRecorder) GetServerPort() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetServerPort", reflect.TypeOf((*MockAppConfig)(nil).GetServerPort)) } +// GetShouldBackfillDripAccounts mocks base method. +func (m *MockAppConfig) GetShouldBackfillDripAccounts() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetShouldBackfillDripAccounts") + ret0, _ := ret[0].(bool) + return ret0 +} + +// GetShouldBackfillDripAccounts indicates an expected call of GetShouldBackfillDripAccounts. +func (mr *MockAppConfigMockRecorder) GetShouldBackfillDripAccounts() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetShouldBackfillDripAccounts", reflect.TypeOf((*MockAppConfig)(nil).GetShouldBackfillDripAccounts)) +} + // GetShouldByPassAdminAuth mocks base method. func (m *MockAppConfig) GetShouldByPassAdminAuth() bool { m.ctrl.T.Helper() diff --git a/pkg/service/processor/account_update.go b/pkg/service/processor/account_update.go index 765d84d..c64604e 100644 --- a/pkg/service/processor/account_update.go +++ b/pkg/service/processor/account_update.go @@ -5,6 +5,7 @@ import ( "runtime/debug" "strconv" "sync" + "time" "github.com/dcaf-labs/drip/pkg/service/repository/model" drip "github.com/dcaf-labs/solana-drip-go/pkg/v1" @@ -30,19 +31,22 @@ func (p impl) ProcessAccountUpdateQueue(ctx context.Context) { go p.processAccountUpdateQueueItemWorker(ctx, strconv.FormatInt(int64(i), 10), &wg, ch) } + ticker := time.NewTicker(POLLFREQUENCY) for { + queueItem, err := p.accountUpdateQueue.PopAccountUpdateQueueItem(ctx) + if err != nil && err == gorm.ErrRecordNotFound { + } else if err != nil { + logrus.WithError(err).Error("failed to fetch account from queue") + } else if queueItem == nil { + logrus.WithError(err).Error("failed to get next queue item") + } else { + ch <- queueItem + } select { + case <-ticker.C: + continue case <-ctx.Done(): return - default: - queueItem, err := p.accountUpdateQueue.PopAccountUpdateQueueItem(ctx) - if err != nil && err == gorm.ErrRecordNotFound { - continue - } else if queueItem == nil { - logrus.WithError(err).Error("failed to get next queue item") - continue - } - ch <- queueItem } } } diff --git a/pkg/service/processor/process.go b/pkg/service/processor/process.go index 1849aa9..d6174a5 100644 --- a/pkg/service/processor/process.go +++ b/pkg/service/processor/process.go @@ -2,6 +2,7 @@ package processor import ( "context" + "time" "github.com/dcaf-labs/drip/pkg/service/alert" "github.com/dcaf-labs/drip/pkg/service/clients/coingecko" @@ -14,7 +15,8 @@ import ( "github.com/gagliardetto/solana-go/rpc" ) -const processConcurrency = 10 +const processConcurrency = 50 +const POLLFREQUENCY = 100 * time.Millisecond type Processor interface { UpsertProtoConfigByAddress(context.Context, string) error diff --git a/pkg/service/processor/transaction_update.go b/pkg/service/processor/transaction_update.go index aa71115..b0ec029 100644 --- a/pkg/service/processor/transaction_update.go +++ b/pkg/service/processor/transaction_update.go @@ -31,23 +31,23 @@ func (p impl) ProcessTransactionUpdateQueue(ctx context.Context) { go p.processTransactionUpdateQueueItemWorker(ctx, strconv.FormatInt(int64(i), 10), &wg, ch) } + ticker := time.NewTicker(POLLFREQUENCY) + for { + queueItem, err := p.transactionUpdateQueue.PopTransactionUpdateQueueItem(ctx) + if err != nil && err == gorm.ErrRecordNotFound { + } else if err != nil { + logrus.WithError(err).Error("failed to fetch transaction from queue") + } else if queueItem == nil { + logrus.WithError(err).Error("failed to get next queue item") + } else { + ch <- queueItem + } select { + case <-ticker.C: + continue case <-ctx.Done(): return - default: - queueItem, err := p.transactionUpdateQueue.PopTransactionUpdateQueueItem(ctx) - if err != nil && err == gorm.ErrRecordNotFound { - continue - } else if err != nil { - logrus.WithError(err).Error("failed to fetch transaction from queue") - continue - } else if queueItem == nil { - logrus.WithError(err).Error("failed to get next queue item") - continue - } else { - ch <- queueItem - } } } } @@ -99,7 +99,7 @@ func (p impl) ProcessTransaction(ctx context.Context, txRaw rpc.GetTransactionRe if ixName == nil { continue } - log = log.WithField("ixName", *ixName) + log = log.WithField("ixName", *ixName).WithField("blockTime", blockTime.String()).WithField("slot", txRaw.Slot) log.Info("starting to parse ix") switch *ixName {