Merge pull request #19 from arindas/develop

chore: merge development updates
arindas authored Jun 18, 2024
2 parents 72ec3c6 + 8abca1a commit 91af718
Showing 1 changed file with 91 additions and 9 deletions.
100 changes: 91 additions & 9 deletions src/object_storage/aws_s3.rs
@@ -18,6 +18,8 @@ use std::{
 
 pub const PART_SIZE_MAP_KEY_SUFFIX: &str = "_part_size_map.json";
 
+pub const PART_EXTENSION: &str = "txt";
+
 pub trait PartMap {
     fn position_part_containing_offset(&self, offset: usize) -> Option<usize>;
 
@@ -43,6 +45,70 @@ pub trait PartMap {
             .map(|p| p.end())
             .unwrap_or(0)
     }
+
+    fn clear(&mut self);
+
+    fn append_missing_parts_from(
+        &mut self,
+        aws_s3_client: &Client,
+        bucket: &String,
+        object_prefix: &String,
+    ) -> impl std::future::Future<Output = Result<usize, AwsS3Error>> {
+        async move {
+            let mut parts_added = 0;
+
+            let part_suffix = format!(".{}", PART_EXTENSION);
+
+            for object in aws_s3_client
+                .list_objects_v2()
+                .bucket(bucket)
+                .prefix(object_prefix)
+                .send()
+                .await
+                .map_err(|err| AwsS3Error::AwsSdkError(err.to_string()))?
+                .contents()
+            {
+                let key = object.key().unwrap_or("");
+
+                let object_size = object.size().unwrap_or(0);
+
+                if !key.ends_with(&part_suffix) {
+                    continue;
+                }
+
+                let start = key.find('_').ok_or(AwsS3Error::ParseError(
+                    "Failed to find _ delim in part object key".to_string(),
+                ))? + 1;
+                let end = key.find('.').ok_or(AwsS3Error::ParseError(
+                    "Failed to find . delim in part object key".to_string(),
+                ))?;
+
+                if start >= end || end > key.len() {
+                    return Err(AwsS3Error::ParseError(
+                        "Invalid part_idx parse slice[] bounds".to_string(),
+                    ));
+                }
+
+                let part_idx: &usize = &key[start..end]
+                    .parse()
+                    .map_err(|_| AwsS3Error::ParseError("Failed parsing part idx".to_string()))?;
+
+                if *part_idx < self.len() {
+                    continue;
+                }
+
+                self.append_part_with_part_size(
+                    object_size
+                        .try_into()
+                        .map_err(|_| AwsS3Error::IntegerConversionError)?,
+                );
+
+                parts_added += 1;
+            }
+
+            Ok(parts_added)
+        }
+    }
 }
 
 #[derive(Clone, Copy, Serialize, Deserialize)]
@@ -104,6 +170,10 @@ impl PartMap for Vec<Part> {
 
         Some((idx, old_part_size, truncated_part))
     }
+
+    fn clear(&mut self) {
+        self.clear()
+    }
 }
 
 #[derive(Clone, Copy, Serialize, Deserialize)]
@@ -158,6 +228,10 @@ impl PartMap for FixedPartSizeMap {
         self.get_part_at_idx(self.len() - 1)
             .map(|x| (self.len() - 1, self.part_size, x))
     }
+
+    fn clear(&mut self) {
+        self.len = 0
+    }
 }
 
 #[allow(unused)]
@@ -184,8 +258,6 @@ where
 }
 
 pub enum AwsS3Error {
-    NonEmptyPartMapUsedForInit,
-
     ByteStreamError(ByteStreamError),
 
     IntegerConversionError,
@@ -194,6 +266,8 @@ pub enum AwsS3Error {
 
     AwsSdkError(String),
 
+    ParseError(String),
+
     PositionOutOfBounds,
 }
 
@@ -205,12 +279,8 @@ where
         aws_s3_client: Client,
         bucket: String,
         object_prefix: String,
-        empty_part_map: P,
+        mut fallback_part_map: P,
     ) -> Result<Self, AwsS3Error> {
-        if !empty_part_map.is_empty() {
-            return Err(AwsS3Error::NonEmptyPartMapUsedForInit);
-        }
-
        let get_object_output = aws_s3_client
             .get_object()
             .bucket(&bucket)
@@ -229,7 +299,16 @@
             .map_err(|err| AwsS3Error::AwsSdkError(err.to_string()))?
             .into_bytes();
 
-        let part_size_map = serde_json::from_slice(&bytes).unwrap_or(empty_part_map);
+        let mut part_size_map = if let Ok(part_map) = serde_json::from_slice(&bytes) {
+            part_map
+        } else {
+            fallback_part_map.clear();
+            fallback_part_map
+        };
+
+        part_size_map
+            .append_missing_parts_from(&aws_s3_client, &bucket, &object_prefix)
+            .await?;
 
         Ok(Self {
             client: aws_s3_client,
@@ -350,7 +429,10 @@ where
             self.client
                 .get_object()
                 .bucket(&self.bucket)
-                .key(format!("{}_{}.txt", &self.object_prefix, part_idx))
+                .key(format!(
+                    "{}_{}.{}",
+                    &self.object_prefix, part_idx, PART_EXTENSION
+                ))
                 .range(format!("bytes={}-{}", range_start, range_end))
                 .send()
                 .map(|x| x.map_err(|err| err.to_string())),

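Note on the new trait method: append_missing_parts_from lists objects under object_prefix and recovers each part's index from keys of the form {object_prefix}_{part_idx}.{PART_EXTENSION}, appending any part whose index is not yet tracked by the part map. The standalone sketch below mirrors only the key-parsing step of that method; the S3 listing itself is omitted, parse_part_idx and the sample keys are hypothetical names used for illustration, and the scheme assumes the prefix contains no '_' or '.', since the first occurrence of each delimiter is taken.

// Minimal sketch of the part-index parsing performed per listed key in
// append_missing_parts_from. Returns None for keys that are not parts or
// that do not follow the "{prefix}_{idx}.{extension}" naming scheme.
fn parse_part_idx(key: &str, part_extension: &str) -> Option<usize> {
    let part_suffix = format!(".{}", part_extension);

    // Only objects ending with the part extension are treated as parts;
    // e.g. the "_part_size_map.json" object is skipped.
    if !key.ends_with(&part_suffix) {
        return None;
    }

    // The part index sits between the first '_' and the first '.'.
    let start = key.find('_')? + 1;
    let end = key.find('.')?;

    if start >= end || end > key.len() {
        return None;
    }

    key[start..end].parse().ok()
}

fn main() {
    // Hypothetical object keys, for illustration only.
    assert_eq!(parse_part_idx("segment_7.txt", "txt"), Some(7));
    assert_eq!(parse_part_idx("segment_part_size_map.json", "txt"), None);
}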