Skip to content

Commit

Permalink
WiP: rpc bridge
Browse files Browse the repository at this point in the history
  • Loading branch information
prekucki committed Nov 20, 2024
1 parent 7475a7d commit 7b2d167
Show file tree
Hide file tree
Showing 14 changed files with 391 additions and 13 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 22 additions & 1 deletion core/model/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,13 +465,34 @@ impl RemoteEndpoint for NetDst {

#[cfg(test)]
mod tests {
use ya_client_model::NodeId;
use super::*;

#[test]
fn ok_try_service_on_public() {
"0xbabe000000000000000000000000000000000000"
let ep = "0xbabe000000000000000000000000000000000000"
.try_service("/public/x")
.unwrap();
eprintln!("addr={}", ep.addr());

let n1 : NodeId ="0xbabe000000000000000000000000000000000000".parse().unwrap();
let n2 : NodeId ="0xcafe000000000000000000000000000000000000".parse().unwrap();

let ep = super::from(n1)
.to(n2)
.service("/public/x");
eprintln!("service from: {n1} to {n2} addr={}", ep.addr());

let ep = super::from(n1)
.to(n2)
.service_udp("/public/x");
eprintln!("service_udp from: {n1} to {n2} addr={}", ep.addr());

let ep = super::from(n1)
.to(n2)
.service_transfer("/public/x");
eprintln!("service_transfer from: {n1} to {n2} addr={}", ep.addr());

}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion core/net-iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ serde = { version = "1.0.214", features = ["derive"] }
ya-core-model = { workspace = true, features = ["net", "identity"] }
bytes = { version = "1" }
ethsign = { version = "0.8" }

url.workspace = true


[lints]
Expand Down
27 changes: 25 additions & 2 deletions core/net-iroh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures::future::LocalBoxFuture;
use futures::Stream;
use serde::{Deserialize, Serialize};
use std::rc::Rc;
use url::Url;
use ya_core_model::NodeId;

///
Expand All @@ -21,13 +22,35 @@ pub struct NetClientBuilder {
_inner: (),
}

impl NetClientBuilder {}
impl NetClientBuilder {

pub fn bind_url(self, _url : Url) -> Self {
self
}

pub fn crypto_provider(self, _cypher : impl CryptoProvider) -> Self {
self
}

pub async fn start(self) -> Result<NetClient> {
todo!()
}

}

#[derive(Clone)]
pub struct NetClient {
_inner: (),
}

impl NetClient {

pub fn builder() -> NetClientBuilder {
NetClientBuilder {
_inner: (),
}
}

pub async fn send_msg(&self, from: NodeId, to: NodeId, msg: Bytes) -> Result<()> {

Check failure on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / System Tests (ubuntu-latest)

unused variable: `from`

Check failure on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / System Tests (ubuntu-latest)

unused variable: `to`

Check failure on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / System Tests (ubuntu-latest)

unused variable: `msg`

Check failure on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Market Test Suite (ubuntu-latest)

unused variable: `from`

Check failure on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Market Test Suite (ubuntu-latest)

unused variable: `to`

Check failure on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Market Test Suite (ubuntu-latest)

unused variable: `msg`

Check failure on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Market Test Suite (macos-latest)

unused variable: `from`

Check failure on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Market Test Suite (macos-latest)

unused variable: `to`

Check failure on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Market Test Suite (macos-latest)

unused variable: `msg`

Check warning on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Build binaries (aarch64)

unused variable: `from`

Check warning on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Build binaries (aarch64)

unused variable: `to`

Check warning on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Build binaries (aarch64)

unused variable: `msg`

Check warning on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (ubuntu-latest)

unused variable: `from`

Check warning on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (ubuntu-latest)

unused variable: `to`

Check warning on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (ubuntu-latest)

unused variable: `msg`

Check failure on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Market Test Suite (windows-latest)

unused variable: `from`

Check failure on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Market Test Suite (windows-latest)

unused variable: `to`

Check failure on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Market Test Suite (windows-latest)

unused variable: `msg`

Check warning on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (macos-latest)

unused variable: `from`

Check warning on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (macos-latest)

unused variable: `to`

Check warning on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (macos-latest)

unused variable: `msg`

Check warning on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (windows-latest)

unused variable: `from`

Check warning on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (windows-latest)

unused variable: `to`

Check warning on line 54 in core/net-iroh/src/lib.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (windows-latest)

unused variable: `msg`
todo!()
}
Expand Down Expand Up @@ -58,7 +81,7 @@ impl NetClient {
}

pub async fn recv_broadcast(&self, topic: String) -> impl Stream<Item = (NodeId, Bytes)> {
todo!()
futures::stream::empty()
}

pub async fn status(&self) -> Result<NetStatus> {
Expand Down
6 changes: 5 additions & 1 deletion core/net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ default = []
service = []
# Temporary to make goth integration tests work
central-net = []
iroh-net = []
packet-trace-enable = [
"ya-packet-trace/enable",
"ya-relay-client/packet-trace-enable",
Expand All @@ -25,8 +26,10 @@ ya-sb-util = { workspace = true }
ya-service-api.workspace = true
ya-service-api-interfaces.workspace = true
ya-service-bus = { workspace = true, features = ["tls"] }
ya-utils-networking.workspace = true
ya-utils-networking = { workspace = true, features = ["dns"] }
ya-packet-trace = { git = "https://github.com/golemfactory/ya-packet-trace" }
net-iroh = { path = "../net-iroh" }
postcard = { version = "1.0.10", features = ["alloc"]}

actix.workspace = true
actix-web.workspace = true
Expand All @@ -51,6 +54,7 @@ url = { version = "2.2" }
prost = { version = "0.10" }
rand = { version = "0.7" }
regex = { workspace = true }
serde = { version = "1.0", features = ["derive"] }

[dev-dependencies]
ya-sb-proto = { workspace = true }
Expand Down
7 changes: 2 additions & 5 deletions core/net/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@ use url::Url;
)]
#[strum(serialize_all = "kebab-case")]
pub enum NetType {
/// TODO: Remove compilation flag.
/// This conditional compilation is hack to make Goth integration tests work.
/// Current solution in Goth is to build separate binary with compilation flag.
/// This is only temporary for transition period, to make this PR as small as possible.
#[cfg_attr(feature = "central-net", default)]
Central,
#[cfg_attr(not(feature = "central-net"), default)]
Hybrid,
#[cfg_attr(not(feature = "central-net"), default)]
IROH,
}

#[derive(StructOpt, Clone)]
Expand Down
93 changes: 93 additions & 0 deletions core/net/src/iroh.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use ya_core_model::net::local::{BindBroadcastError, BroadcastMessage, SendBroadcastMessage, ToEndpoint};
use std::sync::Arc;
use actix_web::Scope;
use futures::{future, stream};
use ya_service_bus::{Error, RpcEndpoint, RpcMessage};
use crate::Config;
use net_iroh::NetClient;
use ya_service_bus::untyped::{Fn4HandlerExt, Fn4StreamHandlerExt};
use ya_service_bus::typed as bus;
use ya_core_model::net;

mod local_service;
mod cli;
mod bridge;
mod crypto;
mod rpc;


pub struct IRohNet;

pub async fn gsb<Context>(_: Context, config: Config) -> anyhow::Result<()> {
use ya_service_bus::{untyped as gsb, error::Error as GsbError, ResponseChunk};

let (default_id, ids) = crate::service::identities().await?;
let crypto = self::crypto::IdentityCryptoProvider::new(default_id);
let client = NetClient::builder().bind_url(config.bind_url).crypto_provider(crypto).start().await?;

// /net/{id}/{service} -> /public/{service}
// /transfer/net/{dst}/service
// /udp/net/{dst}/service

// /from/{src}/to/{dst}/{service} -> /public/{service}
// /udp/from/{src}/to/{dst}/{service} -> /public/{service}
// /transfer/from/{src}/to/{dst}/{service} -> /public/{service}


// -> Future<Result<Vec<u8>, GsbError>
let rpc = move |caller: &str, addr: &str, msg: &[u8], no_reply: bool| {

future::ok(Vec::new())
};

let rpc_stream = move |caller: &str, addr: &str, msg: &[u8], no_reply: bool| {

stream::once(future::ok(ResponseChunk::Full(Vec::new())))
};

let _ = gsb::subscribe("/net", rpc.into_handler(), rpc_stream.into_stream_handler());

Ok(())
}

pub fn scope() -> Scope {
todo!()
}


pub async fn bind_broadcast_with_caller<M, T, F>(
broadcast_address: &str,
handler: F,
) -> Result<(), BindBroadcastError>
where
M: BroadcastMessage + Send + Sync + 'static,
T: std::future::Future<
Output = Result<
<SendBroadcastMessage<M> as RpcMessage>::Item,
<SendBroadcastMessage<M> as RpcMessage>::Error,
>,
> + 'static,
F: FnMut(String, SendBroadcastMessage<M>) -> T + 'static,
{
let address = broadcast_address.to_string();
let subscription = M::into_subscribe_msg(address.clone());

log::debug!(
"Binding broadcast handler for topic: {}",
subscription.topic()
);

bus::service(net::local::BUS_ID)
.send(subscription)
.await??;

log::debug!(
"Binding handler '{broadcast_address}' for broadcast topic {}.",
M::TOPIC
);

// We created endpoint address above. Now we must add handler, which will
// handle broadcasts forwarded to this address.
bus::bind_with_caller(broadcast_address, handler);
Ok(())
}
Empty file added core/net/src/iroh/bridge.rs
Empty file.
Empty file added core/net/src/iroh/cli.rs
Empty file.
Loading

0 comments on commit 7b2d167

Please sign in to comment.