Skip to content

Commit

Permalink
async call for initial proposal
Browse files Browse the repository at this point in the history
  • Loading branch information
prekucki committed Nov 14, 2023
1 parent d0065e3 commit 257dc51
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ license = "GPL-3.0"
edition = "2018"

[features]
default = ['erc20-driver', 'zksync-driver', 'gftp/bin']
default = ['erc20-driver', 'gftp/bin']
static-openssl = ["openssl/vendored", "openssl-probe"]
dummy-driver = ['ya-dummy-driver']
erc20-driver = ['ya-erc20-driver']
Expand Down Expand Up @@ -224,7 +224,7 @@ trust-dns-resolver = "0.22"
libsqlite3-sys = { version = "0.26.0", features = ["bundled"] }
ya-relay-stack = { git = "https://github.com/golemfactory/ya-relay.git", rev = "c92a75b0cf062fcc9dbb3ea2a034d913e5fad8e5" }

ya-relay-client = { git = "https://github.com/golemfactory/ya-relay.git", rev = "3e89c4b7c6ad06a4fc08cf6fdfe22f4740759af5" }
ya-relay-client = { git = "https://github.com/golemfactory/ya-relay.git", rev = "0e406c78f9e1ba5f156f8d157025b93617804bef" }


[patch.crates-io]
Expand Down
1 change: 1 addition & 0 deletions core/market/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ ya-service-api-web = "0.2"
ya-service-bus = "0.6.1"
ya-std-utils = "0.1"
ya-utils-actix = "0.2"
parking_lot = "0.12.1"

actix = { version = "0.13", default-features = false }
actix-http = "3"
Expand Down
71 changes: 63 additions & 8 deletions core/market/src/protocol/discovery.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Discovery protocol interface
use futures::TryFutureExt;
use metrics::{counter, timing, value};
use std::collections::HashSet;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
Expand All @@ -19,6 +20,7 @@ use super::callback::HandlerSlot;
use crate::config::DiscoveryConfig;
use crate::db::model::{Offer as ModelOffer, SubscriptionId};
use crate::identity::{IdentityApi, IdentityError};
use parking_lot::Mutex as PlMutex;

pub mod builder;
pub mod error;
Expand Down Expand Up @@ -53,13 +55,58 @@ pub struct DiscoveryImpl {
lazy_binder_prefix: Mutex<Option<String>>,

/// Receiving queue.
offers_receiving_queue: mpsc::Sender<(String, OffersBcast)>,
offers_receiving_queue: mpsc::Sender<(NodeId, OffersBcast)>,
offer_handlers: OfferHandlers,

config: DiscoveryConfig,
/// We need this to determine, if we use hybrid NET. Should be removed together
/// with central NET implementation in future.
net_type: net::NetType,
ban_cache: BanCache,
}

struct BanCache {
inner: Arc<PlMutex<BanCacheInner>>,
}

struct BanCacheInner {
banned_nodes: HashSet<NodeId>,
ts: Instant,
}

const MAX_BAN_TIME: std::time::Duration = std::time::Duration::from_secs(300);
impl BanCache {
fn new() -> Self {
let banned_nodes = Default::default();
let ts = Instant::now();

Self {
inner: Arc::new(PlMutex::new(BanCacheInner { banned_nodes, ts })),
}
}

fn is_banned_node(&self, node_id: &NodeId) -> bool {
let mut g = self.inner.lock();
if g.banned_nodes.contains(node_id) {
if g.ts.elapsed() > MAX_BAN_TIME {
g.banned_nodes.clear();
g.ts = Instant::now();
false
} else {
true
}
} else {
false
}
}

fn ban_node(&self, node_id: NodeId) {
let mut g = self.inner.lock();
if g.banned_nodes.is_empty() {
g.ts = Instant::now();
}
g.banned_nodes.insert(node_id);
}
}

impl Discovery {
Expand Down Expand Up @@ -314,15 +361,19 @@ impl Discovery {
Ok(())
}

async fn bcast_receiver_loop(self, mut offers_channel: mpsc::Receiver<(String, OffersBcast)>) {
async fn bcast_receiver_loop(self, mut offers_channel: mpsc::Receiver<(NodeId, OffersBcast)>) {
while let Some((caller, msg)) = offers_channel.recv().await {
self.bcast_receiver_loop_step(caller, msg).await.ok();
if !self.inner.ban_cache.is_banned_node(&caller) {
self.bcast_receiver_loop_step(caller, msg).await.ok();
} else {
log::trace!("banned node: {caller}");
}
}

log::debug!("Broadcast receiver loop stopped.");
}

async fn bcast_receiver_loop_step(&self, caller: String, msg: OffersBcast) -> Result<(), ()> {
async fn bcast_receiver_loop_step(&self, caller: NodeId, msg: OffersBcast) -> Result<(), ()> {
let start = Instant::now();
let num_ids_received = msg.offer_ids.len();

Expand All @@ -335,14 +386,17 @@ impl Discovery {
let filter_out_known_ids = self.inner.offer_handlers.filter_out_known_ids.clone();
let receive_remote_offers = self.inner.offer_handlers.receive_remote_offers.clone();

let unknown_offer_ids = filter_out_known_ids.call(caller.clone(), msg).await?;
let unknown_offer_ids = filter_out_known_ids.call(caller.to_string(), msg).await?;

let new_offer_ids = if !unknown_offer_ids.is_empty() {
let start_remote = Instant::now();
let offers = self
.get_remote_offers(caller.clone(), unknown_offer_ids, 3)
.get_remote_offers(caller.to_string(), unknown_offer_ids, 3)
.await
.map_err(|e| log::debug!("Can't get Offers from [{caller}]. Error: {e}"))?;
.map_err(|e| {
self.inner.ban_cache.ban_node(caller);
log::debug!("Can't get Offers from [{caller}]. Error: {e}")
})?;

let end_remote = Instant::now();
timing!(
Expand All @@ -354,7 +408,7 @@ impl Discovery {
// We still could fail to add some Offers to database. If we fail to add them, we don't
// want to propagate subscription further.
receive_remote_offers
.call(caller.clone(), OffersRetrieved { offers })
.call(caller.to_string(), OffersRetrieved { offers })
.await?
} else {
vec![]
Expand Down Expand Up @@ -384,6 +438,7 @@ impl Discovery {
return Ok(());
}

let caller: NodeId = caller.parse().map_err(|_| ())?;
// We don't want to get overwhelmed by incoming broadcasts, that's why we drop them,
// if the queue is full.
match self.inner.offers_receiving_queue.try_send((caller, msg)) {
Expand Down
2 changes: 2 additions & 0 deletions core/market/src/protocol/discovery/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use ya_net::{self as net};
use super::{Discovery, DiscoveryImpl};
use crate::config::DiscoveryConfig;
use crate::protocol::discovery::OfferHandlers;
use crate::testing::discovery::BanCache;

#[derive(Default)]
pub struct DiscoveryBuilder {
Expand Down Expand Up @@ -86,6 +87,7 @@ impl DiscoveryBuilder {
config: self.config.unwrap(),
net_type: net::Config::from_env().unwrap().net_type,
offers_receiving_queue: sender,
ban_cache: BanCache::new(),
}),
};

Expand Down
23 changes: 17 additions & 6 deletions core/market/src/protocol/negotiation/requestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::protocol::negotiation::error::{
};
use crate::protocol::negotiation::messages::AgreementCommitted;
use chrono::NaiveDateTime;
use tokio::task::spawn_local;

/// Responsible for communication with markets on other nodes
/// during negotiation phase.
Expand Down Expand Up @@ -79,12 +80,22 @@ impl NegotiationApi {
offer_id: proposal.negotiation.offer_id,
demand_id: proposal.negotiation.demand_id,
};
net::from(proposal.negotiation.requestor_id)
.to(proposal.negotiation.provider_id)
.service(&provider::proposal_addr(BUS_ID))
.send(msg)
.await
.map_err(|e| GsbProposalError(e.to_string(), proposal_id))??;
let provider_id = proposal.negotiation.provider_id;
spawn_local(
net::from(proposal.negotiation.requestor_id)
.to(proposal.negotiation.provider_id)
.service(&provider::proposal_addr(BUS_ID))
.send(msg)
.map_err(move |e| {
log::warn!("failed to send initial proposal to [{provider_id}]: {e:?}");
e
}),
);
/*
.await
.map_err(|e| GsbProposalError(e.to_string(), proposal_id))??;
*/
Ok(())
}

Expand Down

0 comments on commit 257dc51

Please sign in to comment.