Skip to content

Commit

Permalink
fix: backfill all transactions (#121)
Browse files Browse the repository at this point in the history
  • Loading branch information
sol-mocha authored Apr 24, 2023
1 parent b0ed89e commit a247f02
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 64 deletions.
53 changes: 29 additions & 24 deletions pkg/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

// TXPOLLFREQUENCY how often to foll for transactions
const TXPOLLFREQUENCY = 10 * time.Minute
const TXPOLLFREQUENCY = 30 * time.Minute
const backfillEvery = time.Hour * 12

type DripProgramProducer struct {
Expand All @@ -36,6 +36,7 @@ type DripProgramProducer struct {
cancel context.CancelFunc
environment config.Environment
txBackfillStartSlot uint64
shouldBackfillDripAccounts bool
}

func Server(
Expand All @@ -55,6 +56,7 @@ func Server(
cancel: cancel,
environment: appConfig.GetEnvironment(),
txBackfillStartSlot: 145626971,
shouldBackfillDripAccounts: appConfig.GetShouldBackfillDripAccounts(),
}
lifecycle.Append(fx.Hook{
OnStart: func(_ context.Context) error {
Expand All @@ -80,18 +82,20 @@ func (d *DripProgramProducer) start(ctx context.Context) error {
}

// In staging, we manually backfill tokenswaps and whirlpools so that we can limit the # of rows in the DB
if config.IsProductionEnvironment(d.environment) {
// Track token_swap program accounts
//if err := d.client.ProgramSubscribe(ctx, tokenswap.ProgramID.String(), d.processor.AddItemToAccountUpdateQueueCallback(ctx, tokenswap.ProgramID.String())); err != nil {
// return err
//}
//if config.IsProductionEnvironment(d.environment) {
// Track token_swap program accounts
//if err := d.client.ProgramSubscribe(ctx, tokenswap.ProgramID.String(), d.processor.AddItemToAccountUpdateQueueCallback(ctx, tokenswap.ProgramID.String())); err != nil {
// return err
//}

// Track orca_whirlpool program accounts
if err := d.client.ProgramSubscribe(ctx, whirlpool.ProgramID.String(), d.AddItemToAccountUpdateQueueCallback(ctx, whirlpool.ProgramID.String())); err != nil {
return err
}
// Track orca_whirlpool program accounts
//if err := d.client.ProgramSubscribe(ctx, whirlpool.ProgramID.String(), d.AddItemToAccountUpdateQueueCallback(ctx, whirlpool.ProgramID.String())); err != nil {
// return err
//}
//}
if d.shouldBackfillDripAccounts {
go d.backfillAccounts(ctx)
}
go d.backfillAccounts(ctx)
go d.pollTransactions(ctx)
return nil
}
Expand Down Expand Up @@ -161,7 +165,7 @@ func (d *DripProgramProducer) pollTransactions(ctx context.Context) {
ticker := time.NewTicker(TXPOLLFREQUENCY)
for {
if err := d.processFromLastCheckpointSlot(ctx); err != nil {
logrus.WithError(err).Error("failed to produce block with retry, skipping...")
logrus.WithError(err).Error("failed to processFromLastCheckpointSlot, retrying in the next tick...")
}
select {
case <-ticker.C:
Expand All @@ -182,47 +186,48 @@ func (d *DripProgramProducer) processFromLastCheckpointSlot(ctx context.Context)
untilSignature = solana.MustSignatureFromBase58(checkpoint.Signature)
}
log := logrus.WithField("untilSignature", untilSignature.String())
log.Info("starting processing")
log.WithField("programId", drip.ProgramID.String()).Info("starting processing")
defer func() {
log.Info("done processing")
}()
// do while loop until txSignatures is not empty
total := 0
for {
log = log.WithField("beforeSignature", beforeSignature.String())
txSignatures, err := d.client.GetSignaturesForAddress(ctx, drip.ProgramID.String(), untilSignature, beforeSignature, nil)
if err != nil {
log.WithError(err).Error("failed to GetSignaturesForAddress")
return err
}
total += len(txSignatures)
log = log.WithField("len(txSignatures)", len(txSignatures))
log.Info("got signatures")
for i := range lo.Reverse(txSignatures) {
// insert oldest to newest
txSignatures = lo.Reverse(txSignatures)
for i := range txSignatures {
txSignature := txSignatures[i]
txPushLog := log.WithField("transactionSignature", txSignature.Signature.String())
tx, err := d.client.GetTransaction(ctx, txSignature.Signature.String())
if err != nil {
log.WithError(err).Error("failed to GetTransaction")
txPushLog.WithError(err).Error("failed to GetTransaction")
return err
}
log.WithField("transactionSignature", txSignature.Signature.String()).Info("pushing tx to queue...")
if err := d.AddItemToTransactionUpdate(ctx, txSignature.Signature.String(), *tx); err != nil {
log.WithError(err).Error("failed to insert data into queue...")
txPushLog.WithError(err).Error("failed to insert data into queue...")
return err
} else {
log.WithField("transactionSignature", txSignature).Info("pushed tx to queue...")
}
log.Info("updating checkpoint...")
if err := d.txProcessingCheckpointRepo.UpsertTransactionProcessingCheckpoint(ctx, txSignature.Slot, txSignature.Signature.String()); err != nil {
log.WithError(err).Error("failed to insert metadata...")
txPushLog.WithError(err).Error("failed to insert metadata...")
return err
}
}
if len(txSignatures) > 0 {
beforeSignature = txSignatures[0].Signature
untilSignature = txSignatures[len(txSignatures)-1].Signature
} else {
break
log = log.WithField("totalSignatures", total)
return nil
}
}
return nil
}

func (d *DripProgramProducer) AddItemToAccountUpdateQueueCallback(ctx context.Context, programId string) func(string, []byte) error {
Expand Down
14 changes: 9 additions & 5 deletions pkg/service/clients/solana/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,16 @@ import (

"github.com/AlekSi/pointer"

api "github.com/dcaf-labs/solana-go-retryable-http-client"

"github.com/gagliardetto/solana-go/rpc/jsonrpc"

"github.com/dcaf-labs/drip/pkg/service/config"
"github.com/dcaf-labs/drip/pkg/service/utils"
api "github.com/dcaf-labs/solana-go-retryable-http-client"
bin "github.com/gagliardetto/binary"
token_metadata "github.com/gagliardetto/metaplex-go/clients/token-metadata"
"github.com/gagliardetto/solana-go"
associatedtokenaccount "github.com/gagliardetto/solana-go/programs/associated-token-account"
"github.com/gagliardetto/solana-go/programs/token"
"github.com/gagliardetto/solana-go/rpc"
"github.com/gagliardetto/solana-go/rpc/jsonrpc"
"github.com/gagliardetto/solana-go/rpc/ws"
"github.com/mr-tron/base58"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -68,7 +66,13 @@ type impl struct {
client *rpc.Client
}

func (s impl) GetSignaturesForAddress(ctx context.Context, pubkey string, until solana.Signature, before solana.Signature, minSlot *uint64) ([]*rpc.TransactionSignature, error) {
func (s impl) GetSignaturesForAddress(
ctx context.Context,
pubkey string,
until solana.Signature,
before solana.Signature,
minSlot *uint64,
) ([]*rpc.TransactionSignature, error) {
return s.client.GetSignaturesForAddressWithOpts(ctx, solana.MustPublicKeyFromBase58(pubkey), &rpc.GetSignaturesForAddressOpts{
Until: until,
Before: before,
Expand Down
29 changes: 18 additions & 11 deletions pkg/service/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,22 @@ type AppConfig interface {
GetDiscordWebhookID() string
GetDiscordWebhookAccessToken() string
GetShouldByPassAdminAuth() bool
GetShouldBackfillDripAccounts() bool
}

type appConfig struct {
Environment Environment `yaml:"environment" env:"ENV" env-default:"STAGING"`
SolanaRPCURL string `yaml:"solanaRpcUrl" env:"SOLANARPCURL" env-default:"https://wiser-icy-bush.solana-devnet.discover.quiknode.pro/7288cc56d980336f6fc0508eb1aa73e44fd2efcd/"`
SolanaWSURL string `yaml:"solanaWsUrl" env:"SOLANAWSURL" env-default:"wss://wiser-icy-bush.solana-devnet.discover.quiknode.pro/7288cc56d980336f6fc0508eb1aa73e44fd2efcd/"`
Network Network `yaml:"network" env:"NETWORK" env-default:"DEVNET"`
DripProgramID string `yaml:"dripProgramID" env:"DRIP_PROGRAM_ID" env-default:"dripTrkvSyQKvkyWg7oi4jmeEGMA5scSYowHArJ9Vwk"`
GoogleClientID string `yaml:"googleClientID" env:"GOOGLE_CLIENT_ID" env-default:"540992596258-sa2h4lmtelo44tonpu9htsauk5uabdon.apps.googleusercontent.com"`
Wallet string `yaml:"wallet" env:"DRIP_BACKEND_WALLET"`
Port int `yaml:"port" env:"PORT"`
DiscordWebhookID string `yaml:"discordWebhookID" env:"DISCORD_WEBHOOK_ID"`
DiscordWebhookAccessToken string `yaml:"discordWebhookAccessToken" env:"DISCORD_ACCESS_TOKEN"`
ShouldByPassAdminAuth bool `yaml:"shouldBypassAdminAuth" env:"SHOULD_BYPASS_ADMIN_AUTH" env-default:"false"`
Environment Environment `yaml:"environment" env:"ENV" env-default:"STAGING"`
SolanaRPCURL string `yaml:"solanaRpcUrl" env:"SOLANARPCURL" env-default:"https://wiser-icy-bush.solana-devnet.discover.quiknode.pro/7288cc56d980336f6fc0508eb1aa73e44fd2efcd/"`
SolanaWSURL string `yaml:"solanaWsUrl" env:"SOLANAWSURL" env-default:"wss://wiser-icy-bush.solana-devnet.discover.quiknode.pro/7288cc56d980336f6fc0508eb1aa73e44fd2efcd/"`
Network Network `yaml:"network" env:"NETWORK" env-default:"DEVNET"`
DripProgramID string `yaml:"dripProgramID" env:"DRIP_PROGRAM_ID" env-default:"dripTrkvSyQKvkyWg7oi4jmeEGMA5scSYowHArJ9Vwk"`
GoogleClientID string `yaml:"googleClientID" env:"GOOGLE_CLIENT_ID" env-default:"540992596258-sa2h4lmtelo44tonpu9htsauk5uabdon.apps.googleusercontent.com"`
Wallet string `yaml:"wallet" env:"DRIP_BACKEND_WALLET"`
Port int `yaml:"port" env:"PORT"`
DiscordWebhookID string `yaml:"discordWebhookID" env:"DISCORD_WEBHOOK_ID"`
DiscordWebhookAccessToken string `yaml:"discordWebhookAccessToken" env:"DISCORD_ACCESS_TOKEN"`
ShouldByPassAdminAuth bool `yaml:"shouldBypassAdminAuth" env:"SHOULD_BYPASS_ADMIN_AUTH" env-default:"false"`
ShouldBackfillDripAccounts bool `yaml:"shouldBackfillDripAccounts" env:"SHOULD_BACKFILL_DRIP_ACCOUNTS" env-default:"true"`
}

func (a appConfig) GetShouldByPassAdminAuth() bool {
Expand All @@ -69,6 +71,10 @@ func (a appConfig) GetEnvironment() Environment {
return a.Environment
}

func (a appConfig) GetShouldBackfillDripAccounts() bool {
return a.ShouldBackfillDripAccounts
}

func (a appConfig) GetSolanaRPCURL() string {
return a.SolanaRPCURL
}
Expand Down Expand Up @@ -182,6 +188,7 @@ func NewAppConfig() (AppConfig, error) {
log.
WithField("programID", config.DripProgramID).
WithField("ShouldByPassAdminAuth", config.ShouldByPassAdminAuth).
WithField("shouldBackfillDripAccounts", config.ShouldBackfillDripAccounts).
Info("set programID")
return config, nil
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/service/config/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 13 additions & 9 deletions pkg/service/processor/account_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"runtime/debug"
"strconv"
"sync"
"time"

"github.com/dcaf-labs/drip/pkg/service/repository/model"
drip "github.com/dcaf-labs/solana-drip-go/pkg/v1"
Expand All @@ -30,19 +31,22 @@ func (p impl) ProcessAccountUpdateQueue(ctx context.Context) {
go p.processAccountUpdateQueueItemWorker(ctx, strconv.FormatInt(int64(i), 10), &wg, ch)
}

ticker := time.NewTicker(POLLFREQUENCY)
for {
queueItem, err := p.accountUpdateQueue.PopAccountUpdateQueueItem(ctx)
if err != nil && err == gorm.ErrRecordNotFound {
} else if err != nil {
logrus.WithError(err).Error("failed to fetch account from queue")
} else if queueItem == nil {
logrus.WithError(err).Error("failed to get next queue item")
} else {
ch <- queueItem
}
select {
case <-ticker.C:
continue
case <-ctx.Done():
return
default:
queueItem, err := p.accountUpdateQueue.PopAccountUpdateQueueItem(ctx)
if err != nil && err == gorm.ErrRecordNotFound {
continue
} else if queueItem == nil {
logrus.WithError(err).Error("failed to get next queue item")
continue
}
ch <- queueItem
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/service/processor/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package processor

import (
"context"
"time"

"github.com/dcaf-labs/drip/pkg/service/alert"
"github.com/dcaf-labs/drip/pkg/service/clients/coingecko"
Expand All @@ -14,7 +15,8 @@ import (
"github.com/gagliardetto/solana-go/rpc"
)

const processConcurrency = 10
const processConcurrency = 50
const POLLFREQUENCY = 100 * time.Millisecond

type Processor interface {
UpsertProtoConfigByAddress(context.Context, string) error
Expand Down
28 changes: 14 additions & 14 deletions pkg/service/processor/transaction_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,23 @@ func (p impl) ProcessTransactionUpdateQueue(ctx context.Context) {
go p.processTransactionUpdateQueueItemWorker(ctx, strconv.FormatInt(int64(i), 10), &wg, ch)
}

ticker := time.NewTicker(POLLFREQUENCY)

for {
queueItem, err := p.transactionUpdateQueue.PopTransactionUpdateQueueItem(ctx)
if err != nil && err == gorm.ErrRecordNotFound {
} else if err != nil {
logrus.WithError(err).Error("failed to fetch transaction from queue")
} else if queueItem == nil {
logrus.WithError(err).Error("failed to get next queue item")
} else {
ch <- queueItem
}
select {
case <-ticker.C:
continue
case <-ctx.Done():
return
default:
queueItem, err := p.transactionUpdateQueue.PopTransactionUpdateQueueItem(ctx)
if err != nil && err == gorm.ErrRecordNotFound {
continue
} else if err != nil {
logrus.WithError(err).Error("failed to fetch transaction from queue")
continue
} else if queueItem == nil {
logrus.WithError(err).Error("failed to get next queue item")
continue
} else {
ch <- queueItem
}
}
}
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func (p impl) ProcessTransaction(ctx context.Context, txRaw rpc.GetTransactionRe
if ixName == nil {
continue
}
log = log.WithField("ixName", *ixName)
log = log.WithField("ixName", *ixName).WithField("blockTime", blockTime.String()).WithField("slot", txRaw.Slot)
log.Info("starting to parse ix")

switch *ixName {
Expand Down

0 comments on commit a247f02

Please sign in to comment.