Skip to content

Commit

Permalink
feat: integrate into binary
Browse files Browse the repository at this point in the history
  • Loading branch information
indietyp committed Nov 8, 2024
1 parent 654a3e1 commit 78203f9
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 23 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions apps/hash-graph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ tokio = { workspace = true }
tokio-postgres = { workspace = true }
tokio-util = { workspace = true, features = ["codec"] }
tracing = { workspace = true }
harpc-server.workspace = true
multiaddr.workspace = true
harpc-codec = { workspace = true, features = ["json"] }

[features]
test-server = ["dep:hash-graph-test-server"]
Expand Down
1 change: 1 addition & 0 deletions apps/hash-graph/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![forbid(unsafe_code)]
#![feature(async_closure)]
#![expect(
unreachable_pub,
reason = "This is a binary but as we want to document this crate as well this should be a \
Expand Down
152 changes: 132 additions & 20 deletions apps/hash-graph/src/subcommand/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,22 @@ use error_stack::{Report, ResultExt as _};
use graph::{
ontology::domain_validator::DomainValidator,
store::{
DatabaseConnectionInfo, DatabasePoolConfig, FetchingPool, PostgresStorePool, StorePool as _,
DatabaseConnectionInfo, DatabasePoolConfig, FetchingPool, PostgresStorePool, StorePool,
},
};
use hash_graph_api::rest::{RestRouterDependencies, rest_api_router};
use harpc_codec::json::JsonCodec;
use harpc_server::Server;
use hash_graph_api::{
rest::{RestRouterDependencies, rest_api_router},
rpc::Dependencies,
};
use hash_graph_authorization::{
AuthorizationApi as _, NoAuthorization,
AuthorizationApi as _, AuthorizationApiPool, NoAuthorization,
backend::{SpiceDbOpenApi, ZanzibarBackend as _},
zanzibar::ZanzibarClient,
};
use hash_temporal_client::TemporalClientConfig;
use multiaddr::{Multiaddr, Protocol};
use regex::Regex;
use reqwest::{Client, Url};
use tokio::{net::TcpListener, time::timeout};
Expand Down Expand Up @@ -59,6 +65,38 @@ impl TryFrom<ApiAddress> for SocketAddr {
}
}

#[derive(Debug, Clone, Parser)]
pub struct RpcAddress {
/// The host the RPC client is listening at.
#[clap(long, default_value = "127.0.0.1", env = "HASH_GRAPH_RPC_HOST")]
pub rpc_host: String,

/// The port the RPC client is listening at.
#[clap(long, default_value_t = 25489, env = "HASH_GRAPH_RPC_PORT")]
pub rpc_port: u16,
}

impl fmt::Display for RpcAddress {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "{}:{}", self.rpc_host, self.rpc_port)
}
}

impl TryFrom<RpcAddress> for SocketAddr {
type Error = Report<AddrParseError>;

fn try_from(address: RpcAddress) -> Result<Self, Report<AddrParseError>> {
address
.to_string()
.parse::<Self>()
.attach_printable(address)
}
}

#[expect(
clippy::struct_excessive_bools,
reason = "CLI arguments are boolean flags."
)]
#[derive(Debug, Parser)]
pub struct ServerArgs {
#[clap(flatten)]
Expand All @@ -67,10 +105,18 @@ pub struct ServerArgs {
#[clap(flatten)]
pub pool_config: DatabasePoolConfig,

/// The address the REST client is listening at.
/// The address the REST server is listening at.
#[clap(flatten)]
pub api_address: ApiAddress,

/// Enable the experimental RPC server.
#[clap(long, default_value_t = false, env = "HASH_GRAPH_RPC_ENABLED")]
pub rpc_enabled: bool,

/// The address the RPC server is listening at.
#[clap(flatten)]
pub rpc_address: RpcAddress,

/// The address for the type fetcher RPC server is listening at.
#[clap(flatten)]
pub type_fetcher_address: TypeFetcherAddress,
Expand Down Expand Up @@ -134,6 +180,54 @@ pub struct ServerArgs {
pub temporal_port: u16,
}

fn server_rpc<S, A>(
address: RpcAddress,
dependencies: Dependencies<S, A, ()>,
) -> Result<(), Report<GraphError>>
where
S: StorePool + Send + Sync + 'static,
A: AuthorizationApiPool + Send + Sync + 'static,
{
let server = Server::new(harpc_server::ServerConfig::default()).change_context(GraphError)?;

let (router, task) = hash_graph_api::rpc::rpc_router(
Dependencies {
store: dependencies.store,
authorization_api: dependencies.authorization_api,
temporal_client: dependencies.temporal_client,
codec: JsonCodec,
},
server.events(),
);

tokio::spawn(task.into_future());

let socket_address: SocketAddr = SocketAddr::try_from(address).change_context(GraphError)?;
let mut address = Multiaddr::empty();
match socket_address {
SocketAddr::V4(v4) => {
address.push(Protocol::Ip4(*v4.ip()));
address.push(Protocol::Tcp(v4.port()));
}
SocketAddr::V6(v6) => {
address.push(Protocol::Ip6(*v6.ip()));
address.push(Protocol::Tcp(v6.port()));
}
}

#[expect(clippy::significant_drop_tightening, reason = "false positive")]
tokio::spawn(async move {
let stream = server
.listen(address)
.await
.expect("server should be able to listen on address");

harpc_server::serve::serve(stream, router).await;
});

Ok(())
}

pub async fn server(args: ServerArgs) -> Result<(), Report<GraphError>> {
if args.healthcheck {
return wait_healthcheck(
Expand Down Expand Up @@ -186,24 +280,42 @@ pub async fn server(args: ServerArgs) -> Result<(), Report<GraphError>> {
let mut zanzibar_client = ZanzibarClient::new(spicedb_client);
zanzibar_client.seed().await.change_context(GraphError)?;

let router = rest_api_router(RestRouterDependencies {
store: Arc::new(pool),
authorization_api: Arc::new(zanzibar_client),
domain_regex: DomainValidator::new(args.allowed_url_domain),
temporal_client: if let Some(host) = args.temporal_host {
Some(
TemporalClientConfig::new(
Url::from_str(&format!("{}:{}", host, args.temporal_port))
.change_context(GraphError)?,
)
.change_context(GraphError)?
.await
.change_context(GraphError)?,
let temporal_client_fn = async |host: Option<String>, port: u16| {
if let Some(host) = host {
TemporalClientConfig::new(
Url::from_str(&format!("{host}:{port}")).change_context(GraphError)?,
)
.change_context(GraphError)?
.await
.map(Some)
.change_context(GraphError)
} else {
None
},
});
Ok(None)
}
};

let router = {
let dependencies = RestRouterDependencies {
store: Arc::new(pool),
authorization_api: Arc::new(zanzibar_client),
domain_regex: DomainValidator::new(args.allowed_url_domain),
temporal_client: temporal_client_fn(args.temporal_host.clone(), args.temporal_port)
.await?,
};

if args.rpc_enabled {
tracing::info!("Starting RPC server...");

server_rpc(args.rpc_address, Dependencies {
store: Arc::clone(&dependencies.store),
authorization_api: Arc::clone(&dependencies.authorization_api),
temporal_client: temporal_client_fn(args.temporal_host, args.temporal_port).await?,
codec: (),
})?;
}

rest_api_router(dependencies)
};

tracing::info!("Listening on {}", args.api_address);
axum::serve(
Expand Down
5 changes: 4 additions & 1 deletion libs/@local/graph/api/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ pub struct Dependencies<S, A, C> {
pub fn rpc_router<S, A, C, N>(
dependencies: Dependencies<S, A, C>,
notifications: N,
) -> (Router<impl Route<RequestBody>>, Task<Account, N>)
) -> (
Router<impl Route<RequestBody, ResponseBody: Send, Future: Send> + Send>,
Task<Account, N>,
)
where
S: StorePool + Send + Sync + 'static,
A: AuthorizationApiPool + Send + Sync + 'static,
Expand Down
5 changes: 3 additions & 2 deletions libs/@local/harpc/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ use core::{

use error_stack::{Report, ResultExt as _};
use futures::{Stream, StreamExt as _, stream::FusedStream};
pub use harpc_net::{session::server::SessionConfig, transport::TransportConfig};
use harpc_net::{
session::server::{EventStream, ListenStream, SessionConfig, SessionLayer, Transaction},
transport::{TransportConfig, TransportLayer},
session::server::{EventStream, ListenStream, SessionLayer, Transaction},
transport::TransportLayer,
};
use multiaddr::Multiaddr;
use tokio_util::sync::{CancellationToken, DropGuard};
Expand Down

0 comments on commit 78203f9

Please sign in to comment.