diff --git a/Cargo.lock b/Cargo.lock index 86faab053..b8a907d0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -855,6 +855,7 @@ dependencies = [ "atty", "crucible-workspace-hack", "dropshot", + "futures", "nix 0.27.1", "proptest", "rusqlite", @@ -870,6 +871,7 @@ dependencies = [ "tempfile", "test-strategy", "thiserror", + "tokio", "tokio-rustls 0.24.1", "toml 0.8.10", "twox-hash", diff --git a/common/Cargo.toml b/common/Cargo.toml index eb0b392d5..68904e43a 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] anyhow.workspace = true atty.workspace = true +futures.workspace = true nix.workspace = true rusqlite.workspace = true rustls-pemfile.workspace = true @@ -21,6 +22,7 @@ slog-dtrace.workspace = true slog-term.workspace = true tempfile.workspace = true thiserror.workspace = true +tokio.workspace = true tokio-rustls.workspace = true toml.workspace = true twox-hash.workspace = true diff --git a/common/src/lib.rs b/common/src/lib.rs index c5668853f..9f2f99421 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -8,10 +8,16 @@ use std::path::Path; use ErrorKind::NotFound; use anyhow::{anyhow, bail, Context, Result}; +use futures::{ + future::{ready, Either, Ready}, + stream::FuturesOrdered, + StreamExt, +}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use slog::Drain; use tempfile::NamedTempFile; +use tokio::sync::oneshot; mod region; pub use region::{ @@ -413,3 +419,87 @@ impl From for dropshot::HttpError { } } } + +/// Future stored in a [`DeferredQueue`] +/// +/// This is either an immediately-ready `T` or a oneshot channel which returns a +/// `T` when an off-task job finishes. +type DeferredQueueFuture = + Either>, oneshot::Receiver>; + +/// A `DeferredQueue` stores pending work (optionally executed off-task) +pub struct DeferredQueue { + /// Ordered stream of deferred futures + stream: FuturesOrdered>, + + /// Stores whether it is known that there are no futures in `self.stream` + /// + /// This is tracked separately because `FuturesOrdered::next` will + /// immediately return `None` if the queue is empty; we don't want that when + /// it's one of many options in a `tokio::select!`. + empty: bool, +} + +impl Default for DeferredQueue { + fn default() -> Self { + Self::new() + } +} + +impl DeferredQueue { + /// Build a new empty `FuturesOrdered` + pub fn new() -> Self { + Self { + stream: FuturesOrdered::new(), + empty: true, + } + } + + /// Stores a new future in the queue, marking it as non-empty + fn push_back(&mut self, f: DeferredQueueFuture) { + self.stream.push_back(f); + self.empty = false; + } + + /// Returns the next future from the queue + /// + /// If the future is `None`, then the queue is marked as empty + /// + /// This function is cancel safe: if a result is taken from the internal + /// `FuturesOrdered`, then it is guaranteed to be returned. + pub async fn next(&mut self) -> Option { + // Early exit if we know the stream is empty + if self.empty { + return None; + } + + // Cancel-safety: there can't be any yield points after this! + let t = self.stream.next().await; + self.empty |= t.is_none(); + + // The oneshot is managed by a worker thread, which should never be + // dropped, so we don't expect the oneshot to fail + t.map(|t| t.expect("oneshot failed")) + } + + /// Stores a new future in the queue, marking it as non-empty + pub fn push_immediate(&mut self, t: T) { + self.push_back(Either::Left(ready(Ok(t)))); + } + + /// Stores a new pending oneshot in the queue, returning the sender + pub fn push_oneshot(&mut self) -> oneshot::Sender { + let (rx, tx) = oneshot::channel(); + self.push_back(Either::Right(tx)); + rx + } + + /// Check whether the queue is known to be empty + /// + /// It is possible for this to return `false` if the queue is actually + /// empty; in that case, a subsequent call to `next()` will return `None` + /// and *later* calls to `is_empty()` will return `true`. + pub fn is_empty(&self) -> bool { + self.empty + } +} diff --git a/downstairs/src/deferred.rs b/downstairs/src/deferred.rs new file mode 100644 index 000000000..6d333dfcd --- /dev/null +++ b/downstairs/src/deferred.rs @@ -0,0 +1,95 @@ +// Copyright 2023 Oxide Computer Company +use crate::extent::DownstairsBlockContext; +use crucible_common::{integrity_hash, CrucibleError}; +use crucible_protocol::Message; + +/// Result of a deferred `Message` +/// +/// In most cases, this is simply the original `Message` (stored in +/// `DeferredMessage::Other`). +pub(crate) enum DeferredMessage { + Write(Message, PrecomputedWrite), + Other(Message), +} + +/// Data needed to perform a write, which can be computed off-thread +#[derive(Clone, Debug, PartialEq)] +pub(crate) struct PrecomputedWrite { + /// Checks whether incoming hashes are valid + pub validate_hashes_result: Result<(), CrucibleError>, + pub block_contexts: Vec, +} + +impl PrecomputedWrite { + #[cfg(test)] + pub(crate) fn empty() -> Self { + PrecomputedWrite { + validate_hashes_result: Ok(()), + block_contexts: vec![], + } + } + + /// Precomputes relevant data from a set of writes + pub(crate) fn from_writes(writes: &[crucible_protocol::Write]) -> Self { + let validate_hashes_result = Self::validate_hashes(writes); + let block_contexts = Self::compute_block_contexts(writes); + PrecomputedWrite { + validate_hashes_result, + block_contexts, + } + } + + fn compute_block_contexts( + writes: &[crucible_protocol::Write], + ) -> Vec { + writes + .iter() + .map(|write| { + // TODO it would be nice if we could profile what % of time we're + // spending on hashes locally vs writing to disk + let on_disk_hash = integrity_hash(&[&write.data[..]]); + + DownstairsBlockContext { + block_context: write.block_context, + block: write.offset.value, + on_disk_hash, + } + }) + .collect() + } + + fn validate_hashes( + writes: &[crucible_protocol::Write], + ) -> Result<(), CrucibleError> { + for write in writes { + let computed_hash = if let Some(encryption_context) = + &write.block_context.encryption_context + { + integrity_hash(&[ + &encryption_context.nonce[..], + &encryption_context.tag[..], + &write.data[..], + ]) + } else { + integrity_hash(&[&write.data[..]]) + }; + + if computed_hash != write.block_context.hash { + // TODO: print out the extent and block where this failed!! + return Err(CrucibleError::HashMismatch); + } + } + + Ok(()) + } +} + +impl DeferredMessage { + /// Returns a reference to the original message + pub(crate) fn into_parts(self) -> (Message, Option) { + match self { + DeferredMessage::Write(msg, pre) => (msg, Some(pre)), + DeferredMessage::Other(msg) => (msg, None), + } + } +} diff --git a/downstairs/src/dynamometer.rs b/downstairs/src/dynamometer.rs index 5b1de482b..e69e21dfe 100644 --- a/downstairs/src/dynamometer.rs +++ b/downstairs/src/dynamometer.rs @@ -83,7 +83,10 @@ pub async fn dynamometer( .collect(); let io_operation_time = Instant::now(); - region.region_write(&writes, JobId(1000), false).await?; + let pre = PrecomputedWrite::from_writes(&writes); + region + .region_write_pre(&writes, &pre, JobId(1000), false) + .await?; total_io_time += io_operation_time.elapsed(); io_operations_sent += num_writes; diff --git a/downstairs/src/extent.rs b/downstairs/src/extent.rs index dbbd31501..17add72f9 100644 --- a/downstairs/src/extent.rs +++ b/downstairs/src/extent.rs @@ -52,10 +52,29 @@ pub(crate) trait ExtentInner: Send + Sync + Debug { &mut self, job_id: JobId, writes: &[crucible_protocol::Write], + ctxs: &[DownstairsBlockContext], only_write_unwritten: bool, iov_max: usize, ) -> Result<(), CrucibleError>; + #[cfg(test)] + fn write_without_precomputed_contexts( + &mut self, + job_id: JobId, + writes: &[crucible_protocol::Write], + only_write_unwritten: bool, + iov_max: usize, + ) -> Result<(), CrucibleError> { + let pre = PrecomputedWrite::from_writes(writes); + self.write( + job_id, + writes, + &pre.block_contexts, + only_write_unwritten, + iov_max, + ) + } + #[cfg(test)] fn get_block_contexts( &mut self, @@ -75,7 +94,7 @@ pub(crate) trait ExtentInner: Send + Sync + Debug { } /// BlockContext, with the addition of block index and on_disk_hash -#[derive(Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq)] pub struct DownstairsBlockContext { pub block_context: BlockContext, @@ -536,6 +555,7 @@ impl Extent { &mut self, job_id: JobId, writes: &[crucible_protocol::Write], + ctxs: &[DownstairsBlockContext], only_write_unwritten: bool, ) -> Result<(), CrucibleError> { if self.read_only { @@ -546,8 +566,13 @@ impl Extent { (job_id.0, self.number, writes.len() as u64) }); - self.inner - .write(job_id, writes, only_write_unwritten, self.iov_max)?; + self.inner.write( + job_id, + writes, + ctxs, + only_write_unwritten, + self.iov_max, + )?; cdt::extent__write__done!(|| { (job_id.0, self.number, writes.len() as u64) diff --git a/downstairs/src/extent_inner_raw.rs b/downstairs/src/extent_inner_raw.rs index ba3b0805b..3cbfb0920 100644 --- a/downstairs/src/extent_inner_raw.rs +++ b/downstairs/src/extent_inner_raw.rs @@ -152,9 +152,11 @@ impl ExtentInner for RawInner { &mut self, job_id: JobId, writes: &[crucible_protocol::Write], + ctxs: &[DownstairsBlockContext], only_write_unwritten: bool, iov_max: usize, ) -> Result<(), CrucibleError> { + assert_eq!(writes.len(), ctxs.len()); // If the same block is written multiple times in a single write, then // (1) that's weird, and (2) we need to handle it specially. To handle // such cases, we split the `writes` slice into sub-slices of unique @@ -169,6 +171,7 @@ impl ExtentInner for RawInner { self.write_without_overlaps( job_id, &writes[start..i], + &ctxs[start..i], only_write_unwritten, iov_max, )?; @@ -998,13 +1001,18 @@ impl RawInner { /// first, then block data; if a single block is written multiple times, /// then we'd write multiple contexts, then multiple block data, and it /// would be possible for them to get out of sync. + /// + /// # Panics + /// If `writes.len() != ctxs.len()` fn write_without_overlaps( &mut self, job_id: JobId, writes: &[crucible_protocol::Write], + ctxs: &[DownstairsBlockContext], only_write_unwritten: bool, iov_max: usize, ) -> Result<(), CrucibleError> { + assert_eq!(writes.len(), ctxs.len()); /* * In order to be crash consistent, perform the following steps in * order: @@ -1109,18 +1117,11 @@ impl RawInner { // Compute block contexts, then write them to disk let block_ctx: Vec<_> = writes .iter() - .filter(|write| !writes_to_skip.contains(&write.offset.value)) - .map(|write| { - // TODO it would be nice if we could profile what % of time we're - // spending on hashes locally vs writing to disk - let on_disk_hash = integrity_hash(&[&write.data[..]]); - - DownstairsBlockContext { - block_context: write.block_context, - block: write.offset.value, - on_disk_hash, - } + .zip(ctxs.iter()) + .filter(|(write, _ctx)| { + !writes_to_skip.contains(&write.offset.value) }) + .map(|(_write, ctx)| *ctx) .collect(); self.set_block_contexts(&block_ctx)?; @@ -1694,7 +1695,12 @@ mod test { hash, }, }; - inner.write(JobId(10), &[write], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(10), + &[write], + false, + IOV_MAX_TEST, + )?; // The context should be in place, though we haven't flushed yet @@ -1713,7 +1719,12 @@ mod test { data: data.clone(), block_context, }; - inner.write(JobId(20), &[write], true, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(20), + &[write], + true, + IOV_MAX_TEST, + )?; let read = ReadRequest { eid: 0, @@ -1747,7 +1758,12 @@ mod test { data: data.clone(), block_context, }; - inner.write(JobId(30), &[write], true, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(30), + &[write], + true, + IOV_MAX_TEST, + )?; let read = ReadRequest { eid: 0, @@ -1796,28 +1812,48 @@ mod test { offset: Block::new_512(1), ..write.clone() }; - inner.write(JobId(10), &[write1], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(10), + &[write1], + false, + IOV_MAX_TEST, + )?; assert_eq!(inner.context_slot_dirty[0], 0b00); assert_eq!(inner.active_context[0], ContextSlot::A); assert_eq!(inner.context_slot_dirty[1], 0b10); assert_eq!(inner.active_context[1], ContextSlot::B); // The context should be written to block 0, slot B - inner.write(JobId(10), &[write.clone()], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(10), + &[write.clone()], + false, + IOV_MAX_TEST, + )?; assert_eq!(inner.context_slot_dirty[0], 0b10); assert_eq!(inner.active_context[0], ContextSlot::B); assert_eq!(inner.context_slot_dirty[1], 0b10); // unchanged assert_eq!(inner.active_context[1], ContextSlot::B); // unchanged // The context should be written to block 0, slot A - inner.write(JobId(11), &[write.clone()], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(11), + &[write.clone()], + false, + IOV_MAX_TEST, + )?; assert_eq!(inner.context_slot_dirty[0], 0b11); assert_eq!(inner.active_context[0], ContextSlot::A); assert_eq!(inner.context_slot_dirty[1], 0b10); // unchanged assert_eq!(inner.active_context[1], ContextSlot::B); // unchanged // The context should be written to slot B, forcing a sync - inner.write(JobId(12), &[write], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(12), + &[write], + false, + IOV_MAX_TEST, + )?; assert_eq!(inner.context_slot_dirty[0], 0b10); assert_eq!(inner.active_context[0], ContextSlot::B); assert_eq!(inner.context_slot_dirty[1], 0b00); @@ -1846,7 +1882,12 @@ mod test { }, }; // The context should be written to slot B - inner.write(JobId(10), &[write.clone()], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(10), + &[write.clone()], + false, + IOV_MAX_TEST, + )?; assert_eq!(inner.active_context[0], ContextSlot::B); assert_eq!(inner.context_slot_dirty[0], 0b10); @@ -1856,17 +1897,32 @@ mod test { assert_eq!(inner.context_slot_dirty[0], 0b00); // The context should be written to slot A - inner.write(JobId(11), &[write.clone()], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(11), + &[write.clone()], + false, + IOV_MAX_TEST, + )?; assert_eq!(inner.active_context[0], ContextSlot::A); assert_eq!(inner.context_slot_dirty[0], 0b01); // The context should be written to slot B - inner.write(JobId(12), &[write.clone()], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(12), + &[write.clone()], + false, + IOV_MAX_TEST, + )?; assert_eq!(inner.active_context[0], ContextSlot::B); assert_eq!(inner.context_slot_dirty[0], 0b11); // The context should be written to slot A, forcing a sync - inner.write(JobId(12), &[write.clone()], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(12), + &[write.clone()], + false, + IOV_MAX_TEST, + )?; assert_eq!(inner.active_context[0], ContextSlot::A); assert_eq!(inner.context_slot_dirty[0], 0b01); @@ -1893,12 +1949,22 @@ mod test { }, }; // The context should be written to slot B - inner.write(JobId(10), &[write.clone()], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(10), + &[write.clone()], + false, + IOV_MAX_TEST, + )?; assert_eq!(inner.active_context[0], ContextSlot::B); assert_eq!(inner.context_slot_dirty[0], 0b10); // The context should be written to slot A - inner.write(JobId(11), &[write.clone()], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(11), + &[write.clone()], + false, + IOV_MAX_TEST, + )?; assert_eq!(inner.active_context[0], ContextSlot::A); assert_eq!(inner.context_slot_dirty[0], 0b11); @@ -1908,17 +1974,32 @@ mod test { assert_eq!(inner.context_slot_dirty[0], 0b00); // The context should be written to slot B - inner.write(JobId(12), &[write.clone()], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(12), + &[write.clone()], + false, + IOV_MAX_TEST, + )?; assert_eq!(inner.active_context[0], ContextSlot::B); assert_eq!(inner.context_slot_dirty[0], 0b10); // The context should be written to slot A - inner.write(JobId(12), &[write.clone()], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(12), + &[write.clone()], + false, + IOV_MAX_TEST, + )?; assert_eq!(inner.active_context[0], ContextSlot::A); assert_eq!(inner.context_slot_dirty[0], 0b11); // The context should be written to slot B, forcing a sync - inner.write(JobId(11), &[write.clone()], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(11), + &[write.clone()], + false, + IOV_MAX_TEST, + )?; assert_eq!(inner.active_context[0], ContextSlot::B); assert_eq!(inner.context_slot_dirty[0], 0b10); @@ -1970,7 +2051,12 @@ mod test { data: data.clone(), block_context, }; - inner.write(JobId(30), &[write], true, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(30), + &[write], + true, + IOV_MAX_TEST, + )?; let read = ReadRequest { eid: 0, @@ -2013,7 +2099,12 @@ mod test { hash, }, }; - inner.write(JobId(30), &[write], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(30), + &[write], + false, + IOV_MAX_TEST, + )?; } for i in 0..10 { @@ -2061,7 +2152,12 @@ mod test { writes.push(write); } // This write has toggled every single context slot - inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(30), + &writes, + false, + IOV_MAX_TEST, + )?; for i in 0..10 { assert_eq!( inner.active_context[i], @@ -2113,7 +2209,12 @@ mod test { hash, }, }; - inner.write(JobId(30), &[write], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(30), + &[write], + false, + IOV_MAX_TEST, + )?; } // 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 // --|---|---|---|---|---|---|---|---|--- @@ -2166,7 +2267,12 @@ mod test { // 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 // --|---|---|---|---|---|---|---|---|--- // A | B | A | B | A | B | B | B | B | B - inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(30), + &writes, + false, + IOV_MAX_TEST, + )?; for i in 0..10 { assert_eq!( inner.active_context[i], @@ -2220,7 +2326,12 @@ mod test { hash, }, }; - inner.write(JobId(30), &[write], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(30), + &[write], + false, + IOV_MAX_TEST, + )?; } // 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 // --|---|---|---|---|---|---|---|---|--- @@ -2269,8 +2380,18 @@ mod test { }; writes.push(write); } - inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?; - inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(30), + &writes, + false, + IOV_MAX_TEST, + )?; + inner.write_without_precomputed_contexts( + JobId(30), + &writes, + false, + IOV_MAX_TEST, + )?; // This write should toggled every single context slot: // 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 // --|---|---|---|---|---|---|---|---|--- @@ -2328,7 +2449,12 @@ mod test { hash, }, }; - inner.write(JobId(30), &[write], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(30), + &[write], + false, + IOV_MAX_TEST, + )?; } // 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 // --|---|---|---|---|---|---|---|---|--- @@ -2377,8 +2503,18 @@ mod test { }; writes.push(write); } - inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?; - inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(30), + &writes, + false, + IOV_MAX_TEST, + )?; + inner.write_without_precomputed_contexts( + JobId(30), + &writes, + false, + IOV_MAX_TEST, + )?; // This write should toggled every single context slot: // 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 // --|---|---|---|---|---|---|---|---|--- @@ -2470,7 +2606,12 @@ mod test { .collect(); assert_eq!(inner.context_slot_dirty[0], 0b00); - inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(30), + &writes, + false, + IOV_MAX_TEST, + )?; // The write should be split into four separate calls to // `write_without_overlaps`, triggering one bonus fsync. diff --git a/downstairs/src/extent_inner_sqlite.rs b/downstairs/src/extent_inner_sqlite.rs index 28e4414cf..ef4408b17 100644 --- a/downstairs/src/extent_inner_sqlite.rs +++ b/downstairs/src/extent_inner_sqlite.rs @@ -54,12 +54,14 @@ impl ExtentInner for SqliteInner { &mut self, job_id: JobId, writes: &[crucible_protocol::Write], + ctxs: &[DownstairsBlockContext], only_write_unwritten: bool, iov_max: usize, ) -> Result<(), CrucibleError> { self.0.lock().unwrap().write( job_id, writes, + ctxs, only_write_unwritten, iov_max, ) @@ -383,9 +385,11 @@ impl SqliteMoreInner { &mut self, job_id: JobId, writes: &[crucible_protocol::Write], + ctxs: &[DownstairsBlockContext], only_write_unwritten: bool, iov_max: usize, ) -> Result<(), CrucibleError> { + assert_eq!(writes.len(), ctxs.len()); for write in writes { check_input(self.extent_size, write.offset, &write.data)?; } @@ -501,21 +505,13 @@ impl SqliteMoreInner { }); let mut hashes_to_write = Vec::with_capacity(writes.len()); - for write in writes { + for (write, ctx) in writes.iter().zip(ctxs.iter()) { if writes_to_skip.contains(&write.offset.value) { hashes_to_write.push(None); continue; } - // TODO it would be nice if we could profile what % of time we're - // spending on hashes locally vs sqlite - let on_disk_hash = integrity_hash(&[&write.data[..]]); - - self.set_block_context(&DownstairsBlockContext { - block_context: write.block_context, - block: write.offset.value, - on_disk_hash, - })?; + self.set_block_context(ctx)?; // Worth some thought: this could happen inside // tx_set_block_context, if we passed a reference to dirty_blocks @@ -527,7 +523,7 @@ impl SqliteMoreInner { // Could make another function that wraps tx_set_block_context // and handles this as well. self.dirty_blocks.insert(write.offset.value as usize, None); - hashes_to_write.push(Some(on_disk_hash)); + hashes_to_write.push(Some(ctx.on_disk_hash)); } tx.commit()?; @@ -1626,7 +1622,12 @@ mod test { hash, }, }; - inner.write(JobId(10), &[write], false, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(10), + &[write], + false, + IOV_MAX_TEST, + )?; // We haven't flushed, but this should leave our context in place. inner.fully_rehash_and_clean_all_stale_contexts(false)?; @@ -1646,7 +1647,12 @@ mod test { data: data.clone(), block_context, }; - inner.write(JobId(20), &[write], true, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(20), + &[write], + true, + IOV_MAX_TEST, + )?; let read = ReadRequest { eid: 0, @@ -1680,7 +1686,12 @@ mod test { data: data.clone(), block_context, }; - inner.write(JobId(30), &[write], true, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(30), + &[write], + true, + IOV_MAX_TEST, + )?; let read = ReadRequest { eid: 0, @@ -1746,7 +1757,12 @@ mod test { data: data.clone(), block_context, }; - inner.write(JobId(30), &[write], true, IOV_MAX_TEST)?; + inner.write_without_precomputed_contexts( + JobId(30), + &[write], + true, + IOV_MAX_TEST, + )?; let read = ReadRequest { eid: 0, diff --git a/downstairs/src/lib.rs b/downstairs/src/lib.rs index 7baba25c7..85ac7080d 100644 --- a/downstairs/src/lib.rs +++ b/downstairs/src/lib.rs @@ -15,8 +15,8 @@ use std::time::Duration; use crucible_common::{ build_logger, crucible_bail, impacted_blocks::extent_from_offset, - integrity_hash, mkdir_for_file, Block, CrucibleError, RegionDefinition, - MAX_ACTIVE_COUNT, MAX_BLOCK_SIZE, + integrity_hash, mkdir_for_file, Block, CrucibleError, DeferredQueue, + RegionDefinition, MAX_ACTIVE_COUNT, MAX_BLOCK_SIZE, }; use crucible_protocol::{ BlockContext, CrucibleDecoder, CrucibleEncoder, JobId, Message, @@ -44,6 +44,9 @@ pub mod region; pub mod repair; mod stats; +mod deferred; +use deferred::{DeferredMessage, PrecomputedWrite}; + mod extent_inner_raw; mod extent_inner_sqlite; @@ -67,10 +70,12 @@ enum IOop { Write { dependencies: Vec, // Jobs that must finish before this writes: Vec, + pre: PrecomputedWrite, }, WriteUnwritten { dependencies: Vec, // Jobs that must finish before this writes: Vec, + pre: PrecomputedWrite, }, Read { dependencies: Vec, // Jobs that must finish before this @@ -322,7 +327,10 @@ pub async fn downstairs_import + std::fmt::Debug>( } // We have no job ID, so it makes no sense for accounting. - region.region_write(&writes, JobId(0), false).await?; + let pre = PrecomputedWrite::from_writes(&writes); + region + .region_write_pre(&writes, &pre, JobId(0), false) + .await?; assert_eq!(nblocks, pos); assert_eq!(total, pos.bytes()); @@ -1159,9 +1167,13 @@ where * │ │ │ │ * ┌────▼──┴─┐ message ┌┴──────┐ job ┌┴────────┐ * │resp_loop├──────────►│pf_task├─────────►│ dw_task │ - * └─────────┘ channel └──┬────┘ channel └▲────────┘ + * └──┬───▲──┘ channel └──┬────┘ channel └▲────────┘ + * │ │ │ │ + * defer│ │oneshot │ │ + * ┌▼───┴┐ │ │ + * │rayon│ add│work new│work + * └─────┘ │ │ * │ │ - * add│work new│work * per-connection │ │ * ========================= │ ============== │ =============== * shared state ┌──▼────────────────┴────────────┐ @@ -1218,6 +1230,7 @@ where // The lossy attribute currently does not change at runtime. To avoid // continually locking the downstairs, cache the result here. let lossy = ads.lock().await.lossy; + let mut deferred_msgs: DeferredQueue = Default::default(); loop { tokio::select! { @@ -1302,6 +1315,22 @@ where } } } + m = deferred_msgs.next(), if !deferred_msgs.is_empty() => { + match m { + Some(d) => { + if let Err(e) = message_channel_tx.send(d).await { + bail!( + "Failed sending message to proc_frame: {}", + e + ); + } + } + None => { + // Retrieving None should set the queue to empty + assert!(deferred_msgs.is_empty()); + } + } + } new_read = fr.next() => { match new_read { None => { @@ -1324,13 +1353,42 @@ where return Ok(()); } Some(Ok(msg)) => { + let should_defer = !deferred_msgs.is_empty(); if matches!(msg, Message::Ruok) { // Respond instantly to pings, don't wait. if let Err(e) = resp_channel_tx.send(Message::Imok).await { bail!("Failed sending Imok: {}", e); } - } else if let Err(e) = message_channel_tx.send(msg).await { - bail!("Failed sending message to proc_frame: {}", e); + } else if matches!(msg, + Message::Write { .. } | + Message::WriteUnwritten { .. }) { + // Defer the precomputation of write hashes to the + // rayon thread pool, to spare the tokio tasks. + let tx = deferred_msgs.push_oneshot(); + rayon::spawn(move || { + let pre = match &msg { + Message::Write { writes, .. } | + Message::WriteUnwritten { writes, .. } => { + PrecomputedWrite::from_writes(writes) + } + _ => unreachable!(), // checked above + }; + let _ = tx.send( + DeferredMessage::Write(msg, pre)); + }); + } else if !should_defer { + if let Err(e) = message_channel_tx.send( + DeferredMessage::Other(msg) + ).await { + bail!( + "Failed sending message to proc_frame: {}", + e + ); + } + } else { + deferred_msgs.push_immediate( + DeferredMessage::Other(msg) + ); } } Some(Err(e)) => { @@ -1689,7 +1747,7 @@ impl Downstairs { responses, })) } - IOop::WriteUnwritten { writes, .. } => { + IOop::WriteUnwritten { writes, pre, .. } => { /* * Any error from an IO should be intercepted here and passed * back to the upstairs. @@ -1703,7 +1761,9 @@ impl Downstairs { } else { // The region_write will handle what happens to each block // based on if they have data or not. - self.region.region_write(writes, job_id, true).await + self.region + .region_write_pre(writes, pre, job_id, true) + .await }; Ok(Some(Message::WriteUnwrittenAck { @@ -1716,6 +1776,7 @@ impl Downstairs { IOop::Write { dependencies, writes, + pre, } => { let result = if self.write_errors && random() && random() { warn!(self.log, "returning error on write!"); @@ -1724,7 +1785,9 @@ impl Downstairs { error!(self.log, "Upstairs inactive error"); Err(CrucibleError::UpstairsInactive) } else { - self.region.region_write(writes, job_id, false).await + self.region + .region_write_pre(writes, pre, job_id, false) + .await }; debug!( self.log, @@ -2330,10 +2393,11 @@ impl Downstairs { async fn proc_frame( ad: &Mutex, upstairs_connection: UpstairsConnection, - m: Message, + m: DeferredMessage, resp_tx: &mpsc::Sender, ) -> Result> { // Initial check against upstairs and session ID + let (m, write_metadata) = m.into_parts(); match m { Message::Write { upstairs_id, @@ -2406,6 +2470,7 @@ impl Downstairs { let new_write = IOop::Write { dependencies, writes, + pre: write_metadata.expect("must have write metadata"), }; let mut d = ad.lock().await; @@ -2446,6 +2511,7 @@ impl Downstairs { let new_write = IOop::WriteUnwritten { dependencies, writes, + pre: write_metadata.expect("must have write metadata"), }; let mut d = ad.lock().await; @@ -3433,6 +3499,7 @@ mod test { ); } + #[cfg(test)] fn add_work_rf( work: &mut Work, upstairs_connection: UpstairsConnection, @@ -3447,6 +3514,7 @@ mod test { work: IOop::WriteUnwritten { dependencies: deps, writes: Vec::with_capacity(1), + pre: PrecomputedWrite::empty(), }, state: WorkState::New, }, @@ -3948,6 +4016,7 @@ mod test { let writes = create_generic_test_write(eid); let rio = IOop::Write { dependencies: Vec::new(), + pre: PrecomputedWrite::from_writes(&writes), writes, }; ds.add_work(upstairs_connection, JobId(1000), rio)?; @@ -3967,6 +4036,7 @@ mod test { let rio = IOop::Write { dependencies: vec![JobId(1000), JobId(1001)], + pre: PrecomputedWrite::from_writes(&writes), writes, }; ds.add_work(upstairs_connection, JobId(1002), rio)?; @@ -4068,6 +4138,7 @@ mod test { let writes = create_generic_test_write(eid); let rio = IOop::Write { dependencies: Vec::new(), + pre: PrecomputedWrite::from_writes(&writes), writes, }; ds.add_work(upstairs_connection, JobId(1000), rio)?; @@ -4178,6 +4249,7 @@ mod test { let writes = create_generic_test_write(eid); let rio = IOop::Write { dependencies: Vec::new(), + pre: PrecomputedWrite::from_writes(&writes), writes, }; ds.add_work(upstairs_connection, JobId(1000), rio)?; @@ -4289,6 +4361,7 @@ mod test { let writes = create_generic_test_write(eid_one); let rio = IOop::Write { dependencies: Vec::new(), + pre: PrecomputedWrite::from_writes(&writes), writes, }; ds.add_work(upstairs_connection, JobId(1000), rio)?; @@ -4297,6 +4370,7 @@ mod test { let writes = create_generic_test_write(eid_two); let rio = IOop::Write { dependencies: Vec::new(), + pre: PrecomputedWrite::from_writes(&writes), writes, }; ds.add_work(upstairs_connection, JobId(1001), rio)?; diff --git a/downstairs/src/region.rs b/downstairs/src/region.rs index b34ed2966..1aa95b7f3 100644 --- a/downstairs/src/region.rs +++ b/downstairs/src/region.rs @@ -18,9 +18,13 @@ use crucible_protocol::SnapshotDetails; use repair_client::Client; use super::*; -use crate::extent::{ - copy_dir, extent_dir, extent_file_name, move_replacement_extent, - replace_dir, sync_path, Extent, ExtentMeta, ExtentState, ExtentType, +use crate::{ + deferred::PrecomputedWrite, + extent::{ + copy_dir, extent_dir, extent_file_name, move_replacement_extent, + replace_dir, sync_path, DownstairsBlockContext, Extent, ExtentMeta, + ExtentState, ExtentType, + }, }; /** @@ -760,37 +764,27 @@ impl Region { Ok(result) } - pub fn validate_hashes( - &self, + /// Perform a region write, without precomputed data + /// + /// This is only allowed in unit tests + #[cfg(test)] + #[instrument] + pub async fn region_write( + &mut self, writes: &[crucible_protocol::Write], + job_id: JobId, + only_write_unwritten: bool, ) -> Result<(), CrucibleError> { - for write in writes { - let computed_hash = if let Some(encryption_context) = - &write.block_context.encryption_context - { - integrity_hash(&[ - &encryption_context.nonce[..], - &encryption_context.tag[..], - &write.data[..], - ]) - } else { - integrity_hash(&[&write.data[..]]) - }; - - if computed_hash != write.block_context.hash { - error!(self.log, "Failed write hash validation"); - // TODO: print out the extent and block where this failed!! - crucible_bail!(HashMismatch); - } - } - - Ok(()) + let pre = PrecomputedWrite::from_writes(writes); + self.region_write_pre(writes, &pre, job_id, only_write_unwritten) + .await } #[instrument] - pub async fn region_write( + pub async fn region_write_pre( &mut self, writes: &[crucible_protocol::Write], + precomputed: &PrecomputedWrite, job_id: JobId, only_write_unwritten: bool, ) -> Result<(), CrucibleError> { @@ -801,19 +795,25 @@ impl Region { /* * Before anything, validate hashes */ - self.validate_hashes(writes)?; + if let Err(e) = &precomputed.validate_hashes_result { + error!(self.log, "Failed write hash validation"); + return Err(e.clone()); + } /* * Batch writes so they can all be sent to the appropriate extent * together. */ - let mut batched_writes: HashMap> = - HashMap::new(); + let mut batched_writes: HashMap< + usize, + (Vec, Vec), + > = HashMap::new(); - for write in writes { + for (write, ctx) in writes.iter().zip(&precomputed.block_contexts) { let extent_vec = batched_writes.entry(write.eid as usize).or_default(); - extent_vec.push(write.clone()); + extent_vec.0.push(write.clone()); + extent_vec.1.push(*ctx); } if only_write_unwritten { @@ -823,9 +823,9 @@ impl Region { } for eid in batched_writes.keys() { let extent = self.get_opened_extent_mut(*eid); - let writes = batched_writes.get(eid).unwrap(); + let (writes, ctxs) = batched_writes.get(eid).unwrap(); extent - .write(job_id, &writes[..], only_write_unwritten) + .write(job_id, writes, ctxs, only_write_unwritten) .await?; } diff --git a/upstairs/src/deferred.rs b/upstairs/src/deferred.rs index 218cd0f6a..dac2e6fb4 100644 --- a/upstairs/src/deferred.rs +++ b/upstairs/src/deferred.rs @@ -8,91 +8,7 @@ use crate::{ }; use bytes::{Bytes, BytesMut}; use crucible_common::{integrity_hash, CrucibleError, RegionDefinition}; -use futures::{ - future::{ready, Either, Ready}, - stream::FuturesOrdered, - StreamExt, -}; use slog::{error, Logger}; -use tokio::sync::oneshot; - -/// Future stored in a [`DeferredQueue`] -/// -/// This is either an immediately-ready `T` or a oneshot channel which returns a -/// `T` when an off-task job finishes. -type DeferredQueueFuture = - Either>, oneshot::Receiver>; - -/// A `DeferredQueue` stores pending work (optionally executed off-task) -pub(crate) struct DeferredQueue { - /// Ordered stream of deferred futures - stream: FuturesOrdered>, - - /// Stores whether it is known that there are no futures in `self.stream` - /// - /// This is tracked separately because `FuturesOrdered::next` will - /// immediately return `None` if the queue is empty; we don't want that when - /// it's one of many options in a `tokio::select!`. - empty: bool, -} - -impl DeferredQueue { - /// Build a new empty `FuturesOrdered` - pub fn new() -> Self { - Self { - stream: FuturesOrdered::new(), - empty: true, - } - } - - /// Stores a new future in the queue, marking it as non-empty - pub fn push_back(&mut self, f: DeferredQueueFuture) { - self.stream.push_back(f); - self.empty = false; - } - - /// Returns the next future from the queue - /// - /// If the future is `None`, then the queue is marked as empty - /// - /// This function is cancel safe: if a result is taken from the internal - /// `FuturesOrdered`, then it guaranteed to be returned. - pub async fn next(&mut self) -> Option { - // Early exit if we know the stream is empty - if self.empty { - return None; - } - - // Cancel-safety: there can't be any yield points after this! - let t = self.stream.next().await; - self.empty |= t.is_none(); - - // The oneshot is managed by a worker thread, which should never be - // dropped, so we don't expect the oneshot - t.map(|t| t.expect("oneshot failed")) - } - - /// Stores a new future in the queue, marking it as non-empty - pub fn push_immediate(&mut self, t: T) { - self.push_back(Either::Left(ready(Ok(t)))); - } - - /// Stores a new pending oneshot in the queue, returning the sender - pub fn push_oneshot(&mut self) -> oneshot::Sender { - let (rx, tx) = oneshot::channel(); - self.push_back(Either::Right(tx)); - rx - } - - /// Check whether the queue is known to be empty - /// - /// It is possible for this to return `false` if the queue is actually - /// empty; in that case, a subsequent call to `next()` will return `None` - /// and *later* calls to `is_empty()` will return `true`. - pub fn is_empty(&self) -> bool { - self.empty - } -} //////////////////////////////////////////////////////////////////////////////// diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs index 10eb8855f..4281d4a72 100644 --- a/upstairs/src/upstairs.rs +++ b/upstairs/src/upstairs.rs @@ -6,8 +6,8 @@ use crate::{ control::ControlRequest, deadline_secs, deferred::{ - DeferredBlockReq, DeferredMessage, DeferredQueue, DeferredRead, - DeferredWrite, EncryptedWrite, + DeferredBlockReq, DeferredMessage, DeferredRead, DeferredWrite, + EncryptedWrite, }, downstairs::{Downstairs, DownstairsAction}, extent_from_offset, @@ -16,7 +16,7 @@ use crate::{ CrucibleOpts, DsState, EncryptionContext, GuestIoHandle, Message, RegionDefinition, RegionDefinitionStatus, SnapshotDetails, WQCounts, }; -use crucible_common::CrucibleError; +use crucible_common::{CrucibleError, DeferredQueue}; use serde::{Deserialize, Serialize}; use std::sync::{