From 55637afcf49ff0d9eb36db37235fdf0a445bde12 Mon Sep 17 00:00:00 2001 From: Brian Date: Mon, 13 Jan 2025 02:05:21 +0100 Subject: [PATCH] Split registry implementations (#119) * Split registry implementations * Move path and artifact_key to own function * Add initialize function to each registry kind * Fix format and lint errors --- Cargo.lock | 1 + cli/src/main.rs | 21 +- registry/Cargo.toml | 1 + registry/src/gha.rs | 171 ++++++++++++++- registry/src/lib.rs | 498 ++++++------------------------------------ registry/src/local.rs | 98 +++++++++ registry/src/s3.rs | 137 ++++++++++++ 7 files changed, 493 insertions(+), 434 deletions(-) create mode 100644 registry/src/local.rs create mode 100644 registry/src/s3.rs diff --git a/Cargo.lock b/Cargo.lock index caa70276..a5dfebdf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3216,6 +3216,7 @@ dependencies = [ "serde", "serde_json", "sha2", + "thiserror 2.0.9", "tokio", "tokio-stream", "tonic", diff --git a/cli/src/main.rs b/cli/src/main.rs index 9256a5e0..5f41127e 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -18,7 +18,7 @@ use tonic::transport::{Channel, Server}; use tracing::{info, warn, Level}; use tracing_subscriber::fmt::writer::MakeWriterExt; use tracing_subscriber::FmtSubscriber; -use vorpal_registry::{RegistryServer, RegistryServerBackend}; +use vorpal_registry::{RegistryBackend, RegistryServer, RegistryServerBackend}; use vorpal_schema::{ get_artifact_system, vorpal::{ @@ -555,10 +555,21 @@ async fn main() -> Result<()> { bail!("s3 backend requires '--registry-backend-s3-bucket' parameter"); } - let service = RegistryServiceServer::new(RegistryServer::new( - backend, - registry_backend_s3_bucket.clone(), - )); + let backend: Box = match backend { + RegistryServerBackend::Local => { + Box::new(vorpal_registry::LocalRegistryBackend::new()?) + } + RegistryServerBackend::S3 => Box::new( + vorpal_registry::S3RegistryBackend::new(registry_backend_s3_bucket.clone()) + .await?, + ), + RegistryServerBackend::GHA => { + Box::new(vorpal_registry::GhaRegistryBackend::new()?) + } + RegistryServerBackend::Unknown => unreachable!(), + }; + + let service = RegistryServiceServer::new(RegistryServer::new(backend)); info!("registry service: [::]:{}", port); diff --git a/registry/Cargo.toml b/registry/Cargo.toml index 90dd5aec..1bbe01ca 100644 --- a/registry/Cargo.toml +++ b/registry/Cargo.toml @@ -12,6 +12,7 @@ rsa = { default-features = false, version = "0" } serde = { default-features = false, features = ["derive"], version = "1" } serde_json = { default-features = false, features = ["std"], version = "1" } sha2 = { default-features = false, version = "0" } +thiserror = { default-features = false, version = "2" } tokio = { default-features = false, features = ["process", "rt-multi-thread"], version = "1" } tokio-stream = { default-features = false, features = ["io-util"], version = "0" } tonic = { default-features = false, version = "0" } diff --git a/registry/src/gha.rs b/registry/src/gha.rs index 1c23e93c..c348942f 100644 --- a/registry/src/gha.rs +++ b/registry/src/gha.rs @@ -1,12 +1,23 @@ +use std::path::Path; + use anyhow::{anyhow, Context, Result}; use reqwest::{ header::{HeaderMap, HeaderValue, ACCEPT, CONTENT_RANGE, CONTENT_TYPE}, Client, StatusCode, }; use serde::{Deserialize, Serialize}; +use tokio::{ + fs::{read, write}, + sync::mpsc, +}; +use tonic::{async_trait, Status}; use tracing::info; +use vorpal_schema::vorpal::registry::v0::{RegistryKind, RegistryPullResponse, RegistryRequest}; + +use crate::{PushMetadata, RegistryBackend, RegistryError, DEFAULT_GRPC_CHUNK_SIZE}; const API_VERSION: &str = "6.0-preview.1"; +const DEFAULT_GHA_CHUNK_SIZE: usize = 32 * 1024 * 1024; // 32MB #[derive(Debug, Serialize, Deserialize)] pub struct ArtifactCacheEntry { @@ -38,7 +49,7 @@ pub struct CommitCacheRequest { pub size: u64, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct CacheClient { base_url: String, client: Client, @@ -175,3 +186,161 @@ impl CacheClient { Ok(()) } } + +fn get_cache_key(name: &str, hash: &str, kind: RegistryKind) -> Result { + let prefix = "vorpal-registry"; + let affix = format!("{}-{}", name, hash); + + match kind { + RegistryKind::Artifact => Ok(format!("{}-{}-artifact", prefix, affix)), + RegistryKind::ArtifactSource => Ok(format!("{}-{}-source", prefix, affix)), + _ => Err(anyhow::anyhow!("unsupported store kind")), + } +} + +#[derive(Debug, Clone)] +pub struct GhaRegistryBackend { + cache_client: CacheClient, +} + +impl GhaRegistryBackend { + pub fn new() -> Result { + let cache_client = CacheClient::new() + .map_err(|err| RegistryError::FailedToCreateGhaClient(err.to_string()))?; + + Ok(Self { cache_client }) + } +} + +#[async_trait] +impl RegistryBackend for GhaRegistryBackend { + async fn exists(&self, request: &RegistryRequest) -> Result<(), Status> { + let cache_key = get_cache_key(&request.name, &request.hash, request.kind()) + .expect("failed to get cache key"); + let cache_key_file = format!("/tmp/{}", cache_key); + let cache_key_file_path = Path::new(&cache_key_file); + + if cache_key_file_path.exists() { + return Ok(()); + } + + info!("get cache entry -> {}", cache_key); + + let cache_entry = &self + .cache_client + .get_cache_entry(&cache_key, &request.hash) + .await + .map_err(|e| { + Status::internal(format!("failed to get cache entry: {:?}", e.to_string())) + })?; + + info!("get cache entry response -> {:?}", cache_entry); + + if cache_entry.is_none() { + return Err(Status::not_found("store path not found")); + } + + Ok(()) + } + + async fn pull( + &self, + request: &RegistryRequest, + tx: mpsc::Sender>, + ) -> Result<(), Status> { + let cache_key = get_cache_key(&request.name, &request.hash, request.kind()) + .expect("failed to get cache key"); + let cache_key_file = format!("/tmp/{}", cache_key); + let cache_key_file_path = Path::new(&cache_key_file); + + if cache_key_file_path.exists() { + let data = read(&cache_key_file_path) + .await + .map_err(|err| Status::internal(err.to_string()))?; + + for chunk in data.chunks(DEFAULT_GRPC_CHUNK_SIZE) { + tx.send(Ok(RegistryPullResponse { + data: chunk.to_vec(), + })) + .await + .map_err(|err| { + Status::internal(format!("failed to send store chunk: {:?}", err)) + })?; + } + + return Ok(()); + } + + let cache_entry = &self + .cache_client + .get_cache_entry(&cache_key, &request.hash) + .await + .expect("failed to get cache entry"); + + let Some(cache_entry) = cache_entry else { + return Err(Status::not_found("store path not found")); + }; + + info!( + "cache entry archive location -> {:?}", + cache_entry.archive_location + ); + + let response = reqwest::get(&cache_entry.archive_location) + .await + .expect("failed to get"); + + let response_bytes = response.bytes().await.expect("failed to read response"); + + for chunk in response_bytes.chunks(DEFAULT_GRPC_CHUNK_SIZE) { + tx.send(Ok(RegistryPullResponse { + data: chunk.to_vec(), + })) + .await + .map_err(|err| Status::internal(format!("failed to send store chunk: {:?}", err)))?; + } + + write(&cache_key_file_path, &response_bytes) + .await + .map_err(|err| Status::internal(format!("failed to write store path: {:?}", err)))?; + + Ok(()) + } + + async fn push(&self, metadata: PushMetadata) -> Result<(), Status> { + let PushMetadata { + data_kind, + hash, + name, + data, + } = metadata; + + let cache_key = get_cache_key(&name, &hash, data_kind) + .map_err(|err| Status::internal(format!("failed to get cache key: {:?}", err)))?; + + let cache_size = data.len() as u64; + + let cache_reserve = &self + .cache_client + .reserve_cache(cache_key, hash.clone(), Some(cache_size)) + .await + .map_err(|e| { + Status::internal(format!("failed to reserve cache: {:?}", e.to_string())) + })?; + + if cache_reserve.cache_id == 0 { + return Err(Status::internal("failed to reserve cache returned 0")); + } + + self.cache_client + .save_cache(cache_reserve.cache_id, &data, DEFAULT_GHA_CHUNK_SIZE) + .await + .map_err(|e| Status::internal(format!("failed to save cache: {:?}", e.to_string())))?; + + Ok(()) + } + + fn box_clone(&self) -> Box { + Box::new(self.clone()) + } +} diff --git a/registry/src/lib.rs b/registry/src/lib.rs index 730ff2e9..cf9f94ee 100644 --- a/registry/src/lib.rs +++ b/registry/src/lib.rs @@ -1,35 +1,46 @@ use anyhow::Result; -use aws_sdk_s3::Client; use rsa::{ pss::{Signature, VerifyingKey}, sha2::Sha256, signature::Verifier, }; -use std::path::Path; -use tokio::{ - fs::{read, write}, - sync::mpsc, -}; +use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tonic::{transport::Server, Request, Response, Status, Streaming}; -use tracing::{error, info}; +use tracing::error; use vorpal_notary::get_public_key; use vorpal_schema::vorpal::registry::v0::{ registry_service_server::{RegistryService, RegistryServiceServer}, - RegistryKind, - RegistryKind::{Artifact, ArtifactSource, UnknownStoreKind}, + RegistryKind::{self, UnknownStoreKind}, RegistryPullResponse, RegistryPushRequest, RegistryRequest, RegistryResponse, }; -use vorpal_store::paths::{ - get_artifact_archive_path, get_public_key_path, get_source_archive_path, get_store_dir_name, - set_timestamps, -}; - -mod gha; +use vorpal_store::paths::get_public_key_path; + +pub mod gha; +pub mod local; +pub mod s3; +pub use gha::GhaRegistryBackend; +pub use local::LocalRegistryBackend; +pub use s3::S3RegistryBackend; + +#[derive(thiserror::Error, Debug)] +pub enum RegistryError { + #[error("missing s3 bucket")] + MissingS3Bucket, + + #[error("failed to create GHA cache client: {0}")] + FailedToCreateGhaClient(String), +} -const DEFAULT_GHA_CHUNK_SIZE: usize = 32 * 1024 * 1024; // 32MB const DEFAULT_GRPC_CHUNK_SIZE: usize = 2 * 1024 * 1024; // 2MB +pub struct PushMetadata { + data_kind: RegistryKind, + hash: String, + name: String, + data: Vec, +} + #[derive(Clone, Debug, Default, PartialEq)] pub enum RegistryServerBackend { #[default] @@ -39,29 +50,33 @@ pub enum RegistryServerBackend { S3, } -#[derive(Debug, Default)] -pub struct RegistryServer { - pub backend: RegistryServerBackend, - pub backend_s3_bucket: Option, +#[tonic::async_trait] +pub trait RegistryBackend: Send + Sync + 'static { + async fn exists(&self, request: &RegistryRequest) -> Result<(), Status>; + async fn pull( + &self, + request: &RegistryRequest, + tx: mpsc::Sender>, + ) -> Result<(), Status>; + async fn push(&self, metadata: PushMetadata) -> Result<(), Status>; + + /// Return a new `Box` cloned from `self`. + fn box_clone(&self) -> Box; } -impl RegistryServer { - pub fn new(backend: RegistryServerBackend, backend_s3_bucket: Option) -> Self { - Self { - backend, - backend_s3_bucket, - } +impl Clone for Box { + fn clone(&self) -> Self { + self.box_clone() } } -fn get_cache_key(name: &str, hash: &str, kind: RegistryKind) -> Result { - let prefix = "vorpal-registry"; - let affix = format!("{}-{}", name, hash); +pub struct RegistryServer { + pub backend: Box, +} - match kind { - Artifact => Ok(format!("{}-{}-artifact", prefix, affix)), - ArtifactSource => Ok(format!("{}-{}-source", prefix, affix)), - _ => Err(anyhow::anyhow!("unsupported store kind")), +impl RegistryServer { + pub fn new(backend: Box) -> Self { + Self { backend } } } @@ -83,83 +98,7 @@ impl RegistryService for RegistryServer { return Err(Status::invalid_argument("missing store name")); } - let backend = self.backend.clone(); - - if backend == RegistryServerBackend::GHA { - let cache_key = get_cache_key(&request.name, &request.hash, request.kind()) - .expect("failed to get cache key"); - let cache_key_file = format!("/tmp/{}", cache_key); - let cache_key_file_path = Path::new(&cache_key_file); - - if cache_key_file_path.exists() { - return Ok(Response::new(RegistryResponse { success: true })); - } - - let cache_client = gha::CacheClient::new().map_err(|err| { - Status::internal(format!("failed to create GHA cache client: {:?}", err)) - })?; - - info!("get cache entry -> {}", cache_key); - - let cache_entry = cache_client - .get_cache_entry(&cache_key, &request.hash) - .await - .map_err(|e| { - Status::internal(format!("failed to get cache entry: {:?}", e.to_string())) - })?; - - info!("get cache entry response -> {:?}", cache_entry); - - if cache_entry.is_none() { - return Err(Status::not_found("store path not found")); - } - - return Ok(Response::new(RegistryResponse { success: true })); - } - - if backend == RegistryServerBackend::Local { - let path = match request.kind() { - Artifact => get_artifact_archive_path(&request.hash, &request.name), - ArtifactSource => get_source_archive_path(&request.hash, &request.name), - _ => return Err(Status::invalid_argument("unsupported store kind")), - }; - - if !path.exists() { - return Err(Status::not_found("store path not found")); - } - } - - if backend == RegistryServerBackend::S3 && self.backend_s3_bucket.is_none() { - return Err(Status::invalid_argument("missing s3 bucket")); - } - - if backend == RegistryServerBackend::S3 { - let artifact_key = match request.kind() { - Artifact => format!( - "store/{}.artifact", - get_store_dir_name(&request.hash, &request.name) - ), - ArtifactSource => format!( - "store/{}.source", - get_store_dir_name(&request.hash, &request.name) - ), - _ => return Err(Status::invalid_argument("unsupported store kind")), - }; - - let client_config = aws_config::load_from_env().await; - let client = Client::new(&client_config); - - let head_result = client - .head_object() - .bucket(self.backend_s3_bucket.clone().unwrap()) - .key(&artifact_key) - .send() - .await; - - if head_result.is_err() { - return Err(Status::not_found("store path not found")); - } - } + self.backend.exists(&request).await?; Ok(Response::new(RegistryResponse { success: true })) } @@ -171,13 +110,6 @@ impl RegistryService for RegistryServer { let (tx, rx) = mpsc::channel(100); let backend = self.backend.clone(); - let backend_s3_bucket = self.backend_s3_bucket.clone(); - - if backend == RegistryServerBackend::S3 && backend_s3_bucket.is_none() { - return Err(Status::invalid_argument("missing s3 bucket")); - } - - let client_bucket_name = backend_s3_bucket.unwrap_or_default(); tokio::spawn(async move { let request = request.into_inner(); @@ -193,224 +125,9 @@ impl RegistryService for RegistryServer { return; } - if backend == RegistryServerBackend::GHA { - let cache_key = get_cache_key(&request.name, &request.hash, request.kind()) - .expect("failed to get cache key"); - let cache_key_file = format!("/tmp/{}", cache_key); - let cache_key_file_path = Path::new(&cache_key_file); - - if cache_key_file_path.exists() { - let data = match read(&cache_key_file_path).await { - Ok(data) => data, - Err(err) => { - if let Err(err) = tx.send(Err(Status::internal(err.to_string()))).await - { - error!("failed to send store error: {:?}", err); - } - - return; - } - }; - - for chunk in data.chunks(DEFAULT_GRPC_CHUNK_SIZE) { - if let Err(err) = tx - .send(Ok(RegistryPullResponse { - data: chunk.to_vec(), - })) - .await - { - error!("failed to send store chunk: {:?}", err); - - break; - } - } - - return; - } - - let cache_client = - gha::CacheClient::new().expect("failed to create GHA cache client"); - - let cache_entry = cache_client - .get_cache_entry(&cache_key, &request.hash) - .await - .expect("failed to get cache entry"); - - if cache_entry.is_none() { - if let Err(err) = tx - .send(Err(Status::not_found("store path not found"))) - .await - { - error!("failed to send store error: {:?}", err); - } - - return; - } - - let cache_entry = cache_entry.unwrap(); - - info!( - "cache entry archive location -> {:?}", - cache_entry.archive_location - ); - - let response = reqwest::get(&cache_entry.archive_location) - .await - .expect("failed to get"); - - let response_bytes = response.bytes().await.expect("failed to read response"); - - for chunk in response_bytes.chunks(DEFAULT_GRPC_CHUNK_SIZE) { - if let Err(err) = tx - .send(Ok(RegistryPullResponse { - data: chunk.to_vec(), - })) - .await - { - error!("failed to send store chunk: {:?}", err); - - break; - } - } - - let _ = write(&cache_key_file_path, &response_bytes).await; - } - - if backend == RegistryServerBackend::Local { - let path = match request.kind() { - Artifact => get_artifact_archive_path(&request.hash, &request.name), - ArtifactSource => get_source_archive_path(&request.hash, &request.name), - _ => { - if let Err(err) = tx - .send(Err(Status::invalid_argument("unsupported store kind"))) - .await - { - error!("failed to send store error: {:?}", err); - } - - return; - } - }; - - if !path.exists() { - if let Err(err) = tx - .send(Err(Status::not_found("store path not found"))) - .await - { - error!("failed to send store error: {:?}", err); - } - - return; - } - - let data = match read(&path).await { - Ok(data) => data, - Err(err) => { - if let Err(err) = tx.send(Err(Status::internal(err.to_string()))).await { - error!("failed to send store error: {:?}", err); - } - - return; - } - }; - - for chunk in data.chunks(DEFAULT_GRPC_CHUNK_SIZE) { - if let Err(err) = tx - .send(Ok(RegistryPullResponse { - data: chunk.to_vec(), - })) - .await - { - error!("failed to send store chunk: {:?}", err); - - break; - } - } - } - - if backend == RegistryServerBackend::S3 { - let artifact_key = match request.kind() { - Artifact => format!( - "store/{}.artifact", - get_store_dir_name(&request.hash, &request.name) - ), - - ArtifactSource => { - format!( - "store/{}.source", - get_store_dir_name(&request.hash, &request.name) - ) - } - - _ => { - if let Err(err) = tx - .send(Err(Status::invalid_argument("unsupported store kind"))) - .await - { - error!("failed to send store error: {:?}", err); - } - - return; - } - }; - - let client_config = aws_config::load_from_env().await; - let client = Client::new(&client_config); - - let _ = match client - .head_object() - .bucket(client_bucket_name.clone()) - .key(artifact_key.clone()) - .send() - .await - { - Ok(_) => {} - Err(err) => { - if let Err(err) = tx.send(Err(Status::not_found(err.to_string()))).await { - error!("failed to send store error: {:?}", err); - } - - return; - } - }; - - let mut stream = match client - .get_object() - .bucket(client_bucket_name) - .key(artifact_key) - .send() - .await - { - Ok(output) => output.body, - Err(err) => { - if let Err(err) = tx.send(Err(Status::internal(err.to_string()))).await { - error!("failed to send store error: {:?}", err); - } - - return; - } - }; - - while let Some(chunk_result) = stream.next().await { - match chunk_result { - Ok(chunk) => { - if let Err(err) = tx - .send(Ok(RegistryPullResponse { - data: chunk.to_vec(), - })) - .await - { - error!("failed to send store chunk: {:?}", err.to_string()); - - break; - } - } - Err(err) => { - let _ = tx.send(Err(Status::internal(err.to_string()))).await; - - break; - } - } + if let Err(err) = backend.pull(&request, tx.clone()).await { + if let Err(err) = tx.send(Err(err)).await { + error!("failed to send store error: {:?}", err); } } }); @@ -422,12 +139,6 @@ impl RegistryService for RegistryServer { &self, request: Request>, ) -> Result, Status> { - let backend = self.backend.clone(); - - if backend == RegistryServerBackend::S3 && self.backend_s3_bucket.is_none() { - return Err(Status::invalid_argument("missing `s3` bucket argument")); - } - let mut data: Vec = vec![]; let mut data_hash = None; let mut data_kind = UnknownStoreKind; @@ -450,9 +161,13 @@ impl RegistryService for RegistryServer { return Err(Status::invalid_argument("missing `data` field")); } - if data_hash.is_none() { + let Some(data_hash) = data_hash else { return Err(Status::invalid_argument("missing `hash` field")); - } + }; + + let Some(data_name) = data_name else { + return Err(Status::invalid_argument("missing `name` field")); + }; if data_kind == UnknownStoreKind { return Err(Status::invalid_argument("missing `kind` field")); @@ -480,91 +195,17 @@ impl RegistryService for RegistryServer { ))); } - let backend = self.backend.clone(); - let hash = data_hash.unwrap(); - let name = data_name.unwrap(); - - if backend == RegistryServerBackend::GHA { - let cache_client = gha::CacheClient::new().map_err(|err| { - Status::internal(format!("failed to create GHA cache client: {:?}", err)) - })?; - - let cache_key = get_cache_key(&name, &hash, data_kind) - .map_err(|err| Status::internal(format!("failed to get cache key: {:?}", err)))?; - - let cache_size = data.len() as u64; - - let cache_reserve = cache_client - .reserve_cache(cache_key, hash.clone(), Some(cache_size)) - .await - .map_err(|e| { - Status::internal(format!("failed to reserve cache: {:?}", e.to_string())) - })?; - - if cache_reserve.cache_id == 0 { - return Err(Status::internal("failed to reserve cache returned 0")); - } + let hash = data_hash; + let name = data_name; - let _ = cache_client - .save_cache(cache_reserve.cache_id, &data, DEFAULT_GHA_CHUNK_SIZE) - .await - .map_err(|e| { - Status::internal(format!("failed to save cache: {:?}", e.to_string())) - })?; - } - - if backend == RegistryServerBackend::Local { - let path = match data_kind { - Artifact => get_artifact_archive_path(&hash, &name), - ArtifactSource => get_source_archive_path(&hash, &name), - _ => return Err(Status::invalid_argument("unsupported store kind")), - }; - - if path.exists() { - return Ok(Response::new(RegistryResponse { success: true })); - } - - write(&path, &data).await.map_err(|err| { - Status::internal(format!("failed to write store path: {:?}", err)) - })?; - - set_timestamps(&path) - .await - .map_err(|err| Status::internal(format!("failed to sanitize path: {:?}", err)))?; - } - - if backend == RegistryServerBackend::S3 { - let artifact_key = match data_kind { - Artifact => format!("store/{}.artifact", get_store_dir_name(&hash, &name)), - ArtifactSource => format!("store/{}.source", get_store_dir_name(&hash, &name)), - _ => return Err(Status::invalid_argument("unsupported store kind")), - }; - - let client_config = aws_config::load_from_env().await; - let client = Client::new(&client_config); - - let head_result = client - .head_object() - .bucket(self.backend_s3_bucket.clone().unwrap()) - .key(&artifact_key) - .send() - .await; - - if head_result.is_ok() { - return Ok(Response::new(RegistryResponse { success: true })); - } - - let _ = client - .put_object() - .bucket(self.backend_s3_bucket.clone().unwrap()) - .key(artifact_key) - .body(data.into()) - .send() - .await - .map_err(|err| { - Status::internal(format!("failed to write store path: {:?}", err)) - })?; - } + self.backend + .push(PushMetadata { + data_kind, + hash, + name, + data, + }) + .await?; Ok(Response::new(RegistryResponse { success: true })) } @@ -583,7 +224,8 @@ pub async fn listen(port: u16) -> Result<()> { .parse() .map_err(|err| anyhow::anyhow!("failed to parse address: {:?}", err))?; - let registry_service = RegistryServiceServer::new(RegistryServer::default()); + let registry_service = + RegistryServiceServer::new(RegistryServer::new(Box::new(LocalRegistryBackend))); Server::builder() .add_service(registry_service) diff --git a/registry/src/local.rs b/registry/src/local.rs new file mode 100644 index 00000000..c0fad5ad --- /dev/null +++ b/registry/src/local.rs @@ -0,0 +1,98 @@ +use tokio::{ + fs::{read, write}, + sync::mpsc, +}; +use tonic::{async_trait, Status}; +use vorpal_schema::vorpal::registry::v0::{RegistryKind, RegistryPullResponse, RegistryRequest}; +use vorpal_store::paths::{get_artifact_archive_path, get_source_archive_path, set_timestamps}; + +use crate::{PushMetadata, RegistryBackend, RegistryError, DEFAULT_GRPC_CHUNK_SIZE}; + +#[derive(Clone, Debug)] +pub struct LocalRegistryBackend; + +impl LocalRegistryBackend { + pub fn new() -> Result { + Ok(Self) + } +} + +fn get_registry_path( + kind: RegistryKind, + hash: &str, + name: &str, +) -> Result { + match kind { + RegistryKind::Artifact => Ok(get_artifact_archive_path(hash, name)), + RegistryKind::ArtifactSource => Ok(get_source_archive_path(hash, name)), + _ => Err(Status::invalid_argument("unsupported store kind")), + } +} + +#[async_trait] +impl RegistryBackend for LocalRegistryBackend { + async fn exists(&self, request: &RegistryRequest) -> Result<(), Status> { + let path = get_registry_path(request.kind(), &request.hash, &request.name)?; + + if !path.exists() { + return Err(Status::not_found("store path not found")); + } + + Ok(()) + } + + async fn pull( + &self, + request: &RegistryRequest, + tx: mpsc::Sender>, + ) -> Result<(), Status> { + let path = get_registry_path(request.kind(), &request.hash, &request.name)?; + + if !path.exists() { + return Err(Status::not_found("store path not found")); + } + + let data = read(&path) + .await + .map_err(|err| Status::internal(err.to_string()))?; + + for chunk in data.chunks(DEFAULT_GRPC_CHUNK_SIZE) { + tx.send(Ok(RegistryPullResponse { + data: chunk.to_vec(), + })) + .await + .map_err(|err| Status::internal(format!("failed to send store chunk: {:?}", err)))?; + } + + Ok(()) + } + + async fn push(&self, metadata: PushMetadata) -> Result<(), Status> { + let PushMetadata { + data_kind, + hash, + name, + data, + } = metadata; + + let path = get_registry_path(data_kind, &hash, &name)?; + + if path.exists() { + return Ok(()); + } + + write(&path, &data) + .await + .map_err(|err| Status::internal(format!("failed to write store path: {:?}", err)))?; + + set_timestamps(&path) + .await + .map_err(|err| Status::internal(format!("failed to sanitize path: {:?}", err)))?; + + Ok(()) + } + + fn box_clone(&self) -> Box { + Box::new(self.clone()) + } +} diff --git a/registry/src/s3.rs b/registry/src/s3.rs new file mode 100644 index 00000000..28a0fdc7 --- /dev/null +++ b/registry/src/s3.rs @@ -0,0 +1,137 @@ +use aws_sdk_s3::Client; +use tokio::sync::mpsc; +use tonic::{async_trait, Status}; +use vorpal_schema::vorpal::registry::v0::{RegistryKind, RegistryPullResponse, RegistryRequest}; +use vorpal_store::paths::get_store_dir_name; + +use crate::{PushMetadata, RegistryBackend, RegistryError}; + +#[derive(Clone, Debug)] +pub struct S3RegistryBackend { + bucket: String, + client: Client, +} + +impl S3RegistryBackend { + pub async fn new(bucket: Option) -> Result { + let Some(bucket) = bucket else { + return Err(RegistryError::MissingS3Bucket); + }; + + let client_config = aws_config::load_from_env().await; + let client = Client::new(&client_config); + + Ok(Self { bucket, client }) + } +} + +fn artifact_key(kind: RegistryKind, hash: &str, name: &str) -> Result { + match kind { + RegistryKind::Artifact => Ok(format!("store/{}.artifact", get_store_dir_name(hash, name))), + RegistryKind::ArtifactSource => { + Ok(format!("store/{}.source", get_store_dir_name(hash, name))) + } + _ => Err(Status::invalid_argument("unsupported store kind")), + } +} + +#[async_trait] +impl RegistryBackend for S3RegistryBackend { + async fn exists(&self, request: &RegistryRequest) -> Result<(), Status> { + let artifact_key = artifact_key(request.kind(), &request.hash, &request.name)?; + + let head_result = &self + .client + .head_object() + .bucket(&self.bucket) + .key(&artifact_key) + .send() + .await; + + if head_result.is_err() { + return Err(Status::not_found("store path not found")); + } + + Ok(()) + } + + async fn pull( + &self, + request: &RegistryRequest, + tx: mpsc::Sender>, + ) -> Result<(), Status> { + let artifact_key = artifact_key(request.kind(), &request.hash, &request.name)?; + + let client = &self.client; + let client_bucket_name = &self.bucket; + + client + .head_object() + .bucket(client_bucket_name.clone()) + .key(artifact_key.clone()) + .send() + .await + .map_err(|err| Status::not_found(err.to_string()))?; + + let mut stream = client + .get_object() + .bucket(client_bucket_name) + .key(artifact_key) + .send() + .await + .map_err(|err| Status::internal(err.to_string()))? + .body; + + while let Some(chunk_result) = stream.next().await { + let chunk = chunk_result.map_err(|err| Status::internal(err.to_string()))?; + + tx.send(Ok(RegistryPullResponse { + data: chunk.to_vec(), + })) + .await + .map_err(|err| Status::internal(format!("failed to send store chunk: {:?}", err)))?; + } + + Ok(()) + } + + async fn push(&self, metadata: PushMetadata) -> Result<(), Status> { + let PushMetadata { + data_kind, + hash, + name, + data, + } = metadata; + + let artifact_key = artifact_key(data_kind, &hash, &name)?; + + let client = &self.client; + let bucket = &self.bucket; + + let head_result = client + .head_object() + .bucket(bucket) + .key(&artifact_key) + .send() + .await; + + if head_result.is_ok() { + return Ok(()); + } + + let _ = client + .put_object() + .bucket(bucket) + .key(artifact_key) + .body(data.into()) + .send() + .await + .map_err(|err| Status::internal(format!("failed to write store path: {:?}", err)))?; + + Ok(()) + } + + fn box_clone(&self) -> Box { + Box::new(self.clone()) + } +}