Skip to content

Commit

Permalink
Merge pull request #288 from wadeking98/fix-binary-react-native
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewwhitehead authored May 28, 2024
2 parents 9d43f97 + 203c122 commit bcf9070
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 127 deletions.
2 changes: 0 additions & 2 deletions libindy_vdr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ thiserror = "1.0"
time = { version = "=0.3.20", features = ["parsing"] }
url = "2.2.2"
zmq = "0.9"
async-trait = "0.1.77"
async-lock = "3.3.0"
sled = "0.34.7"

[dev-dependencies]
Expand Down
39 changes: 22 additions & 17 deletions libindy_vdr/src/pool/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use async_lock::RwLock;
use async_trait::async_trait;
use std::{fmt::Display, sync::Arc};
use std::{
fmt::Display,
sync::{Arc, RwLock},
};

pub mod storage;
pub mod strategy;

#[async_trait]
pub trait CacheStrategy<K, V>: Send + Sync + 'static {
async fn get(&self, key: &K) -> Option<V>;
fn get(&self, key: &K) -> Option<V>;

async fn remove(&mut self, key: &K) -> Option<V>;
fn remove(&self, key: &K) -> Option<V>;

async fn insert(&mut self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V>;
fn insert(&self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V>;
}

pub struct Cache<K: Display, V> {
Expand All @@ -34,23 +34,28 @@ impl<K: Display + 'static, V: 'static> Cache<K, V> {
}
}

pub async fn get(&self, key: &K) -> Option<V> {
pub fn get(&self, key: &K) -> Option<V> {
let full_key = self.full_key(key);
self.storage.read().await.get(&full_key).await
if let Ok(storage) = self.storage.read() {
return storage.get(&full_key);
}
None
}

pub async fn remove(&self, key: &K) -> Option<V> {
pub fn remove(&self, key: &K) -> Option<V> {
let full_key = self.full_key(key);
self.storage.write().await.remove(&full_key).await
if let Ok(storage) = self.storage.write() {
return storage.remove(&full_key);
}
None
}

pub async fn insert(&self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V> {
pub fn insert(&self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V> {
let full_key = self.full_key(&key);
self.storage
.write()
.await
.insert(full_key, value, custom_exp_offset)
.await
if let Ok(storage) = self.storage.write() {
return storage.insert(full_key, value, custom_exp_offset);
}
None
}
}

Expand Down
196 changes: 92 additions & 104 deletions libindy_vdr/src/pool/cache/strategy.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use super::storage::OrderedHashMap;
use super::CacheStrategy;
use async_lock::Mutex;
use async_trait::async_trait;
use std::{collections::BTreeMap, fmt::Debug, hash::Hash, sync::Arc, time::SystemTime};
use std::{
collections::BTreeMap,
fmt::Debug,
hash::Hash,
ops::Deref,
sync::{Arc, Mutex},
time::SystemTime,
};

/// A simple struct to hold a value and the expiry offset
/// needed because items can be inserted with custom ttl values
Expand Down Expand Up @@ -50,89 +55,94 @@ impl<K: Eq + Hash + Clone + Send + Sync + 'static, V: Clone + Send + Sync + 'sta
}
}

#[async_trait]
impl<K: Send + Sync + 'static, V: Send + Sync + 'static> CacheStrategy<K, V>
for Arc<dyn CacheStrategy<K, V>>
{
async fn get(&self, key: &K) -> Option<V> {
self.get(key).await
fn get(&self, key: &K) -> Option<V> {
self.deref().get(key)
}
async fn remove(&mut self, key: &K) -> Option<V> {
self.remove(key).await
fn remove(&self, key: &K) -> Option<V> {
self.deref().remove(key)
}
async fn insert(&mut self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V> {
self.insert(key, value, custom_exp_offset).await
fn insert(&self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V> {
self.deref().insert(key, value, custom_exp_offset)
}
}

#[async_trait]
impl<K: Hash + Eq + Send + Sync + 'static + Clone + Debug, V: Clone + Send + Sync + 'static>
CacheStrategy<K, V> for CacheStrategyTTL<K, V>
{
async fn get(&self, key: &K) -> Option<V> {
let mut store_lock = self.store.lock().await;
let current_time = SystemTime::now()
.duration_since(self.create_time)
.unwrap()
.as_millis();
let get_res = match store_lock.get(key) {
Some((ts, v)) => {
if current_time < *ts {
Some((*ts, v.clone()))
} else {
store_lock.remove(key);
None
fn get(&self, key: &K) -> Option<V> {
if let Some(mut store_lock) = self.store.lock().ok() {
let current_time = SystemTime::now()
.duration_since(self.create_time)
.unwrap()
.as_millis();
let get_res = match store_lock.get(key) {
Some((ts, v)) => {
if current_time < *ts {
Some((*ts, v.clone()))
} else {
store_lock.remove(key);
None
}
}
None => None,
};
// update the timestamp if the entry is still valid
if let Some((_, ref v)) = get_res {
store_lock.re_order(key, current_time + v.expire_offset);
}
None => None,
};
// update the timestamp if the entry is still valid
if let Some((_, ref v)) = get_res {
store_lock.re_order(key, current_time + v.expire_offset);
return get_res.map(|(_, v)| v.value);
}
get_res.map(|(_, v)| v.value)
None
}
async fn remove(&mut self, key: &K) -> Option<V> {
self.store.lock().await.remove(key).map(|(_, v)| v.value)
fn remove(&self, key: &K) -> Option<V> {
if let Some(mut store) = self.store.lock().ok() {
return store.remove(key).map(|(_, v)| v.value);
}
None
}

async fn insert(&mut self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V> {
let mut store_lock = self.store.lock().await;
let current_ts = SystemTime::now()
.duration_since(self.create_time)
.unwrap()
.as_millis();

// remove expired entries
while store_lock.len() > 0
&& store_lock
.get_first_key_value()
.map(|(_, ts, _)| ts.clone() < current_ts)
.unwrap_or(false)
{
store_lock.remove_first();
}
fn insert(&self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V> {
if let Some(mut store_lock) = self.store.lock().ok() {
let current_ts = SystemTime::now()
.duration_since(self.create_time)
.unwrap()
.as_millis();

// remove the oldest item if the cache is still full
if store_lock.len() >= self.capacity && store_lock.get(&key).is_none() {
// remove the oldest item
let removal_key = store_lock.get_first_key_value().map(|(k, _, _)| k.clone());
if let Some(removal_key) = removal_key {
store_lock.remove(&removal_key);
// remove expired entries
while store_lock.len() > 0
&& store_lock
.get_first_key_value()
.map(|(_, ts, _)| ts.clone() < current_ts)
.unwrap_or(false)
{
store_lock.remove_first();
}
};

let exp_offset = custom_exp_offset.unwrap_or(self.expire_after);
store_lock
.insert(
key,
TTLCacheItem {
value: value,
expire_offset: exp_offset,
},
current_ts + exp_offset,
)
.map(|v| v.value)
// remove the oldest item if the cache is still full
if store_lock.len() >= self.capacity && store_lock.get(&key).is_none() {
// remove the oldest item
let removal_key = store_lock.get_first_key_value().map(|(k, _, _)| k.clone());
if let Some(removal_key) = removal_key {
store_lock.remove(&removal_key);
}
};

let exp_offset = custom_exp_offset.unwrap_or(self.expire_after);
return store_lock
.insert(
key,
TTLCacheItem {
value: value,
expire_offset: exp_offset,
},
current_ts + exp_offset,
)
.map(|v| v.value);
}
None
}
}

Expand All @@ -158,53 +168,31 @@ mod tests {
let caches = vec![cache, fs_cache];
block_on(async {
for cache in caches {
cache
.insert("key".to_string(), "value".to_string(), None)
.await;
assert_eq!(
cache.get(&"key".to_string()).await,
Some("value".to_string())
);
cache
.insert("key1".to_string(), "value1".to_string(), None)
.await;
cache
.insert("key2".to_string(), "value2".to_string(), None)
.await;
assert_eq!(cache.get(&"key".to_string()).await, None);
cache
.insert("key3".to_string(), "value3".to_string(), None)
.await;
cache.get(&"key2".to_string()).await;
cache
.insert("key4".to_string(), "value4".to_string(), None)
.await;
cache.insert("key".to_string(), "value".to_string(), None);
assert_eq!(cache.get(&"key".to_string()), Some("value".to_string()));
cache.insert("key1".to_string(), "value1".to_string(), None);
cache.insert("key2".to_string(), "value2".to_string(), None);
assert_eq!(cache.get(&"key".to_string()), None);
cache.insert("key3".to_string(), "value3".to_string(), None);
cache.get(&"key2".to_string());
cache.insert("key4".to_string(), "value4".to_string(), None);
// key2 should not be evicted because of LRU
assert_eq!(
cache.remove(&"key2".to_string()).await,
cache.remove(&"key2".to_string()),
Some("value2".to_string())
);
// key3 should be evicted because it was bumped to back after key2 was accessed
assert_eq!(cache.get(&"key3".to_string()).await, None);
cache
.insert("key5".to_string(), "value5".to_string(), None)
.await;
assert_eq!(cache.get(&"key3".to_string()), None);
cache.insert("key5".to_string(), "value5".to_string(), None);
thread::sleep(std::time::Duration::from_millis(6));
assert_eq!(cache.get(&"key5".to_string()).await, None);
assert_eq!(cache.get(&"key5".to_string()), None);
// test ttl config
cache
.insert("key6".to_string(), "value6".to_string(), Some(1))
.await;
cache
.insert("key7".to_string(), "value7".to_string(), None)
.await;
cache.insert("key6".to_string(), "value6".to_string(), Some(1));
cache.insert("key7".to_string(), "value7".to_string(), None);
// wait until value6 expires
thread::sleep(std::time::Duration::from_millis(1));
assert_eq!(cache.get(&"key6".to_string()).await, None);
assert_eq!(
cache.get(&"key7".to_string()).await,
Some("value7".to_string())
);
assert_eq!(cache.get(&"key6".to_string()), None);
assert_eq!(cache.get(&"key7".to_string()), Some("value7".to_string()));
}
std::fs::remove_dir_all(cache_location).unwrap();
});
Expand Down
6 changes: 2 additions & 4 deletions libindy_vdr/src/pool/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ pub async fn perform_ledger_request<T: Pool>(

if is_read_req {
if let Some(cache) = cache_opt.clone() {
if let Some((response, meta)) = cache.get(&cache_key).await {
if let Some((response, meta)) = cache.get(&cache_key) {
return Ok((RequestResult::Reply(response), meta));
}
}
Expand All @@ -240,9 +240,7 @@ pub async fn perform_ledger_request<T: Pool>(
}
}
if let Some(cache) = cache_opt {
cache
.insert(cache_key, (response.to_string(), meta.clone()), None)
.await;
cache.insert(cache_key, (response.to_string(), meta.clone()), None);
}
}
}
Expand Down

0 comments on commit bcf9070

Please sign in to comment.