Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update light client optimistic update re-processing logic. #7

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion beacon_node/beacon_processor/src/work_reprocessing_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,15 @@ pub enum ReprocessQueueMessage {
/// A gossip block for hash `X` is being imported, we should queue the rpc block for the same
/// hash until the gossip block is imported.
RpcBlock(QueuedRpcBlock),
/// A block that was successfully processed. We use this to handle attestations and light client updates
/// A block that was successfully processed. We use this to handle attestations updates
/// for unknown blocks.
BlockImported {
block_root: Hash256,
parent_root: Hash256,
},
/// A new `LightClientOptimisticUpdate` has been produced. We use this to handle light client
/// updates for unknown parent blocks.
NewLightClientOptimisticUpdate { parent_root: Hash256 },
/// An unaggregated attestation that references an unknown block.
UnknownBlockUnaggregate(QueuedUnaggregate),
/// An aggregated attestation that references an unknown block.
Expand Down Expand Up @@ -688,6 +691,8 @@ impl<S: SlotClock> ReprocessQueue<S> {
);
}
}
}
InboundEvent::Msg(NewLightClientOptimisticUpdate { parent_root }) => {
// Unqueue the light client optimistic updates we have for this root, if any.
if let Some(queued_lc_id) = self
.awaiting_lc_updates_per_parent_root
Expand Down
11 changes: 8 additions & 3 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ where
}
.spawn_manager(
beacon_processor_channels.beacon_processor_rx,
beacon_processor_channels.work_reprocessing_tx,
beacon_processor_channels.work_reprocessing_tx.clone(),
beacon_processor_channels.work_reprocessing_rx,
None,
beacon_chain.slot_clock.clone(),
Expand Down Expand Up @@ -857,8 +857,13 @@ where
let log = broadcast_context.log().clone();
broadcast_context.executor.spawn(
async move {
compute_light_client_updates(&inner_chain, light_client_server_rv, &log)
.await
compute_light_client_updates(
&inner_chain,
light_client_server_rv,
beacon_processor_channels.work_reprocessing_tx,
&log,
)
.await
},
"lcserv_broadcast",
);
Expand Down
12 changes: 11 additions & 1 deletion beacon_node/client/src/compute_light_client_updates.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use beacon_chain::{BeaconChain, BeaconChainTypes, LightClientProducerEvent};
use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage;
use futures::channel::mpsc::Receiver;
use futures::StreamExt;
use slog::{error, Logger};
use tokio::sync::mpsc::Sender;

// Each `LightClientProducerEvent` is ~200 bytes. With the light_client server producing only recent
// updates it is okay to drop some events in case of overloading. In normal network conditions
Expand All @@ -12,6 +14,7 @@ pub(crate) const LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY: usize = 32;
pub async fn compute_light_client_updates<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
mut light_client_server_rv: Receiver<LightClientProducerEvent<T::EthSpec>>,
reprocess_tx: Sender<ReprocessQueueMessage>,
log: &Logger,
) {
// Should only receive events for recent blocks, import_block filters by blocks close to clock.
Expand All @@ -20,10 +23,17 @@ pub async fn compute_light_client_updates<T: BeaconChainTypes>(
// Uses a bounded receiver, so may drop some SyncAggregates if very overloaded. This is okay
// since only the most recent updates have value.
while let Some(event) = light_client_server_rv.next().await {
let parent_root = event.0;

chain
.recompute_and_cache_light_client_updates(event)
.unwrap_or_else(|e| {
error!(log, "error computing light_client updates {:?}", e);
})
});

let msg = ReprocessQueueMessage::NewLightClientOptimisticUpdate { parent_root };
if reprocess_tx.try_send(msg).is_err() {
error!(log, "Failed to inform light client update"; "parent_root" => %parent_root)
};
}
}
Loading