Skip to content

Commit

Permalink
Merge #4355
Browse files Browse the repository at this point in the history
4355: Transactions on event stream server r=Fraser999 a=Fraser999

This PR updates the event stream server to report `Transaction`s rather than `Deploy`s.

It is based on top of #4351, so only the final two commits ([9d4a881](9d4a881) and [65fa46f](65fa46f)) are relevant to this PR.

Closes [roadmap#186](casper-network/roadmap#186).

Co-authored-by: Fraser Hutchison <fraser@casperlabs.io>
  • Loading branch information
casperlabs-bors-ng[bot] and Fraser999 authored Oct 27, 2023
2 parents 10f1118 + 65fa46f commit 39f124b
Show file tree
Hide file tree
Showing 10 changed files with 1,123 additions and 179 deletions.
55 changes: 36 additions & 19 deletions node/src/components/event_stream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use tokio::sync::{
use tracing::{error, info, warn};
use warp::Filter;

use casper_types::ProtocolVersion;
use casper_types::{ProtocolVersion, TransactionHeader};

use super::Component;
use crate::{
Expand Down Expand Up @@ -261,9 +261,9 @@ where
effects
}
Event::BlockAdded(_)
| Event::DeployAccepted(_)
| Event::DeployProcessed { .. }
| Event::DeploysExpired(_)
| Event::TransactionAccepted(_)
| Event::TransactionProcessed { .. }
| Event::TransactionsExpired(_)
| Event::Fault { .. }
| Event::FinalitySignature(_)
| Event::Step { .. } => {
Expand All @@ -288,24 +288,41 @@ where
block_hash: *block.hash(),
block: Box::new((*block).clone()),
}),
Event::DeployAccepted(deploy) => self.broadcast(SseData::DeployAccepted { deploy }),
Event::DeployProcessed {
deploy_hash,
deploy_header,
Event::TransactionAccepted(transaction) => {
self.broadcast(SseData::TransactionAccepted { transaction })
}
Event::TransactionProcessed {
transaction_hash,
transaction_header,
block_hash,
execution_result,
} => self.broadcast(SseData::DeployProcessed {
deploy_hash: Box::new(deploy_hash),
account: Box::new(deploy_header.account().clone()),
timestamp: deploy_header.timestamp(),
ttl: deploy_header.ttl(),
dependencies: deploy_header.dependencies().clone(),
block_hash: Box::new(block_hash),
execution_result,
}),
Event::DeploysExpired(deploy_hashes) => deploy_hashes
} => {
let (account, timestamp, ttl) = match *transaction_header {
TransactionHeader::Deploy(deploy_header) => (
deploy_header.account().clone(),
deploy_header.timestamp(),
deploy_header.ttl(),
),
TransactionHeader::V1(txn_header) => (
txn_header.account().clone(),
txn_header.timestamp(),
txn_header.ttl(),
),
};
self.broadcast(SseData::TransactionProcessed {
transaction_hash: Box::new(transaction_hash),
account: Box::new(account),
timestamp,
ttl,
block_hash: Box::new(block_hash),
execution_result,
})
}
Event::TransactionsExpired(transaction_hashes) => transaction_hashes
.into_iter()
.flat_map(|deploy_hash| self.broadcast(SseData::DeployExpired { deploy_hash }))
.flat_map(|transaction_hash| {
self.broadcast(SseData::TransactionExpired { transaction_hash })
})
.collect(),
Event::Fault {
era_id,
Expand Down
32 changes: 17 additions & 15 deletions node/src/components/event_stream_server/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@ use itertools::Itertools;

use casper_types::{
execution::{Effects, ExecutionResult},
Block, BlockHash, Deploy, DeployHash, DeployHeader, EraId, FinalitySignature, PublicKey,
Timestamp,
Block, BlockHash, EraId, FinalitySignature, PublicKey, Timestamp, Transaction, TransactionHash,
TransactionHeader,
};

#[derive(Debug)]
pub enum Event {
Initialize,
BlockAdded(Arc<Block>),
DeployAccepted(Arc<Deploy>),
DeployProcessed {
deploy_hash: DeployHash,
deploy_header: Box<DeployHeader>,
TransactionAccepted(Arc<Transaction>),
TransactionProcessed {
transaction_hash: TransactionHash,
transaction_header: Box<TransactionHeader>,
block_hash: BlockHash,
execution_result: Box<ExecutionResult>,
},
DeploysExpired(Vec<DeployHash>),
TransactionsExpired(Vec<TransactionHash>),
Fault {
era_id: EraId,
public_key: Box<PublicKey>,
Expand All @@ -40,19 +40,21 @@ impl Display for Event {
match self {
Event::Initialize => write!(formatter, "initialize"),
Event::BlockAdded(block) => write!(formatter, "block added {}", block.hash()),
Event::DeployAccepted(deploy_hash) => {
write!(formatter, "deploy accepted {}", deploy_hash)
Event::TransactionAccepted(transaction_hash) => {
write!(formatter, "transaction accepted {}", transaction_hash)
}
Event::DeploysExpired(deploy_hashes) => {
Event::TransactionProcessed {
transaction_hash, ..
} => {
write!(formatter, "transaction processed {}", transaction_hash)
}
Event::TransactionsExpired(transaction_hashes) => {
write!(
formatter,
"deploys expired: {}",
deploy_hashes.iter().join(", ")
"transactions expired: {}",
transaction_hashes.iter().join(", ")
)
}
Event::DeployProcessed { deploy_hash, .. } => {
write!(formatter, "deploy processed {}", deploy_hash)
}
Event::Fault {
era_id,
public_key,
Expand Down
96 changes: 56 additions & 40 deletions node/src/components/event_stream_server/sse_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ use warp::{
};

#[cfg(test)]
use casper_types::{execution::ExecutionResultV2, testing::TestRng, TestBlockBuilder};
use casper_types::{
execution::ExecutionResultV2, testing::TestRng, Deploy, TestBlockBuilder,
TestTransactionV1Builder,
};
use casper_types::{
execution::{Effects, ExecutionResult},
Block, BlockHash, Deploy, DeployHash, EraId, FinalitySignature, ProtocolVersion, PublicKey,
TimeDiff, Timestamp,
Block, BlockHash, EraId, FinalitySignature, ProtocolVersion, PublicKey, TimeDiff, Timestamp,
Transaction, TransactionHash,
};

/// The URL root path.
Expand All @@ -60,24 +63,23 @@ pub enum SseData {
block_hash: BlockHash,
block: Box<Block>,
},
/// The given deploy has been newly-accepted by this node.
DeployAccepted {
#[schemars(with = "Deploy", description = "a deploy")]
deploy: Arc<Deploy>,
/// The given transaction has been newly-accepted by this node.
TransactionAccepted {
#[schemars(with = "Transaction", description = "a transaction")]
transaction: Arc<Transaction>,
},
/// The given deploy has been executed, committed and forms part of the given block.
DeployProcessed {
deploy_hash: Box<DeployHash>,
/// The given transaction has been executed, committed and forms part of the given block.
TransactionProcessed {
transaction_hash: Box<TransactionHash>,
account: Box<PublicKey>,
timestamp: Timestamp,
ttl: TimeDiff,
dependencies: Vec<DeployHash>,
block_hash: Box<BlockHash>,
#[data_size(skip)]
execution_result: Box<ExecutionResult>,
},
/// The given deploy has expired.
DeployExpired { deploy_hash: DeployHash },
/// The given transaction has expired.
TransactionExpired { transaction_hash: TransactionHash },
/// Generic representation of validator's fault in an era.
Fault {
era_id: EraId,
Expand Down Expand Up @@ -106,34 +108,48 @@ impl SseData {
}
}

/// Returns a random `SseData::DeployAccepted`, along with the random `Deploy`.
pub(super) fn random_deploy_accepted(rng: &mut TestRng) -> (Self, Deploy) {
let deploy = Deploy::random(rng);
let event = SseData::DeployAccepted {
deploy: Arc::new(deploy.clone()),
/// Returns a random `SseData::TransactionAccepted`, along with the random `Transaction`.
pub(super) fn random_transaction_accepted(rng: &mut TestRng) -> (Self, Transaction) {
let txn = Transaction::random(rng);
let event = SseData::TransactionAccepted {
transaction: Arc::new(txn.clone()),
};
(event, deploy)
(event, txn)
}

/// Returns a random `SseData::DeployProcessed`.
pub(super) fn random_deploy_processed(rng: &mut TestRng) -> Self {
let deploy = Deploy::random(rng);
SseData::DeployProcessed {
deploy_hash: Box::new(*deploy.hash()),
account: Box::new(deploy.header().account().clone()),
timestamp: deploy.header().timestamp(),
ttl: deploy.header().ttl(),
dependencies: deploy.header().dependencies().clone(),
/// Returns a random `SseData::TransactionProcessed`.
pub(super) fn random_transaction_processed(rng: &mut TestRng) -> Self {
let txn = Transaction::random(rng);
let (timestamp, ttl) = match &txn {
Transaction::Deploy(deploy) => (deploy.timestamp(), deploy.ttl()),
Transaction::V1(txn) => (txn.timestamp(), txn.ttl()),
};
SseData::TransactionProcessed {
transaction_hash: Box::new(txn.hash()),
account: Box::new(txn.account().clone()),
timestamp,
ttl,
block_hash: Box::new(BlockHash::random(rng)),
execution_result: Box::new(ExecutionResult::from(ExecutionResultV2::random(rng))),
}
}

/// Returns a random `SseData::DeployExpired`
pub(super) fn random_deploy_expired(rng: &mut TestRng) -> Self {
let deploy = crate::testing::create_expired_deploy(Timestamp::now(), rng);
SseData::DeployExpired {
deploy_hash: *deploy.hash(),
/// Returns a random `SseData::TransactionExpired`
pub(super) fn random_transaction_expired(rng: &mut TestRng) -> Self {
let timestamp = Timestamp::now() - TimeDiff::from_seconds(20);
let ttl = TimeDiff::from_seconds(10);
let txn = if rng.gen() {
Transaction::from(Deploy::random_with_timestamp_and_ttl(rng, timestamp, ttl))
} else {
let txn = TestTransactionV1Builder::new(rng)
.with_timestamp(timestamp)
.with_ttl(ttl)
.build();
Transaction::from(txn)
};

SseData::TransactionExpired {
transaction_hash: txn.hash(),
}
}

Expand Down Expand Up @@ -170,8 +186,8 @@ impl SseData {

#[derive(Serialize)]
#[serde(rename_all = "PascalCase")]
pub(super) struct DeployAccepted {
pub(super) deploy_accepted: Arc<Deploy>,
pub(super) struct TransactionAccepted {
pub(super) transaction_accepted: Arc<Transaction>,
}

/// The components of a single SSE.
Expand Down Expand Up @@ -246,8 +262,8 @@ async fn map_server_sent_event(
}))),

&SseData::BlockAdded { .. }
| &SseData::DeployProcessed { .. }
| &SseData::DeployExpired { .. }
| &SseData::TransactionProcessed { .. }
| &SseData::TransactionExpired { .. }
| &SseData::Fault { .. }
| &SseData::Step { .. }
| &SseData::FinalitySignature(_)
Expand All @@ -259,9 +275,9 @@ async fn map_server_sent_event(
})
.id(id))),

SseData::DeployAccepted { deploy } => Some(Ok(WarpServerSentEvent::default()
.json_data(&DeployAccepted {
deploy_accepted: deploy.clone(),
SseData::TransactionAccepted { transaction } => Some(Ok(WarpServerSentEvent::default()
.json_data(&TransactionAccepted {
transaction_accepted: Arc::clone(transaction),
})
.unwrap_or_else(|error| {
warn!(%error, "failed to jsonify sse event");
Expand Down
Loading

0 comments on commit 39f124b

Please sign in to comment.