diff --git a/src/bin/main.rs b/src/bin/main.rs index 5986b0a..72e88bd 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -98,12 +98,7 @@ async fn run() -> anyhow::Result<()> { .collect() }; - let assignment = assign_partitions( - &consumer, - &["gateway_queries", "gateway_indexer_fees"], - start_timestamp, - ) - .await?; + let assignment = assign_partitions(&consumer, &["gateway_queries"], start_timestamp).await?; let (source_msg_tx, mut source_msg_rx) = mpsc::channel::(1024); let mut partition_consumers: Vec> = assignment @@ -180,18 +175,6 @@ async fn handle_source_msg( tracing::info!(timestamp = print_unix_millis(t).unwrap(), "flushed"); } } - SourceMsg::IndexerFees { - aggregation_timestamp, - signer, - receiver, - fees_grt, - } => { - if aggregation_timestamp >= start_timestamp { - let key = IndexerFeesKey { signer, receiver }; - let agg = aggregations.entry(aggregation_timestamp).or_default(); - *agg.indexer_fees.entry(key).or_default() += fees_grt; - } - } SourceMsg::ClientQuery { timestamp, aggregation_timestamp, @@ -266,11 +249,7 @@ fn spawn_partition_consumer( let msg = SourceMsg::decode(msg, &legacy_source_offsets)?; let aggregation_timestamp = match &msg { SourceMsg::Flush { .. } => unreachable!(), // unreachable - SourceMsg::IndexerFees { - aggregation_timestamp, - .. - } - | SourceMsg::ClientQuery { + SourceMsg::ClientQuery { aggregation_timestamp, .. } => *aggregation_timestamp, @@ -366,13 +345,6 @@ enum SourceMsg { partition_id: String, aggregation_timestamp: i64, }, - // TODO: remove after migration - IndexerFees { - aggregation_timestamp: i64, - signer: Address, - receiver: Address, - fees_grt: f64, - }, ClientQuery { timestamp: i64, aggregation_timestamp: i64, @@ -404,15 +376,6 @@ impl SourceMsg { data: decoded, }) } - "gateway_indexer_fees" => { - let decoded = IndexerFeesProtobuf::decode(payload).context("decode protobuf")?; - Ok(SourceMsg::IndexerFees { - aggregation_timestamp, - signer: Address::from_slice(&decoded.signer)?, - receiver: Address::from_slice(&decoded.receiver)?, - fees_grt: decoded.fees_grt, - }) - } topic => anyhow::bail!("unexpected topic: {topic}"), } }