Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for public extensions in Reports. #754

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/dapf/src/acceptance/load_testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ pub async fn execute_single_combination_from_env(
&measurment,
VERSION,
system_now.0,
Some(vec![]),
vec![messages::Extension::Taskprov],
t.replay_reports,
)
Expand Down
4 changes: 4 additions & 0 deletions crates/dapf/src/acceptance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,10 @@ impl Test {
measurement.as_ref(),
version,
now.0,
match version {
DapVersion::Draft09 => None,
DapVersion::Latest => Some(vec![]),
},
vec![messages::Extension::Taskprov],
self.replay_reports,
)
Expand Down
114 changes: 114 additions & 0 deletions crates/daphne-server/tests/e2e/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ async fn leader_upload(version: DapVersion) {
report_metadata: ReportMetadata {
id: ReportId([1; 16]),
time: t.now,
public_extensions: match version {
DapVersion::Draft09 => None,
DapVersion::Latest => Some(Vec::new()),
},
},
public_share: b"public share".to_vec(),
encrypted_input_shares: [
Expand Down Expand Up @@ -424,6 +428,10 @@ async fn leader_upload_taskprov() {
t.now,
&task_id,
DapMeasurement::U32Vec(vec![1; 10]),
match version {
DapVersion::Draft09 => None,
DapVersion::Latest => Some(vec![]),
},
vec![Extension::Taskprov],
version,
)
Expand Down Expand Up @@ -451,6 +459,10 @@ async fn leader_upload_taskprov() {
t.now,
&task_id,
DapMeasurement::U32Vec(vec![1; 10]),
match version {
DapVersion::Draft09 => None,
DapVersion::Latest => Some(vec![]),
},
vec![Extension::Taskprov],
version,
)
Expand Down Expand Up @@ -516,6 +528,10 @@ async fn leader_upload_taskprov_wrong_version(version: DapVersion) {
t.now,
&task_id,
DapMeasurement::U32Vec(vec![1; 10]),
match version {
DapVersion::Draft09 => None,
DapVersion::Latest => Some(vec![]),
},
vec![Extension::Taskprov],
version,
)
Expand All @@ -541,6 +557,100 @@ async fn leader_upload_taskprov_wrong_version(version: DapVersion) {

async_test_versions!(leader_upload_taskprov_wrong_version);

#[tokio::test]
jhoyla marked this conversation as resolved.
Show resolved Hide resolved
async fn leader_upload_taksprov_public_errors() {
let version = DapVersion::Latest;
let t = TestRunner::default_with_version(version).await;
let client = t.http_client();
let hpke_config_list = t.get_hpke_configs(version, client).await.unwrap();

let (task_config, task_id, taskprov_advertisement) = DapTaskParameters {
version,
min_batch_size: 10,
query: DapBatchMode::TimeInterval,
leader_url: t.task_config.leader_url.clone(),
helper_url: t.task_config.helper_url.clone(),
..Default::default()
}
.to_config_with_taskprov(
b"cool task".to_vec(),
t.now,
daphne::roles::aggregator::TaskprovConfig {
hpke_collector_config: &t.taskprov_collector_hpke_receiver.config,
vdaf_verify_key_init: &t.taskprov_vdaf_verify_key_init,
},
)
.unwrap();

// Repeated public extension
let report = task_config
.vdaf
.produce_report_with_extensions(
&hpke_config_list,
t.now + 1,
&task_id,
DapMeasurement::U32Vec(vec![1; 10]),
Some(vec![Extension::Taskprov, Extension::Taskprov]),
vec![],
version,
)
.unwrap();
t.leader_request_expect_abort(
client,
None,
&format!("tasks/{}/reports", task_id.to_base64url()),
&http::Method::POST,
DapMediaType::Report,
Some(
&taskprov_advertisement
.serialize_to_header_value(version)
.unwrap(),
),
report.get_encoded_with_param(&version).unwrap(),
400,
"invalidMessage",
)
.await
.unwrap();

// Unsupported public extension
let report = task_config
.vdaf
.produce_report_with_extensions(
&hpke_config_list,
t.now + 1,
&task_id,
DapMeasurement::U32Vec(vec![1; 10]),
Some(vec![
Extension::Taskprov,
Extension::NotImplemented {
typ: 3,
payload: b"ignore".to_vec(),
},
]),
vec![],
version,
)
.unwrap();
t.leader_request_expect_abort(
client,
None,
&format!("tasks/{}/reports", task_id.to_base64url()),
&http::Method::POST,
DapMediaType::Report,
Some(
&taskprov_advertisement
.serialize_to_header_value(version)
.unwrap(),
),
report.get_encoded_with_param(&version).unwrap(),
400,
"unsupportedExtension",
)
.await
.unwrap();
}

async fn internal_leader_process(version: DapVersion) {
let t = TestRunner::default_with_version(version).await;
let path = t.upload_path();
Expand Down Expand Up @@ -1408,6 +1518,10 @@ async fn leader_collect_taskprov_ok(version: DapVersion) {
now,
&task_id,
DapMeasurement::U32Vec(vec![1; 10]),
match version {
DapVersion::Draft09 => None,
DapVersion::Latest => Some(vec![]),
},
extensions,
version,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,24 @@ struct PartialDapTaskConfig @0xdcc9bf18fc62d406 {
vdafVerifyKey @4 :VdafVerifyKey;
}

struct PublicExtensionsList @0x8b3c98c0ddd0043e {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mendess I just ran capnp id and pasted the output here. So this is really just a random ID that needs to be unique?


union {
# Each extension is encoded according to the DAP spec in
# tag-length-value form.
list @0 :List(Data);

# draft09 compatibility: Previously DAP had no extensions in the
# report.
none @1 :Void;
}
}

struct ReportMetadata @0xefba178ad4584bc4 {

id @0 :Base.ReportId;
time @1 :Base.Time;
id @0 :Base.ReportId;
time @1 :Base.Time;
publicExtensions @2 :PublicExtensionsList;
}

struct PrepareInit @0x8192568cb3d03f59 {
Expand Down
88 changes: 82 additions & 6 deletions crates/daphne-service-utils/src/compute_offload/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ use crate::{
hpke_receiver_config::{self, hpke_config},
initialize_reports,
initialized_reports::{self, initialized_report},
partial_dap_task_config, prepare_init, report_metadata, time_range,
partial_dap_task_config, prepare_init, public_extensions_list, report_metadata, time_range,
},
};
use daphne::{
constants::DapAggregatorRole,
hpke::{HpkeConfig, HpkeReceiverConfig},
messages::{self, HpkeCiphertext, PrepareInit, ReportMetadata, ReportShare, TaskId},
messages::{self, Extension, HpkeCiphertext, PrepareInit, ReportMetadata, ReportShare, TaskId},
vdaf::{VdafConfig, VdafPrepShare, VdafPrepState},
InitializedReport, PartialDapTaskConfigForReportInit, WithPeerPrepShare,
};
use prio::codec::{Encode, ParameterizedDecode, ParameterizedEncode};
use prio::codec::{Decode, Encode, ParameterizedDecode, ParameterizedEncode};
use std::{borrow::Cow, ops::Range};

pub struct InitializeReports<'s> {
Expand Down Expand Up @@ -318,9 +318,27 @@ impl CapnprotoPayloadEncode for ReportMetadata {
type Builder<'a> = report_metadata::Builder<'a>;

fn encode_to_builder(&self, mut builder: Self::Builder<'_>) {
let Self { id, time } = self;
let Self {
id,
time,
public_extensions,
} = self;
id.encode_to_builder(builder.reborrow().init_id());
builder.set_time(*time);
if let Some(ref extensions) = public_extensions {
let mut e = builder
.init_public_extensions()
.init_list(usize_to_capnp_len(extensions.len()));
for (i, data) in extensions
.iter()
.enumerate()
.map(|(i, ext)| (usize_to_capnp_len(i), ext.get_encoded().unwrap()))
{
e.reborrow().set(i, &data);
}
} else {
builder.init_public_extensions().set_none(());
}
}
}

Expand All @@ -331,9 +349,25 @@ impl CapnprotoPayloadDecode for ReportMetadata {
where
Self: Sized,
{
let id = <_>::decode_from_reader(reader.get_id()?)?;
let time = reader.get_time();
let public_extensions = match reader.get_public_extensions()?.which()? {
public_extensions_list::List(list) => Some(
list?
.into_iter()
.map(|data| {
Extension::get_decoded(data?)
.map_err(|e| capnp::Error::failed(e.to_string()))
})
.collect::<Result<Vec<_>, capnp::Error>>()?,
),
public_extensions_list::None(()) => None,
};

Ok(Self {
id: <_>::decode_from_reader(reader.get_id()?)?,
time: reader.get_time(),
id,
time,
public_extensions,
})
}
}
Expand Down Expand Up @@ -486,3 +520,45 @@ fn to_capnp<E: ToString>(e: E) -> capnp::Error {
extra: e.to_string(),
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::capnproto::{CapnprotoPayloadDecodeExt, CapnprotoPayloadEncodeExt};

#[test]
fn report_metadata_roundtrip() {
let report_metadata = ReportMetadata {
id: messages::ReportId(rand::random()),
time: rand::random(),
public_extensions: Some(vec![
Extension::Taskprov,
Extension::NotImplemented {
typ: 23,
payload: b"some extension payload".to_vec(),
},
]),
};

assert_eq!(
report_metadata,
ReportMetadata::decode_from_bytes(&report_metadata.encode_to_bytes()).unwrap()
);
}

#[test]
fn report_metadata_roundtrip_draft09() {
let report_metadata = ReportMetadata {
id: messages::ReportId(rand::random()),
time: rand::random(),
// draft09 compatibility: Previously there was no extensions field in the report
// metadata.
public_extensions: None,
};

assert_eq!(
report_metadata,
ReportMetadata::decode_from_bytes(&report_metadata.encode_to_bytes()).unwrap()
);
}
}
Loading
Loading