Skip to content

Commit

Permalink
improve delete performance for clickhouse
Browse files Browse the repository at this point in the history
  • Loading branch information
iuwqyir committed Jan 13, 2025
1 parent 313d5ab commit e5a19e4
Showing 1 changed file with 168 additions and 26 deletions.
194 changes: 168 additions & 26 deletions internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) {
Settings: func() clickhouse.Settings {
if cfg.AsyncInsert {
return clickhouse.Settings{
"async_insert": "1",
"wait_for_async_insert": "1",
"async_insert": "1",
"wait_for_async_insert": "1",
"lightweight_deletes_sync": "0",
}
}
return clickhouse.Settings{}
Expand Down Expand Up @@ -954,68 +955,209 @@ func (c *ClickHouseConnector) LookbackBlockHeaders(chainId *big.Int, limit int,
}

func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error {
var saveErr error
var saveErrMutex sync.Mutex
var deleteErr error
var deleteErrMutex sync.Mutex
var wg sync.WaitGroup
wg.Add(4)

go func() {
defer wg.Done()
if err := c.deleteBatch(chainId, blockNumbers, "blocks", "number"); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting blocks: %v", err)
saveErrMutex.Unlock()
if err := c.deleteBlocksByNumbers(chainId, blockNumbers); err != nil {
deleteErrMutex.Lock()
deleteErr = fmt.Errorf("error deleting blocks: %v", err)
deleteErrMutex.Unlock()
}
}()

go func() {
defer wg.Done()
if err := c.deleteBatch(chainId, blockNumbers, "logs", "block_number"); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting logs: %v", err)
saveErrMutex.Unlock()
if err := c.deleteLogsByNumbers(chainId, blockNumbers); err != nil {
deleteErrMutex.Lock()
deleteErr = fmt.Errorf("error deleting logs: %v", err)
deleteErrMutex.Unlock()
}
}()

go func() {
defer wg.Done()
if err := c.deleteBatch(chainId, blockNumbers, "transactions", "block_number"); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting transactions: %v", err)
saveErrMutex.Unlock()
if err := c.deleteTransactionsByNumbers(chainId, blockNumbers); err != nil {
deleteErrMutex.Lock()
deleteErr = fmt.Errorf("error deleting transactions: %v", err)
deleteErrMutex.Unlock()
}
}()

go func() {
defer wg.Done()
if err := c.deleteBatch(chainId, blockNumbers, "traces", "block_number"); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting traces: %v", err)
saveErrMutex.Unlock()
if err := c.deleteTracesByNumbers(chainId, blockNumbers); err != nil {
deleteErrMutex.Lock()
deleteErr = fmt.Errorf("error deleting traces: %v", err)
deleteErrMutex.Unlock()
}
}()

wg.Wait()

if saveErr != nil {
return saveErr
if deleteErr != nil {
return deleteErr
}
return nil
}

func (c *ClickHouseConnector) deleteBatch(chainId *big.Int, blockNumbers []*big.Int, table string, blockNumberColumn string) error {
query := fmt.Sprintf("DELETE FROM %s.%s WHERE chain_id = ? AND %s IN (?)", c.cfg.Database, table, blockNumberColumn)
func (c *ClickHouseConnector) deleteBlocksByNumbers(chainId *big.Int, blockNumbers []*big.Int) error {
query := fmt.Sprintf("DELETE FROM %s.blocks WHERE _partition_value.1 = ? AND chain_id = ? AND number IN (?)", c.cfg.Database)

blockNumbersStr := make([]string, len(blockNumbers))
for i, bn := range blockNumbers {
blockNumbersStr[i] = bn.String()
}
err := c.conn.Exec(context.Background(), query, chainId, chainId, blockNumbersStr)
if err != nil {
return fmt.Errorf("error deleting blocks: %w", err)
}
return nil
}

func (c *ClickHouseConnector) deleteLogsByNumbers(chainId *big.Int, blockNumbers []*big.Int) error {
blockNumbersStr := make([]string, len(blockNumbers))
for i, bn := range blockNumbers {
blockNumbersStr[i] = bn.String()
}
getQuery := fmt.Sprintf("SELECT block_number, transaction_hash, log_index FROM %s.logs WHERE chain_id = %s AND block_number IN (?) AND is_deleted = 0", c.cfg.Database, chainId.String())

rows, getErr := c.conn.Query(context.Background(), getQuery)
if getErr != nil {
return getErr
}
defer rows.Close()

logsToDelete := make([]common.Log, 0)
for rows.Next() {
var logToDelete common.Log
err := rows.ScanStruct(&logToDelete)
if err != nil {
return err
}
logsToDelete = append(logsToDelete, logToDelete)
}

deleteQuery := fmt.Sprintf("DELETE FROM %s.logs WHERE _partition_value.1 = ? AND chain_id = ? AND block_number = ? AND transaction_hash = ? AND log_index = ?", c.cfg.Database)

err := c.conn.Exec(context.Background(), query, chainId, blockNumbersStr)
batch, err := c.conn.PrepareBatch(context.Background(), deleteQuery)
if err != nil {
return fmt.Errorf("error deleting from %s: %w", table, err)
return fmt.Errorf("error preparing batch for deleting logs: %w", err)
}

for _, log := range logsToDelete {
err := batch.Append(
chainId,
chainId,
log.BlockNumber,
log.TransactionHash,
log.LogIndex,
)
if err != nil {
return fmt.Errorf("error appending log to delete batch: %w", err)
}
}
if err := batch.Send(); err != nil {
return fmt.Errorf("error deleting logs: %w", err)
}
return nil
}

func (c *ClickHouseConnector) deleteTransactionsByNumbers(chainId *big.Int, blockNumbers []*big.Int) error {
blockNumbersStr := make([]string, len(blockNumbers))
for i, bn := range blockNumbers {
blockNumbersStr[i] = bn.String()
}
getQuery := fmt.Sprintf("SELECT block_number, hash FROM %s.transactions WHERE chain_id = %s AND block_number IN (?) AND is_deleted = 0", c.cfg.Database, chainId.String())

rows, getErr := c.conn.Query(context.Background(), getQuery)
if getErr != nil {
return getErr
}
defer rows.Close()

txsToDelete := make([]common.Transaction, 0)
for rows.Next() {
var txToDelete common.Transaction
err := rows.ScanStruct(&txToDelete)
if err != nil {
return err
}
txsToDelete = append(txsToDelete, txToDelete)
}

deleteQuery := fmt.Sprintf("DELETE FROM %s.transactions WHERE _partition_value.1 = ? AND chain_id = ? AND block_number = ? AND hash = ?", c.cfg.Database)

batch, err := c.conn.PrepareBatch(context.Background(), deleteQuery)
if err != nil {
return fmt.Errorf("error preparing batch for deleting transactions: %w", err)
}

for _, tx := range txsToDelete {
err := batch.Append(
chainId,
chainId,
tx.BlockNumber,
tx.Hash,
)
if err != nil {
return fmt.Errorf("error appending transaction to delete batch: %w", err)
}
}
if err := batch.Send(); err != nil {
return fmt.Errorf("error deleting transactions: %w", err)
}
return nil
}

func (c *ClickHouseConnector) deleteTracesByNumbers(chainId *big.Int, blockNumbers []*big.Int) error {
blockNumbersStr := make([]string, len(blockNumbers))
for i, bn := range blockNumbers {
blockNumbersStr[i] = bn.String()
}
getQuery := fmt.Sprintf("SELECT block_number, transaction_hash, trace_address FROM %s.traces WHERE chain_id = %s AND block_number IN (?) AND is_deleted = 0", c.cfg.Database, chainId.String())

rows, getErr := c.conn.Query(context.Background(), getQuery)
if getErr != nil {
return getErr
}
defer rows.Close()

tracesToDelete := make([]common.Trace, 0)
for rows.Next() {
var traceToDelete common.Trace
err := rows.ScanStruct(&traceToDelete)
if err != nil {
return err
}
tracesToDelete = append(tracesToDelete, traceToDelete)
}

deleteQuery := fmt.Sprintf("DELETE FROM %s.traces WHERE _partition_value.1 = ? AND chain_id = ? AND block_number = ? AND transaction_hash = ? AND trace_address = ?", c.cfg.Database)

batch, err := c.conn.PrepareBatch(context.Background(), deleteQuery)
if err != nil {
return fmt.Errorf("error preparing batch for deleting traces: %w", err)
}

for _, trace := range tracesToDelete {
err := batch.Append(
chainId,
chainId,
trace.BlockNumber,
trace.TransactionHash,
trace.TraceAddress,
)
if err != nil {
return fmt.Errorf("error appending trace to delete batch: %w", err)
}
}
if err := batch.Send(); err != nil {
return fmt.Errorf("error deleting traces: %w", err)
}
return nil
}

Expand Down

0 comments on commit e5a19e4

Please sign in to comment.