Skip to content

Commit

Permalink
Fix handling of skipped ASPA migration commands, and ASPA events in o…
Browse files Browse the repository at this point in the history
…ther commands.
  • Loading branch information
Tim Bruijnzeels committed Nov 27, 2023
1 parent 64e68ea commit 5ea3718
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 19 deletions.
15 changes: 13 additions & 2 deletions src/commons/eventsourcing/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,10 @@ where
let mut changed_from_cached = false;

let latest_result = match self.cache_get(handle) {
Some(arc) => Ok(arc),
Some(arc) => {
trace!("found cached snapshot for {handle}");
Ok(arc)
}
None => {
// There was no cached aggregate, so try to get it
// or construct it from the store, and remember that
Expand All @@ -238,13 +241,15 @@ where
let snapshot_key = Self::key_for_snapshot(handle);
match kv.get(&snapshot_key)? {
Some(value) => {
trace!("found snapshot for {handle}");
let agg: A = serde_json::from_value(value)?;
Ok(Arc::new(agg))
}
None => {
let init_key = Self::key_for_command(handle, 0);
match kv.get(&init_key)? {
Some(value) => {
trace!("found init command for {handle}");
let init_command: StoredCommand<A> = serde_json::from_value(value)?;

match init_command.into_init() {
Expand All @@ -257,7 +262,10 @@ where
))),
}
}
None => Err(A::Error::from(AggregateStoreError::UnknownAggregate(handle.clone()))),
None => {
trace!("neither snapshot nor init command found for {handle}");
Err(A::Error::from(AggregateStoreError::UnknownAggregate(handle.clone())))
}
}
}
}
Expand Down Expand Up @@ -288,6 +296,7 @@ where
match kv.get(&key)? {
None => break,
Some(value) => {
trace!("found next command found for {handle}: {}", key);
let command: StoredCommand<A> = serde_json::from_value(value)?;
aggregate.apply_command(command);
changed_from_cached = true;
Expand All @@ -299,6 +308,8 @@ where
// If a command was passed in, try to apply it, and make sure that it is
// preserved (i.e. with events or an error).
let res = if let Some(cmd) = cmd_opt {
trace!("apply command {} to {}", cmd, handle);

let aggregate = Arc::make_mut(&mut agg);

let version = aggregate.version();
Expand Down
48 changes: 32 additions & 16 deletions src/upgrades/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,20 +233,21 @@ impl std::error::Error for UpgradeError {}
//------------ DataUpgradeInfo -----------------------------------------------
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct DataUpgradeInfo {
pub last_command: Option<u64>,
// version of the source command
pub last_migrated_command: Option<u64>,

// Version of migrated aggregate. Note certain source commands may be dropped.
pub migration_version: u64,

pub aspa_configs: HashMap<CustomerAsn, Vec<ProviderAsn>>,
}

impl DataUpgradeInfo {
fn next_command(&self) -> u64 {
self.last_command.map(|nr| nr + 1).unwrap_or(0)
}

fn increment_command(&mut self) {
if let Some(last_command) = self.last_command {
self.last_command = Some(last_command + 1);
fn increment_last_migrated_command(&mut self) {
if let Some(last_command) = self.last_migrated_command {
self.last_migrated_command = Some(last_command + 1);
} else {
self.last_command = Some(0)
self.last_migrated_command = Some(0)
}
}

Expand Down Expand Up @@ -374,11 +375,11 @@ pub trait UpgradeAggregateStorePre0_14 {
let mut data_upgrade_info = self.data_upgrade_info(&scope)?;

// Get the list of commands to prepare, starting with the last_command we got to (may be 0)
let old_cmd_keys = self.command_keys(&scope, data_upgrade_info.last_command.unwrap_or(0))?;
let old_cmd_keys = self.command_keys(&scope, data_upgrade_info.last_migrated_command.unwrap_or(0))?;

// Migrate the initialisation event, if not done in a previous run. This
// is a special event that has no command, so we need to do this separately.
if data_upgrade_info.last_command.is_none() {
if data_upgrade_info.last_migrated_command.is_none() {
let old_init_key = Self::event_key(scope.clone(), 0);

let old_init: OldStoredEvent<Self::OldInitEvent> = self.get(&old_init_key)?;
Expand All @@ -404,7 +405,7 @@ pub trait UpgradeAggregateStorePre0_14 {
let command = self.convert_init_event(old_init, handle.clone(), actor, time)?;

self.store_new_command(&scope, &command)?;
data_upgrade_info.increment_command();
data_upgrade_info.increment_last_migrated_command();
}

// Track commands migrated and time spent so we can report progress
Expand Down Expand Up @@ -439,10 +440,23 @@ pub trait UpgradeAggregateStorePre0_14 {
OldStoredEffect::Error { msg } => UnconvertedEffect::Error { msg: msg.clone() },
};

match self.convert_old_command(old_command, old_effect, data_upgrade_info.next_command())? {
// The migration version matches the version of the resulting aggregate when commands
// are applied. It starts with 0 for the init command, in which case the version in
// the data_upgrade_info is not updated.
//
// For commands we set the target version of the migrated to command to the current
// version of the aggregate, plus 1. If there is a an actual resulting command (with
// events or even an error) to be saved, then we save this command and increment the
// migration_version.
//
// Unfortunately, we do need this double bookkeeping of versions of source commands
// that are migrated vs the version of the aggregate, because some commands - such
// as pre 0.14.0 ASPA update commands may be dropped.
match self.convert_old_command(old_command, old_effect, data_upgrade_info.migration_version + 1)? {
CommandMigrationEffect::StoredCommand(command) => {
self.store_new_command(&scope, &command)?;
data_upgrade_info.increment_command();
// we only increment this when a command is saved
data_upgrade_info.migration_version += 1;
}
CommandMigrationEffect::AspaObjectsUpdates(updates) => {
data_upgrade_info.update_aspa_configs(updates);
Expand All @@ -452,8 +466,10 @@ pub trait UpgradeAggregateStorePre0_14 {
}
}

// Report progress and expected time to finish on every 100 commands evaluated.
total_migrated += 1;
data_upgrade_info.increment_last_migrated_command();

// Report progress and expected time to finish on every 100 commands evaluated.
if total_migrated % 100 == 0 {
// expected time: (total_migrated / (now - started)) * total

Expand Down Expand Up @@ -566,7 +582,7 @@ pub trait UpgradeAggregateStorePre0_14 {
// Unwrap is safe here, because if there was no last_command
// then we would have converted the init event above, and would
// have set this.
let last_command = data_upgrade_info.last_command.ok_or(UpgradeError::custom(
let last_command = data_upgrade_info.last_migrated_command.ok_or(UpgradeError::custom(
"called report_remaining_work before converting init event",
))?;

Expand Down
23 changes: 22 additions & 1 deletion src/upgrades/pre_0_14_0/cas_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,33 @@ impl UpgradeAggregateStorePre0_14 for CasMigration {
UnconvertedEffect::Success { events } => {
let mut full_events: Vec<CertAuthEvent> = vec![]; // We just had numbers, we need to include the full events
for old_event in events {
full_events.push(old_event.into());
match old_event {
Pre0_14_0CertAuthEvent::AspaConfigAdded { .. }
| Pre0_14_0CertAuthEvent::AspaConfigRemoved { .. }
| Pre0_14_0CertAuthEvent::AspaConfigUpdated { .. }
| Pre0_14_0CertAuthEvent::AspaObjectsUpdated { .. } => {
// we only expect AspaObjectsUpdated to be possible outside of
// Aspa related commands, e.g. because of a key rollover, but
// to be sure.. we do not migrate any of the ASPA events in
// this migration.
}
_ => {
full_events.push(old_event.into());
}
}
}
new_command_builder.finish_with_events(full_events)
}
};

// if the new command would be a no-op because no events are actually migrated,
// then return CommandMigrationEffect::Nothing
if let Some(events) = new_command.events() {
if events.is_empty() {
return Ok(CommandMigrationEffect::Nothing);
}
}

Ok(CommandMigrationEffect::StoredCommand(new_command))
}
}
Expand Down

0 comments on commit 5ea3718

Please sign in to comment.