From 6278089942b280262255b86a9ad49d000bef706e Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Mon, 11 Nov 2024 09:37:41 -0500 Subject: [PATCH] fix: handle all aggregation sink timestamps --- src/bin/main.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/bin/main.rs b/src/bin/main.rs index a85c0e1..fbf3465 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -240,8 +240,16 @@ async fn latest_sink_timestamp(consumer: &StreamConsumer) -> anyhow::Result anyhow::Result { - let msg = IndexerFeesHourlyProtobuf::decode(msg.payload().context("missing payload")?)?; - Ok(msg.timestamp) + let payload = msg.payload().context("missing_payload")?; + match msg.topic() { + "gateway_client_fees_hourly" => { + Ok(ClientFeesHourlyProtobuf::decode(payload)?.timestamp) + } + "gateway_indexer_fees_hourly" => { + Ok(IndexerFeesHourlyProtobuf::decode(payload)?.timestamp) + } + topic => anyhow::bail!("unhandled topic: {topic}"), + } }) .collect::>>()? .into_iter()