Skip to content

Commit

Permalink
Fix aspa migration issues (#1163)
Browse files Browse the repository at this point in the history
* Pre 0.14.x AspaDefinitionUpdates also used plain Asn for "remove"
* Fix handling of skipped ASPA migration commands, and ASPA events in other commands.
* Fix upgraded for 0.9.6 (and use better testdata)
* Improve test data
  • Loading branch information
Tim Bruijnzeels committed Dec 6, 2023
1 parent 5b7e353 commit 8dcbc19
Show file tree
Hide file tree
Showing 469 changed files with 7,355 additions and 5,229 deletions.
15 changes: 13 additions & 2 deletions src/commons/eventsourcing/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,10 @@ where
let mut changed_from_cached = false;

let latest_result = match self.cache_get(handle) {
Some(arc) => Ok(arc),
Some(arc) => {
trace!("found cached snapshot for {handle}");
Ok(arc)
}
None => {
// There was no cached aggregate, so try to get it
// or construct it from the store, and remember that
Expand All @@ -238,13 +241,15 @@ where
let snapshot_key = Self::key_for_snapshot(handle);
match kv.get(&snapshot_key)? {
Some(value) => {
trace!("found snapshot for {handle}");
let agg: A = serde_json::from_value(value)?;
Ok(Arc::new(agg))
}
None => {
let init_key = Self::key_for_command(handle, 0);
match kv.get(&init_key)? {
Some(value) => {
trace!("found init command for {handle}");
let init_command: StoredCommand<A> = serde_json::from_value(value)?;

match init_command.into_init() {
Expand All @@ -257,7 +262,10 @@ where
))),
}
}
None => Err(A::Error::from(AggregateStoreError::UnknownAggregate(handle.clone()))),
None => {
trace!("neither snapshot nor init command found for {handle}");
Err(A::Error::from(AggregateStoreError::UnknownAggregate(handle.clone())))
}
}
}
}
Expand Down Expand Up @@ -288,6 +296,7 @@ where
match kv.get(&key)? {
None => break,
Some(value) => {
trace!("found next command found for {handle}: {}", key);
let command: StoredCommand<A> = serde_json::from_value(value)?;
aggregate.apply_command(command);
changed_from_cached = true;
Expand All @@ -299,6 +308,8 @@ where
// If a command was passed in, try to apply it, and make sure that it is
// preserved (i.e. with events or an error).
let res = if let Some(cmd) = cmd_opt {
trace!("apply command {} to {}", cmd, handle);

let aggregate = Arc::make_mut(&mut agg);

let version = aggregate.version();
Expand Down
56 changes: 36 additions & 20 deletions src/upgrades/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,20 +233,21 @@ impl std::error::Error for UpgradeError {}
//------------ DataUpgradeInfo -----------------------------------------------
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct DataUpgradeInfo {
pub last_command: Option<u64>,
// version of the source command
pub last_migrated_command: Option<u64>,

// Version of migrated aggregate. Note certain source commands may be dropped.
pub migration_version: u64,

pub aspa_configs: HashMap<CustomerAsn, Vec<ProviderAsn>>,
}

impl DataUpgradeInfo {
fn next_command(&self) -> u64 {
self.last_command.map(|nr| nr + 1).unwrap_or(0)
}

fn increment_command(&mut self) {
if let Some(last_command) = self.last_command {
self.last_command = Some(last_command + 1);
fn increment_last_migrated_command(&mut self) {
if let Some(last_command) = self.last_migrated_command {
self.last_migrated_command = Some(last_command + 1);
} else {
self.last_command = Some(0)
self.last_migrated_command = Some(0)
}
}

Expand Down Expand Up @@ -374,11 +375,11 @@ pub trait UpgradeAggregateStorePre0_14 {
let mut data_upgrade_info = self.data_upgrade_info(&scope)?;

// Get the list of commands to prepare, starting with the last_command we got to (may be 0)
let old_cmd_keys = self.command_keys(&scope, data_upgrade_info.last_command.unwrap_or(0))?;
let old_cmd_keys = self.command_keys(&scope, data_upgrade_info.last_migrated_command.unwrap_or(0))?;

// Migrate the initialisation event, if not done in a previous run. This
// is a special event that has no command, so we need to do this separately.
if data_upgrade_info.last_command.is_none() {
if data_upgrade_info.last_migrated_command.is_none() {
let old_init_key = Self::event_key(scope.clone(), 0);

let old_init: OldStoredEvent<Self::OldInitEvent> = self.get(&old_init_key)?;
Expand All @@ -404,7 +405,7 @@ pub trait UpgradeAggregateStorePre0_14 {
let command = self.convert_init_event(old_init, handle.clone(), actor, time)?;

self.store_new_command(&scope, &command)?;
data_upgrade_info.increment_command();
data_upgrade_info.increment_last_migrated_command();
}

// Track commands migrated and time spent so we can report progress
Expand Down Expand Up @@ -439,10 +440,23 @@ pub trait UpgradeAggregateStorePre0_14 {
OldStoredEffect::Error { msg } => UnconvertedEffect::Error { msg: msg.clone() },
};

match self.convert_old_command(old_command, old_effect, data_upgrade_info.next_command())? {
// The migration version matches the version of the resulting aggregate when commands
// are applied. It starts with 0 for the init command, in which case the version in
// the data_upgrade_info is not updated.
//
// For commands we set the target version of the migrated to command to the current
// version of the aggregate, plus 1. If there is a an actual resulting command (with
// events or even an error) to be saved, then we save this command and increment the
// migration_version.
//
// Unfortunately, we do need this double bookkeeping of versions of source commands
// that are migrated vs the version of the aggregate, because some commands - such
// as pre 0.14.0 ASPA update commands may be dropped.
match self.convert_old_command(old_command, old_effect, data_upgrade_info.migration_version + 1)? {
CommandMigrationEffect::StoredCommand(command) => {
self.store_new_command(&scope, &command)?;
data_upgrade_info.increment_command();
// we only increment this when a command is saved
data_upgrade_info.migration_version += 1;
}
CommandMigrationEffect::AspaObjectsUpdates(updates) => {
data_upgrade_info.update_aspa_configs(updates);
Expand All @@ -452,8 +466,10 @@ pub trait UpgradeAggregateStorePre0_14 {
}
}

// Report progress and expected time to finish on every 100 commands evaluated.
total_migrated += 1;
data_upgrade_info.increment_last_migrated_command();

// Report progress and expected time to finish on every 100 commands evaluated.
if total_migrated % 100 == 0 {
// expected time: (total_migrated / (now - started)) * total

Expand Down Expand Up @@ -566,7 +582,7 @@ pub trait UpgradeAggregateStorePre0_14 {
// Unwrap is safe here, because if there was no last_command
// then we would have converted the init event above, and would
// have set this.
let last_command = data_upgrade_info.last_command.ok_or(UpgradeError::custom(
let last_command = data_upgrade_info.last_migrated_command.ok_or(UpgradeError::custom(
"called report_remaining_work before converting init event",
))?;

Expand Down Expand Up @@ -1098,15 +1114,15 @@ mod tests {
fn prepare_then_upgrade_0_10_3() {
test_upgrade(
"test-resources/migrations/v0_10_3/",
&["ca_objects", "cas", "pubd", "pubd_objects"],
&["ca_objects", "cas", "pubd", "pubd_objects", "signers", "status"],
);
}

#[test]
fn prepare_then_upgrade_0_11_0() {
test_upgrade(
"test-resources/migrations/v0_11_0/",
&["ca_objects", "cas", "pubd", "pubd_objects"],
&["ca_objects", "cas", "pubd", "pubd_objects", "signers", "status"],
);
}

Expand All @@ -1122,15 +1138,15 @@ mod tests {
fn prepare_then_upgrade_0_12_3() {
test_upgrade(
"test-resources/migrations/v0_12_3/",
&["ca_objects", "cas", "pubd", "pubd_objects"],
&["ca_objects", "cas", "pubd", "pubd_objects", "signers", "status"],
);
}

#[test]
fn prepare_then_upgrade_0_13_1() {
test_upgrade(
"test-resources/migrations/v0_13_1/",
&["ca_objects", "cas", "keys", "pubd", "pubd_objects", "signers", "status"],
&["ca_objects", "cas", "pubd", "pubd_objects", "signers", "status"],
);
}

Expand Down
25 changes: 23 additions & 2 deletions src/upgrades/pre_0_10_0/cas_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl UpgradeAggregateStorePre0_14 for CasMigration {
match old_event {
Pre0_10CertAuthEvent::AspaObjectsUpdated { updates, .. } => {
let ca = old_command.handle().clone();
let removed = updates.removed;
let removed = updates.removed.into_iter().map(rpki::resources::Asn::from).collect();
let added_or_updated = updates
.updated
.into_iter()
Expand Down Expand Up @@ -187,12 +187,33 @@ impl UpgradeAggregateStorePre0_14 for CasMigration {
UnconvertedEffect::Success { events } => {
let mut full_events: Vec<CertAuthEvent> = vec![]; // We just had numbers, we need to include the full events
for old_event in events {
full_events.push(old_event.try_into()?);
match old_event {
Pre0_10CertAuthEvent::AspaConfigAdded { .. }
| Pre0_10CertAuthEvent::AspaConfigRemoved { .. }
| Pre0_10CertAuthEvent::AspaConfigUpdated { .. }
| Pre0_10CertAuthEvent::AspaObjectsUpdated { .. } => {
// we only expect AspaObjectsUpdated to be possible outside of
// Aspa related commands, e.g. because of a key rollover, but
// to be sure.. we do not migrate any of the ASPA events in
// this migration.
}
_ => {
full_events.push(old_event.try_into()?);
}
}
}
new_command_builder.finish_with_events(full_events)
}
};

// if the new command would be a no-op because no events are actually migrated,
// then return CommandMigrationEffect::Nothing
if let Some(events) = new_command.events() {
if events.is_empty() {
return Ok(CommandMigrationEffect::Nothing);
}
}

Ok(CommandMigrationEffect::StoredCommand(new_command))
}
}
Expand Down
14 changes: 12 additions & 2 deletions src/upgrades/pre_0_10_0/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use rpki::ca::publication::Base64;
use rpki::repository::x509::Time;

use crate::commons::api::AspaDefinition;
use crate::commons::api::CustomerAsn;

pub use self::cas_migration::*;

Expand Down Expand Up @@ -52,5 +51,16 @@ pub struct Pre0_10_0AspaObjectsUpdates {
pub updated: Vec<Pre0_10_0AspaInfo>,

#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub removed: Vec<CustomerAsn>,
pub removed: Vec<Pre0_10_0CustomerAsn>,
}

//------------ Pre_0_10_0CustomerAsn -------------------------------------------

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct Pre0_10_0CustomerAsn(Pre0_14_0ProviderAs); // re-use ProviderAs for string parsing

impl From<Pre0_10_0CustomerAsn> for rpki::resources::Asn {
fn from(pre: Pre0_10_0CustomerAsn) -> Self {
pre.0.provider
}
}
23 changes: 22 additions & 1 deletion src/upgrades/pre_0_14_0/cas_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,33 @@ impl UpgradeAggregateStorePre0_14 for CasMigration {
UnconvertedEffect::Success { events } => {
let mut full_events: Vec<CertAuthEvent> = vec![]; // We just had numbers, we need to include the full events
for old_event in events {
full_events.push(old_event.into());
match old_event {
Pre0_14_0CertAuthEvent::AspaConfigAdded { .. }
| Pre0_14_0CertAuthEvent::AspaConfigRemoved { .. }
| Pre0_14_0CertAuthEvent::AspaConfigUpdated { .. }
| Pre0_14_0CertAuthEvent::AspaObjectsUpdated { .. } => {
// we only expect AspaObjectsUpdated to be possible outside of
// Aspa related commands, e.g. because of a key rollover, but
// to be sure.. we do not migrate any of the ASPA events in
// this migration.
}
_ => {
full_events.push(old_event.into());
}
}
}
new_command_builder.finish_with_events(full_events)
}
};

// if the new command would be a no-op because no events are actually migrated,
// then return CommandMigrationEffect::Nothing
if let Some(events) = new_command.events() {
if events.is_empty() {
return Ok(CommandMigrationEffect::Nothing);
}
}

Ok(CommandMigrationEffect::StoredCommand(new_command))
}
}
Expand Down
34 changes: 0 additions & 34 deletions src/upgrades/pre_0_14_0/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,37 +476,3 @@ pub struct Pre0_14_0AspaProvidersUpdate {
added: Vec<Pre0_14_0ProviderAs>,
removed: Vec<Pre0_14_0ProviderAs>,
}

// //------------ AspaObjectsUpdates ------------------------------------------

// #[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
// pub struct Pre0_14_0AspaObjectsUpdates {
// #[serde(skip_serializing_if = "Vec::is_empty", default)]
// updated: Vec<Pre0_14_0AspaInfo>,

// #[serde(skip_serializing_if = "Vec::is_empty", default)]
// removed: Vec<CustomerAsn>,
// }

// //------------ Pre0_14_0AspaInfo -------------------------------------------

// #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
// pub struct Pre0_14_0AspaInfo {
// // The customer ASN and all Provider ASNs
// definition: Pre0_14_0AspaDefinition,

// // The validity time for this ASPA.
// validity: Validity,

// // The serial number (needed for revocation)
// serial: Serial,

// // The URI where this object is expected to be published
// uri: uri::Rsync,

// // The actual ASPA object in base64 format.
// base64: Base64,

// // The ASPA object's hash
// hash: Hash,
// }
4 changes: 2 additions & 2 deletions src/upgrades/pre_0_14_0/old_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
eventsourcing::WithStorableDetails,
},
daemon::ca::DropReason,
upgrades::pre_0_14_0::{Pre0_14_0AspaProvidersUpdate, Pre0_14_0ProviderAs},
upgrades::pre_0_14_0::Pre0_14_0AspaProvidersUpdate,
};

use super::Pre0_14_0AspaDefinition;
Expand Down Expand Up @@ -133,7 +133,7 @@ pub enum Pre0_14_0CertAuthStorableCommand {
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct Pre0_14_0AspaDefinitionUpdates {
add_or_replace: Vec<Pre0_14_0AspaDefinition>,
remove: Vec<Pre0_14_0ProviderAs>, // was using string notation
remove: Vec<rpki::resources::Asn>,
}

impl From<Pre0_14_0CertAuthStorableCommand> for CertAuthStorableCommand {
Expand Down
13 changes: 13 additions & 0 deletions test-resources/migrations/test-setup/add-aspa-65000.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"add_or_replace": [
{
"customer": 65000,
"providers": [
"AS65001(v4)",
"AS65002(v6)",
"AS65003"
]
}
],
"remove": []
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"add_or_replace": [
{
"customer": "AS65000",
"providers": [
"AS65001(v4)",
"AS65002(v6)",
"AS65003"
]
}
],
"remove": []
}
3 changes: 3 additions & 0 deletions test-resources/migrations/test-setup/aspa-0.9.6/readme.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add an ASPA:

curl --insecure -H "Authorization: Bearer 03e3ce77ebc2bf14753ee4783d1ceffb" -X POST -d @./add-aspa-65000.json https://localhost:3000/api/v1/cas/krill-test-0-14-3/aspas
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"add_or_replace": [
],
"remove": ["AS65000" ]
}
Loading

0 comments on commit 8dcbc19

Please sign in to comment.