Skip to content

Commit

Permalink
feat!: add limit to receipts retrieve
Browse files Browse the repository at this point in the history
Helps with keeping the rule of 15,000 max receipts per aggregation
request.
Also adds a helper function to implement the limit safely.

Signed-off-by: Alexis Asseman <alexis@semiotic.ai>
  • Loading branch information
aasseman committed Nov 15, 2023
1 parent 3d247c0 commit d25b615
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 15 deletions.
2 changes: 1 addition & 1 deletion tap_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ alloy-primitives = { version = "0.4.2", features = ["serde"]}
strum = "0.24.1"
strum_macros = "0.24.3"
async-trait = "0.1.72"
tokio = { version = "1.29.1", features = ["macros"] }
tokio = { version = "1.29.1", features = ["macros", "rt-multi-thread"] }

[dev-dependencies]
criterion = { version = "0.5", features = ["async_std"] }
Expand Down
55 changes: 55 additions & 0 deletions tap_core/src/adapters/receipt_storage_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,26 @@ pub trait ReceiptStorageAdapter {
/// This method should be implemented to fetch all `ReceivedReceipts` within a specific timestamp range
/// from your storage system. The returned receipts should be in the form of a vector of tuples where
/// each tuple contains the unique receipt_id and the corresponding `ReceivedReceipt`.
///
/// If a limit is specified, the adapter should return at most that many receipts, while making
/// sure that no receipts are left behind for any timestamp that is returned. Examples:
///
/// - If the adapter has 10 receipts for timestamp 100, and 5 receipts for timestamp 200, and
/// the limit is 10, the adapter should return all 10 receipts for timestamp 100, and none for
/// timestamp 200.
/// - If the adapter has 5 receipts for timestamp 100, and 10 receipts for timestamp 200, and
/// the limit is 10, the adapter should return all 5 receipts for timestamp 100, and none for
/// timestamp 200. (because it would have to leave behind 5 receipts for timestamp 200, which
/// is not allowed).
///
/// You can use the [`safe_truncate_receipts()`] function to help with this, but feel free to
/// implement a more efficient solution for your situation if you can.
///
/// Any errors that occur during this process should be captured and returned as an `AdapterError`.
async fn retrieve_receipts_in_timestamp_range<R: RangeBounds<u64> + std::marker::Send>(
&self,
timestamp_range_ns: R,
limit: Option<u64>,
) -> Result<Vec<(u64, ReceivedReceipt)>, Self::AdapterError>;

/// Updates a specific `ReceivedReceipt` identified by a unique receipt_id.
Expand All @@ -84,3 +100,42 @@ pub trait ReceiptStorageAdapter {
timestamp_ns: R,
) -> Result<(), Self::AdapterError>;
}

/// See [`ReceiptStorageAdapter::retrieve_receipts_in_timestamp_range()`] for details.
///
/// Shrinks `receipts` in place so that it holds at most `limit` elements, without ever keeping
/// only part of the receipts that share a timestamp: if the cut would fall in the middle of a
/// group of receipts with equal timestamps, that whole group is dropped as well.
///
/// WARNING: When truncation is needed, this will sort the receipts by timestamp using
/// [vec::sort_unstable](https://doc.rust-lang.org/std/vec/struct.Vec.html#method.sort_unstable).
/// If `receipts` already fits within `limit`, it is left untouched (and is NOT sorted).
pub fn safe_truncate_receipts(receipts: &mut Vec<(u64, ReceivedReceipt)>, limit: u64) {
    // Compare in `u64` space: widening `usize` to `u64` is lossless, whereas narrowing `limit`
    // with `as usize` would silently wrap on 32-bit targets and truncate receipts that fit.
    if receipts.len() as u64 <= limit {
        return;
    }
    if limit == 0 {
        receipts.clear();
        return;
    }

    // From here on `limit < receipts.len()`, so it is guaranteed to fit in a `usize`.
    let limit = limit as usize;

    receipts.sort_unstable_by_key(|(_, rx_receipt)| rx_receipt.signed_receipt.message.timestamp_ns);

    // This one would be the last timestamp in `receipts` after naive truncation
    let last_timestamp = receipts[limit - 1].1.signed_receipt.message.timestamp_ns;
    // This one is the timestamp that comes just after the one above
    let after_last_timestamp = receipts[limit].1.signed_receipt.message.timestamp_ns;

    let keep = if last_timestamp == after_last_timestamp {
        // The cut would split a group of receipts sharing `last_timestamp`, leaving part of
        // them behind. Drop the whole group instead: since the receipts are sorted, everything
        // strictly older than `last_timestamp` forms a prefix that we can locate directly.
        receipts.partition_point(|(_, rx_receipt)| {
            rx_receipt.signed_receipt.message.timestamp_ns < last_timestamp
        })
    } else {
        limit
    };

    receipts.truncate(keep);
}
18 changes: 14 additions & 4 deletions tap_core/src/adapters/test/receipt_storage_adapter_mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use async_trait::async_trait;
use tokio::sync::RwLock;

use crate::{
adapters::receipt_storage_adapter::ReceiptStorageAdapter, tap_receipt::ReceivedReceipt,
adapters::receipt_storage_adapter::{safe_truncate_receipts, ReceiptStorageAdapter},
tap_receipt::ReceivedReceipt,
};

pub struct ReceiptStorageAdapterMock {
Expand Down Expand Up @@ -52,7 +53,7 @@ impl ReceiptStorageAdapterMock {
&self,
timestamp_ns: u64,
) -> Result<Vec<(u64, ReceivedReceipt)>, AdapterErrorMock> {
self.retrieve_receipts_in_timestamp_range(..=timestamp_ns)
self.retrieve_receipts_in_timestamp_range(..=timestamp_ns, None)
.await
}
pub async fn remove_receipt_by_id(&mut self, receipt_id: u64) -> Result<(), AdapterErrorMock> {
Expand Down Expand Up @@ -96,15 +97,24 @@ impl ReceiptStorageAdapter for ReceiptStorageAdapterMock {
async fn retrieve_receipts_in_timestamp_range<R: RangeBounds<u64> + std::marker::Send>(
&self,
timestamp_range_ns: R,
limit: Option<u64>,
) -> Result<Vec<(u64, ReceivedReceipt)>, Self::AdapterError> {
let receipt_storage = self.receipt_storage.read().await;
Ok(receipt_storage
let mut receipts_in_range: Vec<(u64, ReceivedReceipt)> = receipt_storage
.iter()
.filter(|(_, rx_receipt)| {
timestamp_range_ns.contains(&rx_receipt.signed_receipt.message.timestamp_ns)
})
.map(|(&id, rx_receipt)| (id, rx_receipt.clone()))
.collect())
.collect();

if limit.is_some_and(|limit| receipts_in_range.len() > limit as usize) {
safe_truncate_receipts(&mut receipts_in_range, limit.unwrap());

Ok(receipts_in_range)
} else {
Ok(receipts_in_range)
}
}
async fn update_receipt_by_id(
&self,
Expand Down
70 changes: 70 additions & 0 deletions tap_core/src/adapters/test/receipt_storage_adapter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

#[cfg(test)]
mod receipt_storage_adapter_unit_test {
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -179,4 +181,72 @@ mod receipt_storage_adapter_unit_test {
.is_err());
}
}

/// Exercises `safe_truncate_receipts` with several timestamp layouts.
///
/// Each case lists the input timestamps, the limit, and the timestamps expected to remain.
/// The receipts are shuffled before calling `safe_truncate_receipts`.
#[rstest]
#[case(vec![1, 2, 3, 4, 5], 3, vec![1, 2, 3])]
#[case(vec![1, 2, 3, 3, 4, 5], 3, vec![1, 2])]
#[case(vec![1, 2, 3, 4, 4, 4], 3, vec![1, 2, 3])]
#[case(vec![1, 1, 1, 1, 2, 3], 3, vec![])]
#[tokio::test]
async fn safe_truncate_receipts_test(
    domain_separator: Eip712Domain,
    #[case] input: Vec<u64>,
    #[case] limit: u64,
    #[case] expected: Vec<u64>,
) {
    use crate::adapters::receipt_storage_adapter::safe_truncate_receipts;

    let wallet: LocalWallet = MnemonicBuilder::<English>::default()
        .phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about")
        .build()
        .unwrap();

    // Pairs of (id, receipt); the receipt's query ID mirrors the storage ID so that the
    // pairing can be verified after truncation.
    let mut receipts: Vec<(u64, ReceivedReceipt)> = Vec::new();
    for (index, &timestamp_ns) in input.iter().enumerate() {
        let id = index as u64;
        let message = Receipt {
            allocation_id: Address::ZERO,
            timestamp_ns,
            nonce: 0,
            value: 0,
        };
        let signed_receipt = EIP712SignedMessage::new(&domain_separator, message, &wallet)
            .await
            .unwrap();
        receipts.push((
            id,
            ReceivedReceipt::new(signed_receipt, id, &get_full_list_of_checks()),
        ));
    }

    // Randomize the order so the function under test cannot rely on pre-sorted input.
    receipts.shuffle(&mut thread_rng());

    safe_truncate_receipts(&mut receipts, limit);

    assert_eq!(receipts.len(), expected.len());

    for ((id, rx_receipt), expected_timestamp) in receipts.iter().zip(expected.iter()) {
        // The surviving timestamps must match the expectation, in ascending order.
        assert_eq!(
            rx_receipt.signed_receipt.message.timestamp_ns,
            *expected_timestamp
        );

        // Each receipt must still be paired with the ID it was created with.
        assert_eq!(*id, rx_receipt.query_id);
    }
}
}
11 changes: 8 additions & 3 deletions tap_core/src/tap_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,19 @@ impl<
///
/// Returns [`Error::TimestampRangeError`] if the max timestamp of the previous RAV is greater than the min timestamp. Caused by timestamp buffer being too large, or requests coming too soon.
///
pub async fn create_rav_request(&self, timestamp_buffer_ns: u64) -> Result<RAVRequest, Error> {
pub async fn create_rav_request(
&self,
timestamp_buffer_ns: u64,
receipts_limit: Option<u64>,
) -> Result<RAVRequest, Error> {
let previous_rav = self.get_previous_rav().await?;
let min_timestamp_ns = previous_rav
.as_ref()
.map(|rav| rav.message.timestamp_ns + 1)
.unwrap_or(0);

let (valid_receipts, invalid_receipts) = self
.collect_receipts(timestamp_buffer_ns, min_timestamp_ns)
.collect_receipts(timestamp_buffer_ns, min_timestamp_ns, receipts_limit)
.await?;

let expected_rav = Self::generate_expected_rav(&valid_receipts, previous_rav.clone())?;
Expand Down Expand Up @@ -216,6 +220,7 @@ impl<
&self,
timestamp_buffer_ns: u64,
min_timestamp_ns: u64,
limit: Option<u64>,
) -> Result<(Vec<SignedReceipt>, Vec<SignedReceipt>), Error> {
let max_timestamp_ns = crate::get_current_timestamp_u64_ns()? - timestamp_buffer_ns;

Expand All @@ -227,7 +232,7 @@ impl<
}
let received_receipts = self
.receipt_storage_adapter
.retrieve_receipts_in_timestamp_range(min_timestamp_ns..max_timestamp_ns)
.retrieve_receipts_in_timestamp_range(min_timestamp_ns..max_timestamp_ns, limit)
.await
.map_err(|err| Error::AdapterError {
source_error: anyhow::Error::new(err),
Expand Down
12 changes: 6 additions & 6 deletions tap_core/src/tap_manager/test/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ mod manager_unit_test {
.await
.is_ok());
}
let rav_request_result = manager.create_rav_request(0).await;
let rav_request_result = manager.create_rav_request(0, None).await;
assert!(rav_request_result.is_ok());

let rav_request = rav_request_result.unwrap();
Expand Down Expand Up @@ -303,7 +303,7 @@ mod manager_unit_test {
.is_ok());
expected_accumulated_value += value;
}
let rav_request_result = manager.create_rav_request(0).await;
let rav_request_result = manager.create_rav_request(0, None).await;
assert!(rav_request_result.is_ok());

let rav_request = rav_request_result.unwrap();
Expand Down Expand Up @@ -352,7 +352,7 @@ mod manager_unit_test {
.is_ok());
expected_accumulated_value += value;
}
let rav_request_result = manager.create_rav_request(0).await;
let rav_request_result = manager.create_rav_request(0, None).await;
assert!(rav_request_result.is_ok());

let rav_request = rav_request_result.unwrap();
Expand Down Expand Up @@ -443,7 +443,7 @@ mod manager_unit_test {
manager.remove_obsolete_receipts().await.unwrap();
}

let rav_request_1_result = manager.create_rav_request(0).await;
let rav_request_1_result = manager.create_rav_request(0, None).await;
assert!(rav_request_1_result.is_ok());

let rav_request_1 = rav_request_1_result.unwrap();
Expand Down Expand Up @@ -501,15 +501,15 @@ mod manager_unit_test {
assert_eq!(
manager
.receipt_storage_adapter
.retrieve_receipts_in_timestamp_range(..)
.retrieve_receipts_in_timestamp_range(.., None)
.await
.unwrap()
.len(),
10
);
}

let rav_request_2_result = manager.create_rav_request(0).await;
let rav_request_2_result = manager.create_rav_request(0, None).await;
assert!(rav_request_2_result.is_ok());

let rav_request_2 = rav_request_2_result.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion tap_integration_tests/tests/indexer_mock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ async fn request_rav<
threshold: usize,
) -> Result<()> {
// Create the aggregate_receipts request params
let rav_request = manager.create_rav_request(time_stamp_buffer).await?;
let rav_request = manager.create_rav_request(time_stamp_buffer, None).await?;

// To-do: Need to add previous RAV, when tap_manager supports replacing receipts
let params = rpc_params!(
Expand Down

0 comments on commit d25b615

Please sign in to comment.