Merge pull request #6 from gnosischain/fix_ip_metadata_v2
Fix ip metadata v2
riccardo-gnosis authored Oct 1, 2024
2 parents dab13f1 + 00d9e73 commit 332cdac
Showing 6 changed files with 388 additions and 215 deletions.
33 changes: 14 additions & 19 deletions Dockerfile
@@ -1,42 +1,37 @@
ARG GO_VERSION=1.22.5
FROM golang:1.22.5-bookworm as builder

# Stage 1: Dependency management and build
FROM golang:${GO_VERSION}-bookworm as builder
# Install CA certificates and build essentials
RUN apt-get update && apt-get install -y ca-certificates build-essential && rm -rf /var/lib/apt/lists/*

# Set the working directory inside the container to /app
WORKDIR /app

# Copy go.mod and go.sum files
# Copy the Go module files
COPY go.mod go.sum ./

# Download dependencies and verify modules
RUN go mod download && go mod verify
# Download dependencies
RUN go mod download

# Copy the rest of the application source code
COPY . .

# Run go mod tidy to ensure the go.mod file is up to date
RUN go mod tidy
# Build the application; output the binary to a known location
RUN go build -v -o /run-app .

# Build the application and capture the output
RUN go build -v -o /run-app .

# Stage 2: Final stage
# Final stage based on Debian Bookworm-slim
FROM debian:bookworm-slim

# Install CA certificates in the final image
# Install CA certificates in the final image to ensure they are present.
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*

# Copy the built executable from the builder stage
COPY --from=builder /run-app /usr/local/bin/run-app

# Create necessary directory
# Create a directory for the data
RUN mkdir -p /app/data

# Copy the CSV file to /app/data
COPY /data/ip_metadata.csv /app/data/ip_metadata.csv

# Set the working directory
WORKDIR /app
# Copy the CSV file
COPY data/ip_metadata.csv /app/data/ip_metadata.csv

# Set the command to run the application
CMD ["/usr/local/bin/run-app"]
96 changes: 50 additions & 46 deletions clickhouse/client.go
@@ -47,7 +47,7 @@ func (c *ClickhouseClient) LoadIPMetadataFromCSV() error {
reader := csv.NewReader(file)
reader.FieldsPerRecord = -1 // Allow variable number of fields

batch, err := c.chConn.PrepareBatch(context.Background(), "INSERT INTO ip_metadata")
batch, err := c.ChConn.PrepareBatch(context.Background(), "INSERT INTO ip_metadata")
if err != nil {
return fmt.Errorf("failed to prepare batch: %w", err)
}
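Only the field rename (chConn to ChConn) is visible in this hunk; the rest of LoadIPMetadataFromCSV is collapsed. For orientation, a hedged sketch of the usual clickhouse-go batch pattern the prepared batch feeds into — the helper name, package clause, and the raw-string append are assumptions, not the function's actual body:

```go
package clickhouse // package name assumed from the directory

import (
	"encoding/csv"
	"fmt"
	"io"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// appendCSVRows is a hypothetical helper showing the usual clickhouse-go batch
// pattern: append one row per CSV record, then ship everything with a single Send.
func appendCSVRows(reader *csv.Reader, batch driver.Batch) error {
	for {
		record, err := reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return fmt.Errorf("failed to read CSV record: %w", err)
		}
		// The column order must match the ip_metadata DDL (ip first, ASN fields
		// last); appending the raw strings unparsed is purely illustrative.
		values := make([]interface{}, len(record))
		for i, v := range record {
			values[i] = v
		}
		if err := batch.Append(values...); err != nil {
			return fmt.Errorf("failed to append row: %w", err)
		}
	}
	return batch.Send()
}
```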
@@ -176,7 +176,7 @@ func parseFloat(s string) (float64, error) {
func (c *ClickhouseClient) isTableEmpty(tableName string) (bool, error) {
query := fmt.Sprintf("SELECT count(*) FROM %s", tableName)
var count uint64
err := c.chConn.QueryRow(context.Background(), query).Scan(&count)
err := c.ChConn.QueryRow(context.Background(), query).Scan(&count)
if err != nil {
return false, err
}
@@ -217,7 +217,7 @@ func IPMetadataDDL(db string) string {
asn String,
asn_organization String,
asn_type String
) ENGINE = MergeTree()
) ENGINE = ReplacingMergeTree()
ORDER BY ip;
`, db)
}
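Switching the ip_metadata engine from MergeTree() to ReplacingMergeTree() means rows that share the same ORDER BY key (ip) are deduplicated when background merges run, so re-loading ip_metadata.csv no longer piles up duplicate rows. Deduplication is eventual, though, so a read that must see exactly one row per ip can add FINAL. A hedged sketch — the helper and its placement are invented for illustration; asn_organization comes from the DDL above:

```go
package clickhouse // package name assumed from the directory

import (
	"context"

	"github.com/ClickHouse/clickhouse-go/v2" // import path assumed to match client.go
)

// lookupASNOrg is a hypothetical read path: FINAL forces ReplacingMergeTree to
// collapse any not-yet-merged duplicates for the requested ip at query time.
func lookupASNOrg(ctx context.Context, conn clickhouse.Conn, ip string) (string, error) {
	var org string
	err := conn.QueryRow(ctx,
		"SELECT asn_organization FROM ip_metadata FINAL WHERE ip = ?", ip,
	).Scan(&org)
	return org, err
}
```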
@@ -267,74 +267,78 @@ type ClickhouseConfig struct {
}

type ClickhouseClient struct {
cfg *ClickhouseConfig
log zerolog.Logger

chConn driver.Conn

ValidatorEventChan chan *types.ValidatorEvent
IPMetadataEventChan chan *types.IPMetadataEvent
PeerDiscoveredEventChan chan *types.PeerDiscoveredEvent
MetadataReceivedEventChan chan *types.MetadataReceivedEvent
cfg *ClickhouseConfig
log zerolog.Logger
ChConn clickhouse.Conn

ValidatorEventChan chan *types.ValidatorEvent
IPMetadataEventChan chan *types.IPMetadataEvent
PeerDiscoveredEventChan chan *types.PeerDiscoveredEvent
MetadataReceivedEventChan chan *types.MetadataReceivedEvent
}
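Renaming chConn to ChConn (and typing it as clickhouse.Conn rather than the internal driver.Conn) exports the connection, so code outside this package can use it directly. A hypothetical consumer, with the package and helper names invented for illustration:

```go
package health // hypothetical consumer package

import (
	"context"
	"fmt"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2" // import path assumed to match client.go
)

// pingWithTimeout is a hypothetical helper: callers elsewhere in the program
// can now pass client.ChConn into generic code like this, which was not
// possible while the field was unexported.
func pingWithTimeout(conn clickhouse.Conn, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	if err := conn.Ping(ctx); err != nil {
		return fmt.Errorf("clickhouse ping failed: %w", err)
	}
	return nil
}
```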


func NewClickhouseClient(cfg *ClickhouseConfig) (*ClickhouseClient, error) {
log := log.NewLogger("clickhouse")

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{cfg.Endpoint},
DialTimeout: time.Second * 60,
Auth: clickhouse.Auth{
Database: cfg.DB,
Username: cfg.Username,
Password: cfg.Password,
},
Debugf: func(format string, v ...interface{}) {
log.Debug().Str("module", "clickhouse").Msgf(format, v)
},
Protocol: clickhouse.Native,
TLS: &tls.Config{},
})
log := log.NewLogger("clickhouse")

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{cfg.Endpoint},
DialTimeout: time.Second * 60,
Auth: clickhouse.Auth{
Database: cfg.DB,
Username: cfg.Username,
Password: cfg.Password,
},
Debugf: func(format string, v ...interface{}) {
log.Debug().Str("module", "clickhouse").Msgf(format, v)
},
Protocol: clickhouse.Native,
TLS: &tls.Config{},
})

if err != nil {
return nil, err
}
if err != nil {
return nil, err
}

return &ClickhouseClient{
cfg: cfg,
log: log,
ChConn: conn,

ValidatorEventChan: make(chan *types.ValidatorEvent, 16384),
IPMetadataEventChan: make(chan *types.IPMetadataEvent, 16384),
PeerDiscoveredEventChan: make(chan *types.PeerDiscoveredEvent, 16384),
MetadataReceivedEventChan: make(chan *types.MetadataReceivedEvent, 16384),
}, nil
}

return &ClickhouseClient{
cfg: cfg,
log: log,
chConn: conn,

ValidatorEventChan: make(chan *types.ValidatorEvent, 16384),
IPMetadataEventChan: make(chan *types.IPMetadataEvent, 16384),
PeerDiscoveredEventChan: make(chan *types.PeerDiscoveredEvent, 16384),
MetadataReceivedEventChan: make(chan *types.MetadataReceivedEvent, 16384),
}, nil
func (c *ClickhouseClient) Close() error {
return c.ChConn.Close()
}
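Apart from re-indentation and the exported ChConn field, the constructor body is unchanged, and the new Close method simply wraps ChConn.Close. A hedged end-to-end usage sketch, kept in-package so no module import path has to be guessed; the endpoint and credentials are placeholders, only the field and method names come from the code above:

```go
package clickhouse // placed in-package to avoid guessing the module import path

import "log"

// exampleUsage is a hypothetical wiring sketch, not part of the repository.
func exampleUsage() {
	cfg := &ClickhouseConfig{
		Endpoint: "clickhouse.example.com:9440", // placeholder
		DB:       "default",                     // placeholder
		Username: "default",                     // placeholder
		Password: "secret",                      // placeholder
	}

	client, err := NewClickhouseClient(cfg)
	if err != nil {
		log.Fatalf("connecting to ClickHouse: %v", err)
	}
	defer client.Close()

	if err := client.Start(); err != nil {
		log.Fatalf("starting batch processors: %v", err)
	}

	// Producers elsewhere push events into the exported channels,
	// e.g. client.ValidatorEventChan <- ev
}
```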

func (c *ClickhouseClient) initializeTables() error {
// Create validator_metadata table
if err := c.chConn.Exec(context.Background(), ValidatorMetadataDDL(c.cfg.DB)); err != nil {
if err := c.ChConn.Exec(context.Background(), ValidatorMetadataDDL(c.cfg.DB)); err != nil {
c.log.Error().Err(err).Msg("creating validator_metadata table")
return err
}

// Create ip_metadata table
if err := c.chConn.Exec(context.Background(), IPMetadataDDL(c.cfg.DB)); err != nil {
if err := c.ChConn.Exec(context.Background(), IPMetadataDDL(c.cfg.DB)); err != nil {
c.log.Error().Err(err).Msg("creating ip_metadata table")
return err
}


// Create peer_discovered_events table
if err := c.chConn.Exec(context.Background(), PeerDiscoveredEventsDDL(c.cfg.DB)); err != nil {
if err := c.ChConn.Exec(context.Background(), PeerDiscoveredEventsDDL(c.cfg.DB)); err != nil {
c.log.Error().Err(err).Msg("creating peer_discovered_events table")
return err
}

// Create metadata_received_events table
if err := c.chConn.Exec(context.Background(), MetadataReceivedEventsDDL(c.cfg.DB)); err != nil {
if err := c.ChConn.Exec(context.Background(), MetadataReceivedEventsDDL(c.cfg.DB)); err != nil {
c.log.Error().Err(err).Msg("creating metadata_received_events table")
return err
}
@@ -368,7 +372,7 @@ func (c *ClickhouseClient) Start() error {
// BatchProcessor processes events in batches for a specified table in ClickHouse.
func batchProcessor[T any](client *ClickhouseClient, tableName string, eventChan <-chan T, maxSize uint64) {
// Prepare the initial batch.
batch, err := client.chConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s", tableName))
batch, err := client.ChConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s", tableName))
if err != nil {
client.log.Error().Err(err).Msg("Failed to prepare batch")
return
@@ -396,7 +400,7 @@ func batchProcessor[T any](client *ClickhouseClient, tableName string, eventChan <-chan T, maxSize uint64) {
}

// Prepare a new batch after sending the current batch.
batch, err = client.chConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s", tableName))
batch, err = client.ChConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s", tableName))
if err != nil {
client.log.Error().Err(err).Msg("Failed to prepare new batch after sending")
return
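batchProcessor is generic over the event type, so one implementation serves all four channels. A hypothetical wiring sketch, meant to sit in the same package as the code above (the real call sites are presumably in Start(), which this diff only touches in passing); the table-to-channel mapping and the batch size are assumptions:

```go
// Hypothetical helper, not part of the repository.
func (c *ClickhouseClient) startProcessors() {
	const maxBatchSize uint64 = 1024 // placeholder value

	go batchProcessor(c, "validator_metadata", c.ValidatorEventChan, maxBatchSize)
	go batchProcessor(c, "ip_metadata", c.IPMetadataEventChan, maxBatchSize)
	go batchProcessor(c, "peer_discovered_events", c.PeerDiscoveredEventChan, maxBatchSize)
	go batchProcessor(c, "metadata_received_events", c.MetadataReceivedEventChan, maxBatchSize)
}
```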