From d25b6157f76e6e445f3beef64d8072b54736bcca Mon Sep 17 00:00:00 2001 From: Alexis Asseman Date: Wed, 15 Nov 2023 12:15:40 -0800 Subject: [PATCH] feat!: add limit to receipts retrieve Helps with keeping the rule of 15,000 max receipts per aggregation request. Also adds a helper function to implement the limit safely. Signed-off-by: Alexis Asseman --- tap_core/Cargo.toml | 2 +- .../src/adapters/receipt_storage_adapter.rs | 55 +++++++++++++++ .../test/receipt_storage_adapter_mock.rs | 18 +++-- .../test/receipt_storage_adapter_test.rs | 70 +++++++++++++++++++ tap_core/src/tap_manager/manager.rs | 11 ++- tap_core/src/tap_manager/test/manager_test.rs | 12 ++-- .../tests/indexer_mock/mod.rs | 2 +- 7 files changed, 155 insertions(+), 15 deletions(-) diff --git a/tap_core/Cargo.toml b/tap_core/Cargo.toml index d9f2b5e8..e502af21 100644 --- a/tap_core/Cargo.toml +++ b/tap_core/Cargo.toml @@ -23,7 +23,7 @@ alloy-primitives = { version = "0.4.2", features = ["serde"]} strum = "0.24.1" strum_macros = "0.24.3" async-trait = "0.1.72" -tokio = { version = "1.29.1", features = ["macros"] } +tokio = { version = "1.29.1", features = ["macros", "rt-multi-thread"] } [dev-dependencies] criterion = { version = "0.5", features = ["async_std"] } diff --git a/tap_core/src/adapters/receipt_storage_adapter.rs b/tap_core/src/adapters/receipt_storage_adapter.rs index 1e848434..a76ff9ce 100644 --- a/tap_core/src/adapters/receipt_storage_adapter.rs +++ b/tap_core/src/adapters/receipt_storage_adapter.rs @@ -57,10 +57,26 @@ pub trait ReceiptStorageAdapter { /// This method should be implemented to fetch all `ReceivedReceipts` within a specific timestamp range /// from your storage system. The returned receipts should be in the form of a vector of tuples where /// each tuple contains the unique receipt_id and the corresponding `ReceivedReceipt`. 
+ /// + /// If a limit is specified, the adapter should return at most that many receipts, while making + /// sure that no receipts are left behind for any timestamp that is returned. Examples: + /// + /// - If the adapter has 10 receipts for timestamp 100, and 5 receipts for timestamp 200, and + /// the limit is 10, the adapter should return all 10 receipts for timestamp 100, and none for + /// timestamp 200. + /// - If the adapter has 5 receipts for timestamp 100, and 10 receipts for timestamp 200, and + /// the limit is 10, the adapter should return all 5 receipts for timestamp 100, and none for + /// timestamp 200 (because it would have to leave behind 5 receipts for timestamp 200, which + /// is not allowed). + /// + /// You can use the [`safe_truncate_receipts()`] function to help with this, but feel free to + /// implement a more efficient solution for your situation if you can. + /// /// Any errors that occur during this process should be captured and returned as an `AdapterError`. async fn retrieve_receipts_in_timestamp_range + std::marker::Send>( &self, timestamp_range_ns: R, + limit: Option, ) -> Result, Self::AdapterError>; /// Updates a specific `ReceivedReceipt` identified by a unique receipt_id. @@ -84,3 +100,42 @@ pub trait ReceiptStorageAdapter { timestamp_ns: R, ) -> Result<(), Self::AdapterError>; } + +/// See [`ReceiptStorageAdapter::retrieve_receipts_in_timestamp_range()`] for details. +/// +/// WARNING: Will sort the receipts by timestamp using +/// [`Vec::sort_unstable_by_key`](https://doc.rust-lang.org/std/vec/struct.Vec.html#method.sort_unstable_by_key). 
+pub fn safe_truncate_receipts(receipts: &mut Vec<(u64, ReceivedReceipt)>, limit: u64) { + if receipts.len() <= limit as usize { + return; + } else if limit == 0 { + receipts.clear(); + return; + } + + receipts.sort_unstable_by_key(|(_, rx_receipt)| rx_receipt.signed_receipt.message.timestamp_ns); + + // This one will be the last timestamp in `receipts` after naive truncation + let last_timestamp = receipts[limit as usize - 1] + .1 + .signed_receipt + .message + .timestamp_ns; + // This one is the timestamp that comes just after the one above + let after_last_timestamp = receipts[limit as usize] + .1 + .signed_receipt + .message + .timestamp_ns; + + receipts.truncate(limit as usize); + + if last_timestamp == after_last_timestamp { + // If the last timestamp is the same as the one that came after it, we need to + // remove all the receipts with the same timestamp as the last one, because + // otherwise we would leave behind part of the receipts for that timestamp. + receipts.retain(|(_, rx_receipt)| { + rx_receipt.signed_receipt.message.timestamp_ns != last_timestamp + }); + } +} diff --git a/tap_core/src/adapters/test/receipt_storage_adapter_mock.rs b/tap_core/src/adapters/test/receipt_storage_adapter_mock.rs index d4fc033a..d48943f1 100644 --- a/tap_core/src/adapters/test/receipt_storage_adapter_mock.rs +++ b/tap_core/src/adapters/test/receipt_storage_adapter_mock.rs @@ -7,7 +7,8 @@ use async_trait::async_trait; use tokio::sync::RwLock; use crate::{ - adapters::receipt_storage_adapter::ReceiptStorageAdapter, tap_receipt::ReceivedReceipt, + adapters::receipt_storage_adapter::{safe_truncate_receipts, ReceiptStorageAdapter}, + tap_receipt::ReceivedReceipt, }; pub struct ReceiptStorageAdapterMock { @@ -52,7 +53,7 @@ impl ReceiptStorageAdapterMock { &self, timestamp_ns: u64, ) -> Result, AdapterErrorMock> { - self.retrieve_receipts_in_timestamp_range(..=timestamp_ns) + self.retrieve_receipts_in_timestamp_range(..=timestamp_ns, None) .await } pub async fn 
remove_receipt_by_id(&mut self, receipt_id: u64) -> Result<(), AdapterErrorMock> { @@ -96,15 +97,24 @@ impl ReceiptStorageAdapter for ReceiptStorageAdapterMock { async fn retrieve_receipts_in_timestamp_range + std::marker::Send>( &self, timestamp_range_ns: R, + limit: Option, ) -> Result, Self::AdapterError> { let receipt_storage = self.receipt_storage.read().await; - Ok(receipt_storage + let mut receipts_in_range: Vec<(u64, ReceivedReceipt)> = receipt_storage .iter() .filter(|(_, rx_receipt)| { timestamp_range_ns.contains(&rx_receipt.signed_receipt.message.timestamp_ns) }) .map(|(&id, rx_receipt)| (id, rx_receipt.clone())) - .collect()) + .collect(); + + if limit.is_some_and(|limit| receipts_in_range.len() > limit as usize) { + safe_truncate_receipts(&mut receipts_in_range, limit.unwrap()); + + Ok(receipts_in_range) + } else { + Ok(receipts_in_range) + } } async fn update_receipt_by_id( &self, diff --git a/tap_core/src/adapters/test/receipt_storage_adapter_test.rs b/tap_core/src/adapters/test/receipt_storage_adapter_test.rs index 68c23e0d..0eb36355 100644 --- a/tap_core/src/adapters/test/receipt_storage_adapter_test.rs +++ b/tap_core/src/adapters/test/receipt_storage_adapter_test.rs @@ -3,6 +3,8 @@ #[cfg(test)] mod receipt_storage_adapter_unit_test { + use rand::seq::SliceRandom; + use rand::thread_rng; use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; @@ -179,4 +181,72 @@ mod receipt_storage_adapter_unit_test { .is_err()); } } + + /// The test code will shuffle the input timestamps prior to calling safe_truncate_receipts. 
+ #[rstest] + #[case(vec![1, 2, 3, 4, 5], 3, vec![1, 2, 3])] + #[case(vec![1, 2, 3, 3, 4, 5], 3, vec![1, 2])] + #[case(vec![1, 2, 3, 4, 4, 4], 3, vec![1, 2, 3])] + #[case(vec![1, 1, 1, 1, 2, 3], 3, vec![])] + #[tokio::test] + async fn safe_truncate_receipts_test( + domain_separator: Eip712Domain, + #[case] input: Vec, + #[case] limit: u64, + #[case] expected: Vec, + ) { + let wallet: LocalWallet = MnemonicBuilder::::default() + .phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about") + .build() + .unwrap(); + + // Vec of (id, receipt) + let mut receipts_orig: Vec<(u64, ReceivedReceipt)> = Vec::new(); + + for (i, timestamp) in input.iter().enumerate() { + // The contents of the receipt only need to be unique for this test (so we can check) + receipts_orig.push(( + i as u64, + ReceivedReceipt::new( + EIP712SignedMessage::new( + &domain_separator, + Receipt { + allocation_id: Address::ZERO, + timestamp_ns: *timestamp, + nonce: 0, + value: 0, + }, + &wallet, + ) + .await + .unwrap(), + i as u64, // Will use that to check the IDs + &get_full_list_of_checks(), + ), + )); + } + + let mut receipts_truncated = receipts_orig.clone(); + + // shuffle the input receipts + receipts_truncated.shuffle(&mut thread_rng()); + + crate::adapters::receipt_storage_adapter::safe_truncate_receipts( + &mut receipts_truncated, + limit, + ); + + assert_eq!(receipts_truncated.len(), expected.len()); + + for (elem_trun, expected_timestamp) in receipts_truncated.iter().zip(expected.iter()) { + // Check timestamps + assert_eq!( + elem_trun.1.signed_receipt.message.timestamp_ns, + *expected_timestamp + ); + + // Check that the IDs are fine + assert_eq!(elem_trun.0, elem_trun.1.query_id); + } + } } diff --git a/tap_core/src/tap_manager/manager.rs b/tap_core/src/tap_manager/manager.rs index dd984a75..8ecaa2a5 100644 --- a/tap_core/src/tap_manager/manager.rs +++ b/tap_core/src/tap_manager/manager.rs @@ -176,7 +176,11 @@ impl< /// /// Returns 
[`Error::TimestampRangeError`] if the max timestamp of the previous RAV is greater than the min timestamp. Caused by timestamp buffer being too large, or requests coming too soon. /// - pub async fn create_rav_request(&self, timestamp_buffer_ns: u64) -> Result { + pub async fn create_rav_request( + &self, + timestamp_buffer_ns: u64, + receipts_limit: Option, + ) -> Result { let previous_rav = self.get_previous_rav().await?; let min_timestamp_ns = previous_rav .as_ref() @@ -184,7 +188,7 @@ impl< .unwrap_or(0); let (valid_receipts, invalid_receipts) = self - .collect_receipts(timestamp_buffer_ns, min_timestamp_ns) + .collect_receipts(timestamp_buffer_ns, min_timestamp_ns, receipts_limit) .await?; let expected_rav = Self::generate_expected_rav(&valid_receipts, previous_rav.clone())?; @@ -216,6 +220,7 @@ impl< &self, timestamp_buffer_ns: u64, min_timestamp_ns: u64, + limit: Option, ) -> Result<(Vec, Vec), Error> { let max_timestamp_ns = crate::get_current_timestamp_u64_ns()? - timestamp_buffer_ns; @@ -227,7 +232,7 @@ impl< } let received_receipts = self .receipt_storage_adapter - .retrieve_receipts_in_timestamp_range(min_timestamp_ns..max_timestamp_ns) + .retrieve_receipts_in_timestamp_range(min_timestamp_ns..max_timestamp_ns, limit) .await .map_err(|err| Error::AdapterError { source_error: anyhow::Error::new(err), diff --git a/tap_core/src/tap_manager/test/manager_test.rs b/tap_core/src/tap_manager/test/manager_test.rs index 857a891a..2009a1af 100644 --- a/tap_core/src/tap_manager/test/manager_test.rs +++ b/tap_core/src/tap_manager/test/manager_test.rs @@ -223,7 +223,7 @@ mod manager_unit_test { .await .is_ok()); } - let rav_request_result = manager.create_rav_request(0).await; + let rav_request_result = manager.create_rav_request(0, None).await; assert!(rav_request_result.is_ok()); let rav_request = rav_request_result.unwrap(); @@ -303,7 +303,7 @@ mod manager_unit_test { .is_ok()); expected_accumulated_value += value; } - let rav_request_result = 
manager.create_rav_request(0).await; + let rav_request_result = manager.create_rav_request(0, None).await; assert!(rav_request_result.is_ok()); let rav_request = rav_request_result.unwrap(); @@ -352,7 +352,7 @@ mod manager_unit_test { .is_ok()); expected_accumulated_value += value; } - let rav_request_result = manager.create_rav_request(0).await; + let rav_request_result = manager.create_rav_request(0, None).await; assert!(rav_request_result.is_ok()); let rav_request = rav_request_result.unwrap(); @@ -443,7 +443,7 @@ mod manager_unit_test { manager.remove_obsolete_receipts().await.unwrap(); } - let rav_request_1_result = manager.create_rav_request(0).await; + let rav_request_1_result = manager.create_rav_request(0, None).await; assert!(rav_request_1_result.is_ok()); let rav_request_1 = rav_request_1_result.unwrap(); @@ -501,7 +501,7 @@ mod manager_unit_test { assert_eq!( manager .receipt_storage_adapter - .retrieve_receipts_in_timestamp_range(..) + .retrieve_receipts_in_timestamp_range(.., None) .await .unwrap() .len(), @@ -509,7 +509,7 @@ mod manager_unit_test { ); } - let rav_request_2_result = manager.create_rav_request(0).await; + let rav_request_2_result = manager.create_rav_request(0, None).await; assert!(rav_request_2_result.is_ok()); let rav_request_2 = rav_request_2_result.unwrap(); diff --git a/tap_integration_tests/tests/indexer_mock/mod.rs b/tap_integration_tests/tests/indexer_mock/mod.rs index 7037ea50..dbf88ccc 100644 --- a/tap_integration_tests/tests/indexer_mock/mod.rs +++ b/tap_integration_tests/tests/indexer_mock/mod.rs @@ -218,7 +218,7 @@ async fn request_rav< threshold: usize, ) -> Result<()> { // Create the aggregate_receipts request params - let rav_request = manager.create_rav_request(time_stamp_buffer).await?; + let rav_request = manager.create_rav_request(time_stamp_buffer, None).await?; // To-do: Need to add previous RAV, when tap_manager supports replacing receipts let params = rpc_params!(