diff --git a/tools/dtrace/perf-upstairs-wf.d b/tools/dtrace/perf-upstairs-wf.d index 1c2cfa572..2335137cc 100644 --- a/tools/dtrace/perf-upstairs-wf.d +++ b/tools/dtrace/perf-upstairs-wf.d @@ -24,19 +24,10 @@ crucible_upstairs*:::up-to-ds-write-start substart[arg0] = timestamp; } -crucible_upstairs*:::up-to-ds-flush-done, -crucible_upstairs*:::up-to-ds-write-done +crucible_upstairs*:::gw-flush-done, +crucible_upstairs*:::gw-write-done /substart[arg0]/ { @[probename] = quantize(timestamp - substart[arg0]); substart[arg0] = 0; - final[arg0] = timestamp; -} - -crucible_upstairs*:::gw-flush-done, -crucible_upstairs*:::gw-write-done -/final[arg0]/ -{ - @[probename] = quantize(timestamp - final[arg0]); - final[arg0] = 0; } diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs index 59406787f..a1d557f7f 100644 --- a/upstairs/src/client.rs +++ b/upstairs/src/client.rs @@ -1232,341 +1232,149 @@ impl DownstairsClient { /// Handles a single IO operation /// - /// Returns `true` if the job is now ackable, `false` otherwise - /// /// If this is a read response, then the values in `responses` must - /// _already_ be decrypted (with corresponding validation results stored in - /// `read_validations`). + /// _already_ be decrypted, with validated contexts stored in + /// `responses.blocks`. pub(crate) fn process_io_completion( &mut self, ds_id: JobId, job: &mut DownstairsIO, responses: Result, - deactivate: bool, extent_info: Option, - ) -> bool { + ) { if job.state[self.client_id] == IOState::Skipped { // This job was already marked as skipped, and at that time // all required action was taken on it. We can drop any more // processing of it here and return. warn!(self.log, "Dropping already skipped job {}", ds_id); - return false; + return; } - let mut jobs_completed_ok = job.state_count().completed_ok(); - let mut ackable = false; - - let new_state = match &responses { - Ok(..) => { + let new_state = match responses { + Ok(read_data) => { // Messages have already been decrypted out-of-band - jobs_completed_ok += 1; - IOState::Done - } - Err(e) => { - // The downstairs sent us this error - error!( - self.log, - "DS Reports error {e:?} on job {}, {:?} EC", ds_id, job, - ); - IOState::Error(e.clone()) - } - }; - - // Update the state, maintaining various counters - let old_state = self.set_job_state(job, new_state.clone()); - - /* - * Verify the job was InProgress - */ - if old_state != IOState::InProgress { - // This job is in an unexpected state. - panic!( - "[{}] Job {} completed while not InProgress: {:?} {:?}", - self.client_id, ds_id, old_state, job - ); - } - - if let IOState::Error(e) = new_state { - // Some errors can be returned without considering the Downstairs - // bad. For example, it's still an error if a snapshot exists - // already but we should not increment downstairs_errors and - // transition that Downstairs to Failed - that downstairs is still - // able to serve IO. - match e { - CrucibleError::SnapshotExistsAlready(_) => { - // pass - } - _ => { - match job.work { - // Mark this downstairs as bad if this was a write, - // a write unwritten, or a flush - // XXX: Errors should be reported to nexus - IOop::Write { .. } - | IOop::WriteUnwritten { .. } - | IOop::Flush { .. } - | IOop::Barrier { .. } => { - self.stats.downstairs_errors += 1; - } - - // If a repair job errors, mark that downstairs as bad - IOop::ExtentFlushClose { .. } - | IOop::ExtentLiveRepair { .. } - | IOop::ExtentLiveReopen { .. } - | IOop::ExtentLiveNoOp { .. } => { - self.stats.downstairs_errors += 1; - } - - // If a read job fails, we sometimes need to panic. - IOop::Read { .. } => { - // It's possible we get a read error if the - // downstairs disconnects. However XXX, someone - // should be told about this error. - // - // Some errors, we need to panic on. - match e { - CrucibleError::HashMismatch => { - panic!( - "{} [{}] {} read hash mismatch {:?} {:?}", - self.cfg.session_id, self.client_id, ds_id, e, job - ); - } - CrucibleError::DecryptionError => { - panic!( - "[{}] {} read decrypt error {:?} {:?}", - self.client_id, ds_id, e, job - ); - } - _ => { - error!( - self.log, - "{} read error {:?} {:?}", - ds_id, - e, - job - ); + match job.work { + IOop::Read { .. } => { + assert!(!read_data.blocks.is_empty()); + assert!(extent_info.is_none()); + if job.data.is_none() { + job.data = Some(read_data); + } else { + // If another job has finished already, we compare + // our read hash to that and verify they are the + // same. + debug!(self.log, "Read already AckReady {ds_id}"); + let job_blocks = &job.data.as_ref().unwrap().blocks; + if job_blocks != &read_data.blocks { + // XXX This error needs to go to Nexus + // XXX This will become the "force all + // downstairs to stop and refuse to restart" + // mode. + let msg = format!( + "[{}] read hash mismatch on {} \n\ + Expected {:x?}\n\ + Computed {:x?}\n\ + job: {:?}", + self.client_id, + ds_id, + job_blocks, + read_data.blocks, + job, + ); + if job.replay { + info!(self.log, "REPLAY {msg}"); + } else { + panic!("{msg}"); } } } } - } - } - } else if job.acked { - assert_eq!(new_state, IOState::Done); - /* - * If this job is already acked, then we don't have much - * more to do here. If it's a flush, then we want to be - * sure to update the last flush for this client. - */ - match &job.work { - IOop::Flush { .. } => { - self.last_flush = ds_id; - } - IOop::Read { - start_eid, - start_offset, - .. - } => { - /* - * For a read, make sure the data from a previous read - * has the same hash - */ - let read_data = responses.unwrap(); - assert!(!read_data.blocks.is_empty()); - if job.data.as_ref().unwrap().blocks != read_data.blocks { - // XXX This error needs to go to Nexus - // XXX This will become the "force all downstairs - // to stop and refuse to restart" mode. - let msg = format!( - "[{}] read hash mismatch on id {}\n\ - session: {:?}\n\ - Expected {:x?}\n\ - Computed {:x?}\n\ - start eid:{:?} start offset:{:?}\n\ - job state:{:?}", - self.client_id, - ds_id, - self.cfg.session_id, - job.data.as_ref().unwrap().blocks, - read_data.blocks, - start_eid, - start_offset, - job.state, - ); - if job.replay { - info!(self.log, "REPLAY {}", msg); - } else { - panic!("{}", msg); - } - } - } - /* - * Write and WriteUnwritten IOs have no action here - * If this job was LiveRepair, we should never get here, - * as those jobs should never be acked before all three - * are done. - */ - IOop::Write { .. } - | IOop::WriteUnwritten { .. } - | IOop::Barrier { .. } => {} - IOop::ExtentFlushClose { .. } - | IOop::ExtentLiveRepair { .. } - | IOop::ExtentLiveReopen { .. } - | IOop::ExtentLiveNoOp { .. } => { - panic!( - "[{}] Bad job received in process_ds_completion: {:?}", - self.client_id, job - ); - } - } - } else { - assert_eq!(new_state, IOState::Done); - assert!(!job.acked); - - let read_data = responses.unwrap(); - - /* - * Transition this job from Done to AckReady if enough have - * returned ok. - */ - match &job.work { - IOop::Read { .. } => { - assert!(!read_data.blocks.is_empty()); - assert!(extent_info.is_none()); - if jobs_completed_ok == 1 { - assert!(job.data.is_none()); - job.data = Some(read_data); - assert!(!job.acked); - ackable = true; - debug!(self.log, "Read AckReady {}", ds_id.0); - cdt::up__to__ds__read__done!(|| ds_id.0); - } else { - /* - * If another job has finished already, we can - * compare our read hash to - * that and verify they are the same. - */ - debug!(self.log, "Read already AckReady {ds_id}"); - let job_blocks = &job.data.as_ref().unwrap().blocks; - if job_blocks != &read_data.blocks { - // XXX This error needs to go to Nexus - // XXX This will become the "force all downstairs - // to stop and refuse to restart" mode. - panic!( - "[{}] read hash mismatch on {} \n\ - Expected {:x?}\n\ - Computed {:x?}\n\ - job: {:?}", - self.client_id, - ds_id, - job_blocks, - read_data.blocks, - job, - ); - } - } - } - IOop::Write { .. } => { - assert!(read_data.blocks.is_empty()); - assert!(read_data.data.is_empty()); - assert!(extent_info.is_none()); - if jobs_completed_ok == 2 { - ackable = true; - cdt::up__to__ds__write__done!(|| ds_id.0); + IOop::Write { .. } + | IOop::WriteUnwritten { .. } + | IOop::Barrier { .. } => { + assert!(read_data.blocks.is_empty()); + assert!(read_data.data.is_empty()); + assert!(extent_info.is_none()); } - } - IOop::WriteUnwritten { .. } => { - assert!(read_data.blocks.is_empty()); - assert!(read_data.data.is_empty()); - assert!(extent_info.is_none()); - if jobs_completed_ok == 2 { - ackable = true; - cdt::up__to__ds__write__unwritten__done!(|| ds_id.0); - } - } - IOop::Flush { - snapshot_details, .. - } => { - assert!(read_data.blocks.is_empty()); - assert!(read_data.data.is_empty()); - assert!(extent_info.is_none()); - /* - * If we are deactivating or have requested a - * snapshot, then we want an ACK from all three - * downstairs, not the usual two. - * - * TODO here for handling the case where one (or two, - * or three! gasp!) downstairs are Offline. - */ - let ack_at_num_jobs = - if deactivate || snapshot_details.is_some() { - 3 - } else { - 2 - }; - - if jobs_completed_ok == ack_at_num_jobs { - ackable = true; - cdt::up__to__ds__flush__done!(|| ds_id.0); - if deactivate { - debug!(self.log, "deactivate flush {ds_id} done"); - } - } - self.last_flush = ds_id; - } - IOop::Barrier { .. } => { - assert!(read_data.blocks.is_empty()); - assert!(read_data.data.is_empty()); - assert!(extent_info.is_none()); - - if jobs_completed_ok == 3 { - ackable = true; - cdt::up__to__ds__barrier__done!(|| ds_id.0); + IOop::Flush { .. } => { + assert!(read_data.blocks.is_empty()); + assert!(read_data.data.is_empty()); + assert!(extent_info.is_none()); + + self.last_flush = ds_id; } - } - IOop::ExtentFlushClose { .. } => { - assert!(read_data.blocks.is_empty()); - assert!(read_data.data.is_empty()); + IOop::ExtentFlushClose { .. } => { + assert!(read_data.blocks.is_empty()); + assert!(read_data.data.is_empty()); - let ci = self.repair_info.replace(extent_info.unwrap()); - if ci.is_some() { - panic!( + let ci = self.repair_info.replace(extent_info.unwrap()); + if ci.is_some() { + panic!( "[{}] Unexpected repair found on insertion: {:?}", self.client_id, ci ); + } } - - if jobs_completed_ok == 3 { - debug!(self.log, "ExtentFlushClose {ds_id} AckReady"); - ackable = true; + IOop::ExtentLiveRepair { .. } + | IOop::ExtentLiveReopen { .. } + | IOop::ExtentLiveNoOp { .. } => { + assert!(read_data.blocks.is_empty()); + assert!(read_data.data.is_empty()); + assert!(extent_info.is_none()); } } - IOop::ExtentLiveRepair { .. } => { - assert!(read_data.blocks.is_empty()); - assert!(read_data.data.is_empty()); - if jobs_completed_ok == 3 { - debug!(self.log, "ExtentLiveRepair AckReady {ds_id}"); - ackable = true; + IOState::Done + } + Err(e) => { + // The downstairs sent us this error + error!( + self.log, + "DS Reports error {e:?} on job {}, {:?} EC", ds_id, job, + ); + match (&job.work, &e) { + // Some errors can be returned without considering the + // Downstairs bad. For example, it's still an error if a + // snapshot exists already but we should not increment + // `downstairs_errors` and transition that Downstairs to + // Failed - that downstairs is still able to serve IO. + (_, CrucibleError::SnapshotExistsAlready(..)) => (), + + // If a read job fails, we sometimes need to panic + (IOop::Read { .. }, CrucibleError::HashMismatch) => { + panic!( + "{} [{}] {} read hash mismatch {:?} {:?}", + self.cfg.session_id, self.client_id, ds_id, e, job + ); } - } - IOop::ExtentLiveReopen { .. } => { - assert!(read_data.blocks.is_empty()); - assert!(read_data.data.is_empty()); - if jobs_completed_ok == 3 { - debug!(self.log, "ExtentLiveReopen AckReady {ds_id}"); - ackable = true; + (IOop::Read { .. }, CrucibleError::DecryptionError) => { + panic!( + "[{}] {} read decrypt error {:?} {:?}", + self.client_id, ds_id, e, job + ); } - } - IOop::ExtentLiveNoOp { .. } => { - assert!(read_data.blocks.is_empty()); - assert!(read_data.data.is_empty()); - if jobs_completed_ok == 3 { - debug!(self.log, "ExtentLiveNoOp AckReady {ds_id}"); - ackable = true; + + // Other IO errors increment a counter and (higher up the + // call stack) cause us to mark this Downstairs as faulted. + // + // XXX: Errors should be reported to nexus + _ => { + self.stats.downstairs_errors += 1; } } + IOState::Error(e) } - } - ackable + }; + + // Update the state, maintaining various counters + let old_state = self.set_job_state(job, new_state); + + // The job must have been InProgress + assert_eq!( + old_state, + IOState::InProgress, + "[{}] Job {ds_id} completed while not InProgress: {job:?}", + self.client_id, + ); } /// Mark this client as disabled and halt its IO task diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs index 4f8a5d571..638da02c6 100644 --- a/upstairs/src/downstairs.rs +++ b/upstairs/src/downstairs.rs @@ -474,12 +474,77 @@ impl Downstairs { } } + /// Checks whether an ack and/or a retire check is necessary for the job + fn ack_check(&mut self, ds_id: JobId, deactivate: bool) { + let job = self.ds_active.get(&ds_id).unwrap(); + + // Find the number of `Done` jobs, which determines when we ack back to + // the Guest. In addition, we always ack (and perform a retire check) + // if the job is complete on all 3x downstairs. + let okay = job + .state + .iter() + .filter(|s| matches!(s, IOState::Done)) + .count(); + + let complete = job.state.iter().all(|s| { + matches!(s, IOState::Done | IOState::Error(..) | IOState::Skipped) + }); + + // Decide if we're ready to ack this job + let ack_ready = match &job.work { + IOop::Read { .. } => okay == 1, + IOop::Write { .. } | IOop::WriteUnwritten { .. } => okay == 2, + IOop::Flush { + snapshot_details, .. + } => { + let n = if deactivate || snapshot_details.is_some() { + 3 + } else { + 2 + }; + okay == n + } + IOop::Barrier { .. } + | IOop::ExtentFlushClose { .. } + | IOop::ExtentLiveRepair { .. } + | IOop::ExtentLiveReopen { .. } + | IOop::ExtentLiveNoOp { .. } => okay == 3, + }; + + // Do logging and ack the job + if !job.acked && (ack_ready || complete) { + match &job.work { + IOop::Flush { .. } => { + if deactivate { + debug!(self.log, "deactivate flush {ds_id} done"); + } + } + IOop::ExtentFlushClose { .. } => { + debug!(self.log, "ExtentFlushClose {ds_id} AckReady"); + } + IOop::ExtentLiveRepair { .. } => { + debug!(self.log, "ExtentLiveRepair AckReady {ds_id}"); + } + IOop::ExtentLiveReopen { .. } => { + debug!(self.log, "ExtentLiveReopen AckReady {ds_id}"); + } + IOop::ExtentLiveNoOp { .. } => { + debug!(self.log, "ExtentLiveNoOp AckReady {ds_id}"); + } + _ => (), + } + self.ack_job(ds_id); + } + + if complete { + self.retire_check(ds_id); + } + } + /// Send the ack for a single job back upstairs through `GuestWork` /// /// Update stats for the upstairs as well - /// - /// This is public for the sake of unit testing, but shouldn't be called - /// outside of this module normally. fn ack_job(&mut self, ds_id: JobId) { debug!(self.log, "ack_jobs process {}", ds_id); @@ -2238,10 +2303,7 @@ impl Downstairs { // Ack the job immediately if it was skipped on all 3x downstairs // (and wasn't previously acked, i.e. isn't a write) if skipped == 3 { - if !acked { - self.ack_job(ds_id); - } - self.retire_check(ds_id); + self.ack_check(ds_id, false); warn!(self.log, "job {} skipped on all downstairs", &ds_id); } } @@ -2592,8 +2654,7 @@ impl Downstairs { self.ds_active.len(), ); - let mut ack_jobs = vec![]; - let mut retire_check = vec![]; + let mut ack_check = vec![]; let mut number_jobs_skipped = 0; self.ds_active.for_each(|ds_id, job| { @@ -2602,30 +2663,10 @@ impl Downstairs { if matches!(state, IOState::InProgress) { self.clients[client_id].skip_job(*ds_id, job); number_jobs_skipped += 1; - - // Check to see if this being skipped means we can ACK - // the job back to the guest. - if job.acked { - // Push this onto a queue to do the retire check when - // we aren't doing a mutable iteration. - retire_check.push(*ds_id); - } else { - let wc = job.state_count(); - if (wc.error + wc.skipped + wc.done) == 3 { - info!( - self.log, - "[{}] notify = true for {}", client_id, ds_id - ); - ack_jobs.push(*ds_id); - } - } + ack_check.push(*ds_id); } }); - for ds_id in ack_jobs { - self.ack_job(ds_id); - } - info!( self.log, "[{}] changed {} jobs to fault skipped", @@ -2633,8 +2674,8 @@ impl Downstairs { number_jobs_skipped ); - for ds_id in retire_check { - self.retire_check(ds_id); + for ds_id in ack_check { + self.ack_check(ds_id, false); } } @@ -3284,24 +3325,13 @@ impl Downstairs { return; }; - let was_acked = job.acked; - let should_ack = self.clients[client_id].process_io_completion( + self.clients[client_id].process_io_completion( ds_id, job, responses, - deactivate, extent_info, ); - // If all 3 jobs are done, we can check here to see if we can remove - // this job from the DS list. - let wc = job.state_count(); - let complete = (wc.error + wc.skipped + wc.done) == 3; - - if !was_acked && (should_ack || complete) { - self.ack_job(ds_id); - } - // Decide what to do when we have an error from this IO. // Mark this downstairs as bad if this was a write or flush match self.client_error(ds_id, client_id) { @@ -3363,9 +3393,8 @@ impl Downstairs { } } - if complete { - self.retire_check(ds_id); - } + // Check whether this job needs to be acked or retired + self.ack_check(ds_id, deactivate); } /// Accessor for [`Downstairs::reconcile_repaired`] @@ -4949,8 +4978,8 @@ pub(crate) mod test { Some([3].as_slice()) ); - assert_eq!(ds.clients[ClientId::new(0)].stats.downstairs_errors, 0); - assert_eq!(ds.clients[ClientId::new(1)].stats.downstairs_errors, 0); + assert_eq!(ds.clients[ClientId::new(0)].stats.downstairs_errors, 1); + assert_eq!(ds.clients[ClientId::new(1)].stats.downstairs_errors, 1); assert_eq!(ds.clients[ClientId::new(2)].stats.downstairs_errors, 0); // send another read, and expect all to return something diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index 4e798cfce..df0643c14 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -375,11 +375,6 @@ mod cdt { fn ds__repair__done(_: u64, _: u8) {} fn ds__noop__done(_: u64, _: u8) {} fn ds__reopen__done(_: u64, _: u8) {} - fn up__to__ds__read__done(_: u64) {} - fn up__to__ds__write__done(_: u64) {} - fn up__to__ds__write__unwritten__done(_: u64) {} - fn up__to__ds__flush__done(_: u64) {} - fn up__to__ds__barrier__done(_: u64) {} fn gw__read__done(_: u64) {} fn gw__write__done(_: u64) {} fn gw__write__unwritten__done(_: u64) {} @@ -513,12 +508,6 @@ pub struct WorkCounts { done: u64, // This IO has completed } -impl WorkCounts { - fn completed_ok(&self) -> u64 { - self.done - } -} - #[derive(Debug, Copy, Clone)] pub struct ExtentRepairIDs { close_id: JobId,