Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add qbg-handler #2807

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/bin/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ edition = "2021"

[dependencies]
algorithm = { version = "0.1.0", path = "../../libs/algorithm" }
qbg = { version = "0.1.0", path = "../../libs/algorithms/qbg" }
anyhow = "1.0.88"
cargo = "0.81.0"
prost = "0.13.2"
Expand Down
15 changes: 12 additions & 3 deletions rust/bin/agent/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,32 @@
mod common;
pub mod index;
pub mod insert;
pub mod object;
pub mod remove;
pub mod search;
pub mod update;
pub mod upsert;
use std::sync::Arc;
use tokio::sync::RwLock;

pub struct Agent {
s: Box<dyn algorithm::ANN>,
s: Arc<RwLock<dyn algorithm::ANN>>,
name: String,
ip: String,
resource_type: String,
api_name: String,
}

impl Agent {
pub fn new(s: impl algorithm::ANN + 'static, name: &str, ip: &str, resource_type: &str, api_name: &str) -> Self {
pub fn new(
s: impl algorithm::ANN + 'static,
name: &str,
ip: &str,
resource_type: &str,
api_name: &str,
) -> Self {
Self {
s: Box::new(s),
s: Arc::new(RwLock::new(s)),
name: name.to_string(),
ip: ip.to_string(),
resource_type: resource_type.to_string(),
Expand Down
104 changes: 98 additions & 6 deletions rust/bin/agent/src/handler/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,109 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
use algorithm::Error;
use proto::{
core::v1::agent_server,
payload::v1::{control, info, Empty},
vald::v1::index_server,
};
use std::collections::HashMap;
use tonic::{Code, Status};
use tonic_types::{ErrorDetails, PreconditionViolation, StatusExt};

#[tonic::async_trait]
impl agent_server::Agent for super::Agent {
async fn create_index(
&self,
_request: tonic::Request<control::CreateIndexRequest>,
request: tonic::Request<control::CreateIndexRequest>,
) -> std::result::Result<tonic::Response<Empty>, tonic::Status> {
todo!()
println!("Recieved a request from {:?}", request.remote_addr());
let req = request.get_ref();
let pool_size = req.pool_size;
let hostname = cargo::util::hostname()?;
let domain = hostname.to_str().unwrap();
let res = Empty {};
{
let mut s = self.s.write().await;
let result = s.create_index();
match result {
Err(err) => {
let metadata = HashMap::new();
let resource_type = self.resource_type.clone() + "/qbg.CreateIndex";
let resource_name = format!("{}: {}({})", self.api_name, self.name, self.ip);
let status = match err {
Error::UncommittedIndexNotFound {} => {
let mut err_details = ErrorDetails::new();
err_details.set_error_info(err.to_string(), domain, metadata);
err_details.set_precondition_failure(vec![PreconditionViolation::new(
"uncommitted index is empty",
"failed to CreateIndex operation caused by empty uncommitted indices",
err.to_string(),
)]);
err_details.set_resource_info(resource_type, resource_name, "", "");
Status::with_error_details(
Code::FailedPrecondition,
format!("CreateIndex API failed to create indexes pool_size = {} due to the precondition failure, error: {}", pool_size, err.to_string()),
err_details,
)
}
Error::FlushingIsInProgress {} => {
let mut err_details = ErrorDetails::new();
err_details.set_error_info(err.to_string(), domain, metadata);
err_details.set_resource_info(resource_type, resource_name, "", "");
Status::with_error_details(
Code::Aborted,
"CreateIndex API aborted to process create indexes request due to flushing indices is in progress",
err_details,
)
}
_ => {
let mut err_details = ErrorDetails::new();
err_details.set_error_info(err.to_string(), domain, metadata);
err_details.set_resource_info(resource_type, resource_name, "", "");
Status::with_error_details(
Code::Internal,
format!("CreateIndex API failed to create indexes pool_size = {}, error: {}", pool_size, err.to_string()),
err_details,
)
}
};
Err(status)
}
Ok(()) => Ok(tonic::Response::new(res)),
}
}
}

async fn save_index(
&self,
_request: tonic::Request<Empty>,
request: tonic::Request<Empty>,
) -> std::result::Result<tonic::Response<Empty>, tonic::Status> {
todo!()
println!("Recieved a request from {:?}", request.remote_addr());
let hostname = cargo::util::hostname()?;
let domain = hostname.to_str().unwrap();
let res = Empty {};
{
let mut s = self.s.write().await;
let result = s.save_index();
match result {
Err(err) => {
let metadata = HashMap::new();
let resource_type = self.resource_type.clone() + "/qbg.SaveIndex";
let resource_name = format!("{}: {}({})", self.api_name, self.name, self.ip);
let mut err_details = ErrorDetails::new();
err_details.set_error_info(err.to_string(), domain, metadata);
err_details.set_resource_info(resource_type, resource_name, "", "");
let status = Status::with_error_details(
Code::Internal,
"SaveIndex API failed to save indices",
err_details,
);
Err(status)
}
Ok(()) => Ok(tonic::Response::new(res)),
}
}
}

#[doc = " Represent the creating and saving index RPC.\n"]
Expand All @@ -49,9 +132,18 @@ impl index_server::Index for super::Agent {
#[doc = " Represent the RPC to get the agent index information.\n"]
async fn index_info(
&self,
_request: tonic::Request<Empty>,
request: tonic::Request<Empty>,
) -> std::result::Result<tonic::Response<info::index::Count>, tonic::Status> {
todo!()
println!("Recieved a request from {:?}", request.remote_addr());
{
let s = self.s.read().await;
Ok(tonic::Response::new(info::index::Count {
stored: s.len(),
uncommitted: s.insert_vqueue_buffer_len() + s.delete_vqueue_buffer_len(),
indexing: s.is_indexing(),
saving: s.is_saving(),
}))
}
}

#[doc = " Represent the RPC to get the agent index detailed information.\n"]
Expand Down
130 changes: 128 additions & 2 deletions rust/bin/agent/src/handler/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,143 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
use algorithm::Error;
use prost::Message;
use proto::{
payload::v1::{insert, object},
vald::v1::insert_server,
};
use std::{collections::HashMap, string::String};
use tonic::{Code, Status};
use tonic_types::{ErrorDetails, FieldViolation, StatusExt};

#[tonic::async_trait]
impl insert_server::Insert for super::Agent {
async fn insert(
&self,
_request: tonic::Request<insert::Request>,
request: tonic::Request<insert::Request>,
) -> std::result::Result<tonic::Response<object::Location>, tonic::Status> {
todo!()
println!("Recieved a request from {:?}", request.remote_addr());
let req = request.get_ref();
let config = match req.config.clone() {
Some(cfg) => cfg,
None => return Err(Status::invalid_argument("Missing configuration in request")),
};
let hostname = cargo::util::hostname()?;
let domain = hostname.to_str().unwrap();
{
let mut s = self.s.write().await;
let vec = match req.vector.clone() {
Some(v) => v,
None => return Err(Status::invalid_argument("Missing vector in request")),
};
if vec.vector.len() != s.get_dimension_size() {
let err = Error::IncompatibleDimensionSize {
got: vec.vector.len(),
want: s.get_dimension_size(),
};
let mut err_details = ErrorDetails::new();
let metadata = HashMap::new();
let resource_type = self.resource_type.clone() + "/qbg.Insert";
let resource_name = format!("{}: {}({})", self.api_name, self.name, self.ip);
err_details.set_error_info(err.to_string(), domain, metadata);
err_details.set_request_info(
vec.id,
String::from_utf8(req.encode_to_vec())
.unwrap_or_else(|_| "<invalid UTF-8>".to_string()),
);
err_details.set_bad_request(vec![FieldViolation::new(
"vector dimension size",
err.to_string(),
)]);
err_details.set_resource_info(resource_type, resource_name, "", "");
let status = Status::with_error_details(
Code::InvalidArgument,
"Insert API Incombatible Dimension Size detedted",
err_details,
);
return Err(status);
}
let result = s.insert(vec.id.clone(), vec.vector.clone(), config.timestamp);
match result {
Err(err) => {
let metadata = HashMap::new();
let resource_type = self.resource_type.clone() + "/qbg.Insert";
let resource_name = format!("{}: {}({})", self.api_name, self.name, self.ip);
let status = match err {
Error::FlushingIsInProgress {} => {
let mut err_details = ErrorDetails::new();
err_details.set_error_info(err.to_string(), domain, metadata);
err_details.set_request_info(
vec.id,
String::from_utf8(req.encode_to_vec())
.unwrap_or_else(|_| "<invalid UTF-8>".to_string()),
);
err_details.set_resource_info(resource_type, resource_name, "", "");
Status::with_error_details(Code::Aborted, "Insert API aborted to process insert request due to flushing indices is in progress", err_details)
}
Error::UUIDAlreadyExists { uuid: _ } => {
let mut err_details = ErrorDetails::new();
err_details.set_error_info(err.to_string(), domain, metadata);
err_details.set_request_info(
vec.id.clone(),
String::from_utf8(req.encode_to_vec())
.unwrap_or_else(|_| "<invalid UTF-8>".to_string()),
);
err_details.set_resource_info(resource_type, resource_name, "", "");
Status::with_error_details(
Code::AlreadyExists,
format!("Insert API uuid {} already exists", vec.id),
err_details,
)
}
Error::UUIDNotFound { id: _ } => {
let mut err_details = ErrorDetails::new();
err_details.set_error_info(err.to_string(), domain, metadata);
err_details.set_request_info(
vec.id.clone(),
String::from_utf8(req.encode_to_vec())
.unwrap_or_else(|_| "<invalid UTF-8>".to_string()),
);
err_details.set_bad_request(vec![FieldViolation::new(
"uuid",
err.to_string(),
)]);
err_details.set_resource_info(resource_type, resource_name, "", "");
Status::with_error_details(
Code::InvalidArgument,
format!(
"Insert API invalid id: \"{}\" or vector: {:?} was given",
vec.id, vec.vector
),
err_details,
)
}
_ => {
let mut err_details = ErrorDetails::new();
err_details.set_error_info(err.to_string(), domain, metadata);
err_details.set_request_info(
vec.id,
String::from_utf8(req.encode_to_vec())
.unwrap_or_else(|_| "<invalid UTF-8>".to_string()),
);
err_details.set_resource_info(resource_type, resource_name, "", "");
Status::with_error_details(
Code::Unknown,
"failed to parse Insert gRPC error response",
err_details,
)
}
};
Err(status)
}
Ok(()) => Ok(tonic::Response::new(object::Location {
name: self.name.clone(),
uuid: vec.id,
ips: vec![self.ip.clone()],
})),
}
}
}

#[doc = " Server streaming response type for the StreamInsert method."]
Expand Down
Loading
Loading