Skip to content

Commit

Permalink
kad: Provide partial results to speedup GetRecord queries (#315)
Browse files Browse the repository at this point in the history
This PR provides the partial results of the `GetRecord` kademlia query.
- A new `GetRecordPartialResult` event is introduced for kademlia
- `GetRecordSuccess` is modified to include only the query ID
- Kademlia `GetRecord` implementation no longer stores network records
internally and forwards valid (unexpired) ones back to the user


The change is needed to speedup authority discovery for substrate based
chains.


More context can be found at:
paritytech/polkadot-sdk#7077 (comment)

### Next Steps
- [x] Adjust testing to API breaking change
- [x] Investigate CPU impact (as suggested by @dmitry-markin this should
be unnoticeable 🙏 )

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv authored Jan 14, 2025
1 parent 121300d commit c6a0746
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 153 deletions.
24 changes: 10 additions & 14 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,17 @@ pub enum KademliaEvent {
GetRecordSuccess {
/// Query ID.
query_id: QueryId,
},

/// `GET_VALUE` inflight query produced a result.
///
/// This event is emitted when a peer responds to the query with a record.
GetRecordPartialResult {
/// Query ID.
query_id: QueryId,

/// Found records.
records: RecordsType,
/// Found record.
record: PeerRecord,
},

/// `GET_PROVIDERS` query succeeded.
Expand Down Expand Up @@ -251,18 +259,6 @@ pub enum KademliaEvent {
},
}

/// The type of the DHT records.
#[derive(Debug, Clone)]
pub enum RecordsType {
/// Record was found in the local store and [`Quorum::One`] was used.
///
/// This contains only a single result.
LocalStore(Record),

/// Records found in the network. This can include the locally found record.
Network(Vec<PeerRecord>),
}

/// Handle for communicating with the Kademlia protocol.
pub struct KademliaHandle {
/// TX channel for sending commands to `Kademlia`.
Expand Down
70 changes: 54 additions & 16 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ use std::{
time::{Duration, Instant},
};

pub use self::handle::RecordsType;
pub use config::{Config, ConfigBuilder};
pub use handle::{
IncomingRecordValidationMode, KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode,
Expand Down Expand Up @@ -485,6 +484,7 @@ impl Kademlia {

// update routing table and inform user about the update
self.update_routing_table(&peers).await;

self.engine.register_response(
query_id,
peer,
Expand Down Expand Up @@ -837,14 +837,8 @@ impl Kademlia {

Ok(())
}
QueryAction::GetRecordQueryDone { query_id, records } => {
let _ = self
.event_tx
.send(KademliaEvent::GetRecordSuccess {
query_id,
records: RecordsType::Network(records),
})
.await;
QueryAction::GetRecordQueryDone { query_id } => {
let _ = self.event_tx.send(KademliaEvent::GetRecordSuccess { query_id }).await;
Ok(())
}
QueryAction::GetProvidersQueryDone {
Expand All @@ -868,7 +862,14 @@ impl Kademlia {
let _ = self.event_tx.send(KademliaEvent::QueryFailed { query_id: query }).await;
Ok(())
}
QueryAction::QuerySucceeded { .. } => unreachable!(),
QueryAction::GetRecordPartialResult { query_id, record } => {
let _ = self
.event_tx
.send(KademliaEvent::GetRecordPartialResult { query_id, record })
.await;
Ok(())
}
QueryAction::QuerySucceeded { .. } => Ok(()),
}
}

Expand Down Expand Up @@ -1101,23 +1102,41 @@ impl Kademlia {

match (self.store.get(&key), quorum) {
(Some(record), Quorum::One) => {
let _ = self
.event_tx
.send(KademliaEvent::GetRecordPartialResult { query_id, record: PeerRecord {
peer: self.service.local_peer_id(),
record: record.clone(),
} })
.await;

let _ = self
.event_tx
.send(KademliaEvent::GetRecordSuccess {
query_id,
records: RecordsType::LocalStore(record.clone()),
})
.await;
}
(record, _) => {
let local_record = record.is_some();
if let Some(record) = record {
let _ = self
.event_tx
.send(KademliaEvent::GetRecordPartialResult { query_id, record: PeerRecord {
peer: self.service.local_peer_id(),
record: record.clone(),
} })
.await;
}

self.engine.start_get_record(
query_id,
key.clone(),
self.routing_table
.closest(&Key::new(key), self.replication_factor)
.into(),
quorum,
record.cloned(),
local_record,
);
}
}
Expand Down Expand Up @@ -1292,8 +1311,16 @@ mod tests {
},
];

for record in records {
let action = QueryAction::GetRecordPartialResult {
query_id: QueryId(1),
record,
};
assert!(kademlia.on_query_action(action).await.is_ok());
}

let query_id = QueryId(1);
let action = QueryAction::GetRecordQueryDone { query_id, records };
let action = QueryAction::GetRecordQueryDone { query_id };
assert!(kademlia.on_query_action(action).await.is_ok());

// Check the local storage should not get updated.
Expand Down Expand Up @@ -1332,9 +1359,20 @@ mod tests {
},
];

let query_id = QueryId(1);
let action = QueryAction::GetRecordQueryDone { query_id, records };
assert!(kademlia.on_query_action(action).await.is_ok());
for record in records {
let action = QueryAction::GetRecordPartialResult {
query_id: QueryId(1),
record,
};
assert!(kademlia.on_query_action(action).await.is_ok());
}

kademlia
.on_query_action(QueryAction::GetRecordQueryDone {
query_id: QueryId(1),
})
.await
.unwrap();

// Check the local storage should not get updated.
assert!(kademlia.store.get(&key).is_none());
Expand Down
89 changes: 69 additions & 20 deletions src/protocol/libp2p/kademlia/query/get_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,19 @@ pub struct GetRecordContext {
/// Candidates.
pub candidates: BTreeMap<Distance, KademliaPeer>,

/// Found records.
pub found_records: Vec<PeerRecord>,
/// Number of found records.
pub found_records: usize,

/// Records to propagate as next query action.
pub records: VecDeque<PeerRecord>,
}

impl GetRecordContext {
/// Create new [`GetRecordContext`].
pub fn new(
config: GetRecordConfig,
in_peers: VecDeque<KademliaPeer>,
found_records: Vec<PeerRecord>,
local_record: bool,
) -> Self {
let mut candidates = BTreeMap::new();

Expand All @@ -127,15 +130,11 @@ impl GetRecordContext {
candidates,
pending: HashMap::new(),
queried: HashSet::new(),
found_records,
found_records: if local_record { 1 } else { 0 },
records: VecDeque::new(),
}
}

/// Get the found records.
pub fn found_records(self) -> Vec<PeerRecord> {
self.found_records
}

/// Register response failure for `peer`.
pub fn register_response_failure(&mut self, peer: PeerId) {
let Some(peer) = self.pending.remove(&peer) else {
Expand All @@ -152,6 +151,8 @@ impl GetRecordContext {
}

/// Register `GET_VALUE` response from `peer`.
///
/// Returns some if the response should be propagated to the user.
pub fn register_response(
&mut self,
peer: PeerId,
Expand All @@ -177,10 +178,12 @@ impl GetRecordContext {

if let Some(record) = record {
if !record.is_expired(std::time::Instant::now()) {
self.found_records.push(PeerRecord {
self.records.push_back(PeerRecord {
peer: peer.peer,
record,
});

self.found_records += 1;
}
}

Expand Down Expand Up @@ -258,9 +261,17 @@ impl GetRecordContext {

/// Get next action for a `GET_VALUE` query.
pub fn next_action(&mut self) -> Option<QueryAction> {
// Drain the records first.
if let Some(record) = self.records.pop_front() {
return Some(QueryAction::GetRecordPartialResult {
query_id: self.config.query,
record,
});
}

// These are the records we knew about before starting the query and
// the records we found along the way.
let known_records = self.config.known_records + self.found_records.len();
let known_records = self.config.known_records + self.found_records;

// If we cannot make progress, return the final result.
// A query failed when we are not able to identify one single record.
Expand All @@ -277,7 +288,7 @@ impl GetRecordContext {
}

// Check if enough records have been found
let sufficient_records = self.config.sufficient_records(self.found_records.len());
let sufficient_records = self.config.sufficient_records(self.found_records);
if sufficient_records {
return Some(QueryAction::QuerySucceeded {
query: self.config.query,
Expand Down Expand Up @@ -382,7 +393,7 @@ mod tests {
#[test]
fn completes_when_no_candidates() {
let config = default_config();
let mut context = GetRecordContext::new(config, VecDeque::new(), Vec::new());
let mut context = GetRecordContext::new(config, VecDeque::new(), false);
assert!(context.is_done());
let event = context.next_action().unwrap();
assert_eq!(event, QueryAction::QueryFailed { query: QueryId(0) });
Expand All @@ -391,7 +402,7 @@ mod tests {
known_records: 1,
..default_config()
};
let mut context = GetRecordContext::new(config, VecDeque::new(), Vec::new());
let mut context = GetRecordContext::new(config, VecDeque::new(), false);
assert!(context.is_done());
let event = context.next_action().unwrap();
assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) });
Expand All @@ -409,7 +420,7 @@ mod tests {
assert_eq!(in_peers_set.len(), 3);

let in_peers = in_peers_set.iter().map(|peer| peer_to_kad(*peer)).collect();
let mut context = GetRecordContext::new(config, in_peers, Vec::new());
let mut context = GetRecordContext::new(config, in_peers, false);

for num in 0..3 {
let event = context.next_action().unwrap();
Expand Down Expand Up @@ -448,7 +459,7 @@ mod tests {
assert_eq!(in_peers_set.len(), 3);

let in_peers = [peer_a, peer_b, peer_c].iter().map(|peer| peer_to_kad(*peer)).collect();
let mut context = GetRecordContext::new(config, in_peers, Vec::new());
let mut context = GetRecordContext::new(config, in_peers, false);

// Schedule peer queries.
for num in 0..3 {
Expand All @@ -473,26 +484,53 @@ mod tests {
assert_eq!(context.pending.len(), 3);
assert!(context.queried.is_empty());

let mut found_records = Vec::new();
// Provide responses back.
let record = Record::new(key.clone(), vec![1, 2, 3]);
context.register_response(peer_a, Some(record), vec![]);
// Check propagated action.
let record = context.next_action().unwrap();
match record {
QueryAction::GetRecordPartialResult { query_id, record } => {
assert_eq!(query_id, QueryId(0));
assert_eq!(record.peer, peer_a);
assert_eq!(record.record, Record::new(key.clone(), vec![1, 2, 3]));

found_records.push(record);
}
_ => panic!("Unexpected event"),
}

assert_eq!(context.pending.len(), 2);
assert_eq!(context.queried.len(), 1);
assert_eq!(context.found_records.len(), 1);
assert_eq!(context.found_records, 1);

// Provide different response from peer b with peer d as candidate.
let record = Record::new(key.clone(), vec![4, 5, 6]);
context.register_response(peer_b, Some(record), vec![peer_to_kad(peer_d.clone())]);
// Check propagated action.
let record = context.next_action().unwrap();
match record {
QueryAction::GetRecordPartialResult { query_id, record } => {
assert_eq!(query_id, QueryId(0));
assert_eq!(record.peer, peer_b);
assert_eq!(record.record, Record::new(key.clone(), vec![4, 5, 6]));

found_records.push(record);
}
_ => panic!("Unexpected event"),
}

assert_eq!(context.pending.len(), 1);
assert_eq!(context.queried.len(), 2);
assert_eq!(context.found_records.len(), 2);
assert_eq!(context.found_records, 2);
assert_eq!(context.candidates.len(), 1);

// Peer C fails.
context.register_response_failure(peer_c);
assert!(context.pending.is_empty());
assert_eq!(context.queried.len(), 3);
assert_eq!(context.found_records.len(), 2);
assert_eq!(context.found_records, 2);

// Drain the last candidate.
let event = context.next_action().unwrap();
Expand All @@ -509,13 +547,24 @@ mod tests {
// Peer D responds.
let record = Record::new(key.clone(), vec![4, 5, 6]);
context.register_response(peer_d, Some(record), vec![]);
// Check propagated action.
let record = context.next_action().unwrap();
match record {
QueryAction::GetRecordPartialResult { query_id, record } => {
assert_eq!(query_id, QueryId(0));
assert_eq!(record.peer, peer_d);
assert_eq!(record.record, Record::new(key.clone(), vec![4, 5, 6]));

found_records.push(record);
}
_ => panic!("Unexpected event"),
}

// Produces the result.
let event = context.next_action().unwrap();
assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) });

// Check results.
let found_records = context.found_records();
assert_eq!(
found_records,
vec![
Expand Down
Loading

0 comments on commit c6a0746

Please sign in to comment.