Skip to content

Commit

Permalink
Split registry implementations (#119)
Browse files Browse the repository at this point in the history
* Split registry implementations

* Move path and artifact_key to own function

* Add initialize function to each registry kind

* Fix format and lint errors
  • Loading branch information
715209 authored Jan 13, 2025
1 parent 4df899f commit 55637af
Show file tree
Hide file tree
Showing 7 changed files with 493 additions and 434 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 16 additions & 5 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tonic::transport::{Channel, Server};
use tracing::{info, warn, Level};
use tracing_subscriber::fmt::writer::MakeWriterExt;
use tracing_subscriber::FmtSubscriber;
use vorpal_registry::{RegistryServer, RegistryServerBackend};
use vorpal_registry::{RegistryBackend, RegistryServer, RegistryServerBackend};
use vorpal_schema::{
get_artifact_system,
vorpal::{
Expand Down Expand Up @@ -555,10 +555,21 @@ async fn main() -> Result<()> {
bail!("s3 backend requires '--registry-backend-s3-bucket' parameter");
}

let service = RegistryServiceServer::new(RegistryServer::new(
backend,
registry_backend_s3_bucket.clone(),
));
let backend: Box<dyn RegistryBackend> = match backend {
RegistryServerBackend::Local => {
Box::new(vorpal_registry::LocalRegistryBackend::new()?)
}
RegistryServerBackend::S3 => Box::new(
vorpal_registry::S3RegistryBackend::new(registry_backend_s3_bucket.clone())
.await?,
),
RegistryServerBackend::GHA => {
Box::new(vorpal_registry::GhaRegistryBackend::new()?)
}
RegistryServerBackend::Unknown => unreachable!(),
};

let service = RegistryServiceServer::new(RegistryServer::new(backend));

info!("registry service: [::]:{}", port);

Expand Down
1 change: 1 addition & 0 deletions registry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ rsa = { default-features = false, version = "0" }
serde = { default-features = false, features = ["derive"], version = "1" }
serde_json = { default-features = false, features = ["std"], version = "1" }
sha2 = { default-features = false, version = "0" }
thiserror = { default-features = false, version = "2" }
tokio = { default-features = false, features = ["process", "rt-multi-thread"], version = "1" }
tokio-stream = { default-features = false, features = ["io-util"], version = "0" }
tonic = { default-features = false, version = "0" }
Expand Down
171 changes: 170 additions & 1 deletion registry/src/gha.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
use std::path::Path;

use anyhow::{anyhow, Context, Result};
use reqwest::{
header::{HeaderMap, HeaderValue, ACCEPT, CONTENT_RANGE, CONTENT_TYPE},
Client, StatusCode,
};
use serde::{Deserialize, Serialize};
use tokio::{
fs::{read, write},
sync::mpsc,
};
use tonic::{async_trait, Status};
use tracing::info;
use vorpal_schema::vorpal::registry::v0::{RegistryKind, RegistryPullResponse, RegistryRequest};

use crate::{PushMetadata, RegistryBackend, RegistryError, DEFAULT_GRPC_CHUNK_SIZE};

const API_VERSION: &str = "6.0-preview.1";
const DEFAULT_GHA_CHUNK_SIZE: usize = 32 * 1024 * 1024; // 32MB

#[derive(Debug, Serialize, Deserialize)]
pub struct ArtifactCacheEntry {
Expand Down Expand Up @@ -38,7 +49,7 @@ pub struct CommitCacheRequest {
pub size: u64,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct CacheClient {
base_url: String,
client: Client,
Expand Down Expand Up @@ -175,3 +186,161 @@ impl CacheClient {
Ok(())
}
}

fn get_cache_key(name: &str, hash: &str, kind: RegistryKind) -> Result<String> {
let prefix = "vorpal-registry";
let affix = format!("{}-{}", name, hash);

match kind {
RegistryKind::Artifact => Ok(format!("{}-{}-artifact", prefix, affix)),
RegistryKind::ArtifactSource => Ok(format!("{}-{}-source", prefix, affix)),
_ => Err(anyhow::anyhow!("unsupported store kind")),
}
}

#[derive(Debug, Clone)]
pub struct GhaRegistryBackend {
cache_client: CacheClient,
}

impl GhaRegistryBackend {
pub fn new() -> Result<Self, RegistryError> {
let cache_client = CacheClient::new()
.map_err(|err| RegistryError::FailedToCreateGhaClient(err.to_string()))?;

Ok(Self { cache_client })
}
}

#[async_trait]
impl RegistryBackend for GhaRegistryBackend {
async fn exists(&self, request: &RegistryRequest) -> Result<(), Status> {
let cache_key = get_cache_key(&request.name, &request.hash, request.kind())
.expect("failed to get cache key");
let cache_key_file = format!("/tmp/{}", cache_key);
let cache_key_file_path = Path::new(&cache_key_file);

if cache_key_file_path.exists() {
return Ok(());
}

info!("get cache entry -> {}", cache_key);

let cache_entry = &self
.cache_client
.get_cache_entry(&cache_key, &request.hash)
.await
.map_err(|e| {
Status::internal(format!("failed to get cache entry: {:?}", e.to_string()))
})?;

info!("get cache entry response -> {:?}", cache_entry);

if cache_entry.is_none() {
return Err(Status::not_found("store path not found"));
}

Ok(())
}

async fn pull(
&self,
request: &RegistryRequest,
tx: mpsc::Sender<Result<RegistryPullResponse, Status>>,
) -> Result<(), Status> {
let cache_key = get_cache_key(&request.name, &request.hash, request.kind())
.expect("failed to get cache key");
let cache_key_file = format!("/tmp/{}", cache_key);
let cache_key_file_path = Path::new(&cache_key_file);

if cache_key_file_path.exists() {
let data = read(&cache_key_file_path)
.await
.map_err(|err| Status::internal(err.to_string()))?;

for chunk in data.chunks(DEFAULT_GRPC_CHUNK_SIZE) {
tx.send(Ok(RegistryPullResponse {
data: chunk.to_vec(),
}))
.await
.map_err(|err| {
Status::internal(format!("failed to send store chunk: {:?}", err))
})?;
}

return Ok(());
}

let cache_entry = &self
.cache_client
.get_cache_entry(&cache_key, &request.hash)
.await
.expect("failed to get cache entry");

let Some(cache_entry) = cache_entry else {
return Err(Status::not_found("store path not found"));
};

info!(
"cache entry archive location -> {:?}",
cache_entry.archive_location
);

let response = reqwest::get(&cache_entry.archive_location)
.await
.expect("failed to get");

let response_bytes = response.bytes().await.expect("failed to read response");

for chunk in response_bytes.chunks(DEFAULT_GRPC_CHUNK_SIZE) {
tx.send(Ok(RegistryPullResponse {
data: chunk.to_vec(),
}))
.await
.map_err(|err| Status::internal(format!("failed to send store chunk: {:?}", err)))?;
}

write(&cache_key_file_path, &response_bytes)
.await
.map_err(|err| Status::internal(format!("failed to write store path: {:?}", err)))?;

Ok(())
}

async fn push(&self, metadata: PushMetadata) -> Result<(), Status> {
let PushMetadata {
data_kind,
hash,
name,
data,
} = metadata;

let cache_key = get_cache_key(&name, &hash, data_kind)
.map_err(|err| Status::internal(format!("failed to get cache key: {:?}", err)))?;

let cache_size = data.len() as u64;

let cache_reserve = &self
.cache_client
.reserve_cache(cache_key, hash.clone(), Some(cache_size))
.await
.map_err(|e| {
Status::internal(format!("failed to reserve cache: {:?}", e.to_string()))
})?;

if cache_reserve.cache_id == 0 {
return Err(Status::internal("failed to reserve cache returned 0"));
}

self.cache_client
.save_cache(cache_reserve.cache_id, &data, DEFAULT_GHA_CHUNK_SIZE)
.await
.map_err(|e| Status::internal(format!("failed to save cache: {:?}", e.to_string())))?;

Ok(())
}

fn box_clone(&self) -> Box<dyn RegistryBackend> {
Box::new(self.clone())
}
}
Loading

0 comments on commit 55637af

Please sign in to comment.