From 3c515a538b5ee8889b7c207caa78a740e4afc983 Mon Sep 17 00:00:00 2001 From: Jonathan Hoyland Date: Wed, 8 Jan 2025 13:08:20 +0000 Subject: [PATCH 1/2] Reject reports earlier than task_config.not_before --- crates/dapf/src/main.rs | 4 ++ crates/daphne-server/src/roles/mod.rs | 3 +- crates/daphne-server/tests/e2e/e2e.rs | 8 +++ crates/daphne-server/tests/e2e/test_runner.rs | 4 +- .../src/test_route_types.rs | 1 + crates/daphne/src/lib.rs | 2 +- crates/daphne/src/protocol/mod.rs | 62 ++++++++++++++++++- crates/daphne/src/protocol/report_init.rs | 10 ++- crates/daphne/src/roles/leader/mod.rs | 8 ++- crates/daphne/src/roles/mod.rs | 2 +- crates/daphne/src/testing/mod.rs | 2 +- 11 files changed, 97 insertions(+), 9 deletions(-) diff --git a/crates/dapf/src/main.rs b/crates/dapf/src/main.rs index 685285ae..582a7df0 100644 --- a/crates/dapf/src/main.rs +++ b/crates/dapf/src/main.rs @@ -907,6 +907,10 @@ async fn handle_test_routes(action: TestAction, http_client: HttpClient) -> anyh || 604_800u64, "task should expire in", )?, + task_commencement: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(), }; print_json(&internal_task); diff --git a/crates/daphne-server/src/roles/mod.rs b/crates/daphne-server/src/roles/mod.rs index 0e27339f..2ae71d9e 100644 --- a/crates/daphne-server/src/roles/mod.rs +++ b/crates/daphne-server/src/roles/mod.rs @@ -111,7 +111,6 @@ mod test_utils { fatal_error, hpke::{HpkeConfig, HpkeReceiverConfig}, messages::decode_base64url_vec, - roles::DapAggregator, vdaf::{Prio3Config, VdafConfig}, DapBatchMode, DapError, DapTaskConfig, DapVersion, }; @@ -306,7 +305,7 @@ mod test_utils { leader_url: cmd.leader, helper_url: cmd.helper, time_precision: cmd.time_precision, - not_before: self.get_current_time(), + not_before: cmd.task_commencement, not_after: cmd.task_expiration, min_batch_size: cmd.min_batch_size, query, diff --git a/crates/daphne-server/tests/e2e/e2e.rs b/crates/daphne-server/tests/e2e/e2e.rs index 6163d9e6..2fb0b8e8 100644 --- a/crates/daphne-server/tests/e2e/e2e.rs +++ b/crates/daphne-server/tests/e2e/e2e.rs @@ -1423,6 +1423,12 @@ async fn leader_collect_taskprov_ok(version: DapVersion) { ) .unwrap(); + println!("Now - not_before: {}", t.now - task_config.not_before); + println!( + "Now - batch_interval.start: {}", + t.now - batch_interval.start + ); + println!("t.now: {}", t.now); let path = TestRunner::upload_path_for_task(&task_id); let method = match version { DapVersion::Draft09 => &Method::PUT, @@ -1433,7 +1439,9 @@ async fn leader_collect_taskprov_ok(version: DapVersion) { let mut rng = thread_rng(); for _ in 0..t.task_config.min_batch_size { let extensions = vec![Extension::Taskprov]; + println!("Report interval: {:?}", TestRunner::report_interval(&batch_interval)); let now = rng.gen_range(TestRunner::report_interval(&batch_interval)); + println!("\tnow: {}", now); t.leader_request_expect_ok( client, &path, diff --git a/crates/daphne-server/tests/e2e/test_runner.rs b/crates/daphne-server/tests/e2e/test_runner.rs index 1556f856..6bacbc4d 100644 --- a/crates/daphne-server/tests/e2e/test_runner.rs +++ b/crates/daphne-server/tests/e2e/test_runner.rs @@ -116,7 +116,7 @@ impl TestRunner { version, leader_url: leader_url.clone(), helper_url: helper_url.clone(), - not_before: now, + not_before: now - (now % TIME_PRECISION) - 1, not_after: now + 604_800, // one week from now time_precision: TIME_PRECISION, min_batch_size: MIN_BATCH_SIZE, @@ -241,6 +241,7 @@ impl TestRunner { "time_precision": t.task_config.time_precision, "collector_hpke_config": collector_hpke_config_base64url.clone(), "task_expiration": t.task_config.not_after, + "task_commencement": t.task_config.not_before, }); let add_task_path = format!("{}/internal/test/add_task", version.as_ref()); let res: InternalTestCommandResult = t @@ -268,6 +269,7 @@ impl TestRunner { "time_precision": t.task_config.time_precision, "collector_hpke_config": collector_hpke_config_base64url.clone(), "task_expiration": t.task_config.not_after, + "task_commencement": t.task_config.not_before, }); let res: InternalTestCommandResult = t .helper_post_internal(&add_task_path, &helper_add_task_cmd) diff --git a/crates/daphne-service-utils/src/test_route_types.rs b/crates/daphne-service-utils/src/test_route_types.rs index 3fba74fd..f6545d27 100644 --- a/crates/daphne-service-utils/src/test_route_types.rs +++ b/crates/daphne-service-utils/src/test_route_types.rs @@ -113,5 +113,6 @@ pub struct InternalTestAddTask { pub max_batch_size: Option, pub time_precision: Duration, pub collector_hpke_config: String, // base64url + pub task_commencement: Time, pub task_expiration: Time, } diff --git a/crates/daphne/src/lib.rs b/crates/daphne/src/lib.rs index 7c91e311..7a87ebe0 100644 --- a/crates/daphne/src/lib.rs +++ b/crates/daphne/src/lib.rs @@ -596,7 +596,7 @@ impl DapTaskParameters { ) .unwrap() .into_opted_in(&taskprov::OptInParam { - not_before: now, + not_before: now - (now % self.time_precision) - 1, num_agg_span_shards: self.num_agg_span_shards, }); diff --git a/crates/daphne/src/protocol/mod.rs b/crates/daphne/src/protocol/mod.rs index 2f39c3ef..239b7486 100644 --- a/crates/daphne/src/protocol/mod.rs +++ b/crates/daphne/src/protocol/mod.rs @@ -274,7 +274,7 @@ mod test { test_versions! { produce_agg_job_req_skip_hpke_decrypt_err } fn produce_agg_job_req_skip_time_too_stale(version: DapVersion) { - let t = AggregationJobTest::new(TEST_VDAF, HpkeKemId::X25519HkdfSha256, version); + let mut t = AggregationJobTest::new(TEST_VDAF, HpkeKemId::X25519HkdfSha256, version); let reports = vec![t .task_config .vdaf @@ -287,6 +287,7 @@ mod test { ) .unwrap()]; + t.task_config.not_before = t.valid_report_time_range().start - 2; let (agg_job_state, _agg_job_init_req) = t.produce_agg_job_req(&DapAggregationParam::Empty, reports); assert_eq!(agg_job_state.report_count(), 0); @@ -298,6 +299,30 @@ mod test { test_versions! { produce_agg_job_req_skip_time_too_stale } + fn produce_agg_job_req_skip_time_before_not_before(version: DapVersion) { + let t = AggregationJobTest::new(TEST_VDAF, HpkeKemId::X25519HkdfSha256, version); + let reports = vec![t + .task_config + .vdaf + .produce_report( + &t.client_hpke_config_list, + t.task_config.not_before - 1, + &t.task_id, + DapMeasurement::U32Vec(vec![1; 10]), + t.task_config.version, + ) + .unwrap()]; + let (agg_job_state, _agg_job_init_req) = + t.produce_agg_job_req(&DapAggregationParam::Empty, reports); + assert_eq!(agg_job_state.report_count(), 0); + + assert_metrics_include!(t.leader_registry, { + r#"report_counter{env="test_leader",host="leader.com",status="rejected_task_not_started"}"#: 1, + }); + } + + test_versions! {produce_agg_job_req_skip_time_before_not_before} + fn produce_agg_job_req_skip_time_too_early(version: DapVersion) { let t = AggregationJobTest::new(TEST_VDAF, HpkeKemId::X25519HkdfSha256, version); let reports = vec![t @@ -403,6 +428,7 @@ mod test { // Temporarily overwrite the valid report time range so that the Leader accepts the // out-of-range report and produces the request. let tmp = t.valid_report_range.clone(); + t.task_config.not_before = t.valid_report_time_range().start - 2; t.valid_report_range = 0..u64::MAX; let (_, agg_job_init_req) = t.produce_agg_job_req(&DapAggregationParam::Empty, reports.clone()); @@ -420,6 +446,40 @@ mod test { test_versions! { handle_agg_job_req_skip_time_too_stale } + fn handle_agg_job_req_skip_time_before_not_before(version: DapVersion) { + let mut t = AggregationJobTest::new(TEST_VDAF, HpkeKemId::X25519HkdfSha256, version); + let reports = vec![t + .task_config + .vdaf + .produce_report( + &t.client_hpke_config_list, + t.task_config.not_before - 1, + &t.task_id, + DapMeasurement::U32Vec(vec![1; 10]), + t.task_config.version, + ) + .unwrap()]; + + let agg_job_init_req = { + // Temporarily overwrite the task_config not_before time so that the Leader accepts the + // out-of-range report and produces the request. + t.task_config.not_before -= 2; + let (_, agg_job_init_req) = + t.produce_agg_job_req(&DapAggregationParam::Empty, reports.clone()); + t.task_config.not_before += 2; + agg_job_init_req + }; + let (_agg_span, agg_job_resp) = t.handle_agg_job_req(agg_job_init_req); + + assert_eq!(agg_job_resp.transitions.len(), 1); + assert_matches!( + agg_job_resp.transitions[0].var, + TransitionVar::Failed(ReportError::TaskNotStarted) + ); + } + + test_versions! {handle_agg_job_req_skip_time_before_not_before} + fn handle_agg_job_req_skip_time_too_early(version: DapVersion) { let mut t = AggregationJobTest::new(TEST_VDAF, HpkeKemId::X25519HkdfSha256, version); let reports = vec![t diff --git a/crates/daphne/src/protocol/report_init.rs b/crates/daphne/src/protocol/report_init.rs index 7502ab55..8a21036c 100644 --- a/crates/daphne/src/protocol/report_init.rs +++ b/crates/daphne/src/protocol/report_init.rs @@ -100,6 +100,7 @@ impl InitializedReport { impl<'s> From<&'s DapTaskConfig> for PartialDapTaskConfigForReportInit<'s> { fn from(config: &'s DapTaskConfig) -> Self { PartialDapTaskConfigForReportInit { + not_before: config.not_before, not_after: config.not_after, method_is_taskprov: config.method_is_taskprov(), version: config.version, @@ -112,6 +113,7 @@ impl<'s> From<&'s DapTaskConfig> for PartialDapTaskConfigForReportInit<'s> { impl<'s> From<&'s PartialDapTaskConfigForReportInit<'_>> for PartialDapTaskConfigForReportInit<'s> { fn from(config: &'s PartialDapTaskConfigForReportInit<'_>) -> Self { Self { + not_before: config.not_before, not_after: config.not_after, method_is_taskprov: config.method_is_taskprov, version: config.version, @@ -122,6 +124,7 @@ impl<'s> From<&'s PartialDapTaskConfigForReportInit<'_>> for PartialDapTaskConfi } pub struct PartialDapTaskConfigForReportInit<'s> { + pub not_before: messages::Time, pub not_after: messages::Time, pub method_is_taskprov: bool, pub version: DapVersion, @@ -145,19 +148,24 @@ impl

InitializedReport

{ let task_config = task_config.into(); macro_rules! reject { ($failure:ident) => { + {tracing::warn!("Rejected - {}\nTimestamp - {}", ReportError::$failure, report_share.report_metadata.time); return Ok(InitializedReport::Rejected { metadata: report_share.report_metadata, report_err: ReportError::$failure, - }) + })} }; } + + tracing::info!("valid_report_range: {}..{}", valid_report_range.start, valid_report_range.end); match report_share.report_metadata.time { t if t >= task_config.not_after => reject!(TaskExpired), + t if t < task_config.not_before => {tracing::warn!("Reject TaskNotStarted"); reject!(TaskNotStarted)}, t if t < valid_report_range.start => reject!(ReportDropped), t if valid_report_range.end < t => reject!(ReportTooEarly), _ => {} } + tracing::warn!("All tests pass"); match ( &report_share.report_metadata.public_extensions, task_config.version, diff --git a/crates/daphne/src/roles/leader/mod.rs b/crates/daphne/src/roles/leader/mod.rs index c65c0a8f..6547c113 100644 --- a/crates/daphne/src/roles/leader/mod.rs +++ b/crates/daphne/src/roles/leader/mod.rs @@ -216,7 +216,6 @@ pub async fn handle_upload_req( .into()); } - // Check that the report was generated after the task's `not_before` time. if report.report_metadata.time < task_config.as_ref().not_before - task_config.as_ref().time_precision { @@ -226,6 +225,13 @@ pub async fn handle_upload_req( .into()); } + // Check that the report was generated after the task's `not_before` time. + println!( + "report_metadata.time - task_config.not_before: {}", + report.report_metadata.time as i128 - task_config.as_ref().not_before as i128 + ); + println!("report_metadata.time: {}", report.report_metadata.time); + if let Some(public_extensions) = &report.report_metadata.public_extensions { // We can be sure at this point that the ReportMetadata is well formed // because the decoding / error checking happens in the extractor. diff --git a/crates/daphne/src/roles/mod.rs b/crates/daphne/src/roles/mod.rs index 4a74078a..778fd168 100644 --- a/crates/daphne/src/roles/mod.rs +++ b/crates/daphne/src/roles/mod.rs @@ -1545,7 +1545,7 @@ mod test { } .to_config_with_taskprov( b"cool task".to_vec(), - t.now, + t.now + 1, t.leader.get_taskprov_config().unwrap(), ) .unwrap(); diff --git a/crates/daphne/src/testing/mod.rs b/crates/daphne/src/testing/mod.rs index dd6d3df6..d8cb2a4d 100644 --- a/crates/daphne/src/testing/mod.rs +++ b/crates/daphne/src/testing/mod.rs @@ -679,7 +679,7 @@ impl DapAggregator for InMemoryAggregator { ) -> Result { // Always opt-in with four shards. Ok(task_config.into_opted_in(&taskprov::OptInParam { - not_before: self.get_current_time(), + not_before: self.get_current_time() - 60, num_agg_span_shards: NonZeroUsize::new(4).unwrap(), })) } From dc6d896e587c50c5d5a3ee0f9fb7483a2f80806c Mon Sep 17 00:00:00 2001 From: Jonathan Hoyland Date: Wed, 22 Jan 2025 17:53:01 +0000 Subject: [PATCH 2/2] Begin debugging --- crates/daphne-server/src/roles/mod.rs | 1 + .../daphne-server/src/router/test_routes.rs | 5 +- crates/daphne-server/tests/e2e/e2e.rs | 12 +- crates/daphne-server/tests/e2e/test_runner.rs | 145 +++++++++++++++++- crates/daphne/src/lib.rs | 1 + crates/daphne/src/protocol/aggregator.rs | 1 + crates/daphne/src/protocol/report_init.rs | 8 +- crates/daphne/src/roles/leader/mod.rs | 2 + crates/daphne/src/testing/mod.rs | 1 + 9 files changed, 169 insertions(+), 7 deletions(-) diff --git a/crates/daphne-server/src/roles/mod.rs b/crates/daphne-server/src/roles/mod.rs index 2ae71d9e..27ca554e 100644 --- a/crates/daphne-server/src/roles/mod.rs +++ b/crates/daphne-server/src/roles/mod.rs @@ -296,6 +296,7 @@ mod test_utils { } }; + tracing::warn!("Add task: {}..{}", cmd.task_commencement, cmd.task_expiration); if self .kv() .put_if_not_exists_with_expiration::( diff --git a/crates/daphne-server/src/router/test_routes.rs b/crates/daphne-server/src/router/test_routes.rs index b09c42f1..eb1ef8b5 100644 --- a/crates/daphne-server/src/router/test_routes.rs +++ b/crates/daphne-server/src/router/test_routes.rs @@ -14,7 +14,7 @@ use daphne::{ constants::DapAggregatorRole, hpke::HpkeReceiverConfig, messages::{Base64Encode, TaskId}, - roles::{leader, DapLeader}, + roles::{leader, DapAggregator, DapLeader}, DapVersion, }; use daphne_service_utils::test_route_types::{InternalTestAddTask, InternalTestEndpointForTask}; @@ -129,6 +129,9 @@ async fn add_task( Path(version): Path, Json(cmd): Json, ) -> impl IntoResponse { + tracing::warn!("TaskID: {:?}", cmd.task_id); + tracing::warn!("task conf range: {}..{}", cmd.task_commencement, cmd.task_expiration); + tracing::warn!("Valid time range: {:?}", app.valid_report_time_range()); match app.internal_add_task(version, cmd).await { Ok(()) => ( StatusCode::OK, diff --git a/crates/daphne-server/tests/e2e/e2e.rs b/crates/daphne-server/tests/e2e/e2e.rs index 2fb0b8e8..6ea45f11 100644 --- a/crates/daphne-server/tests/e2e/e2e.rs +++ b/crates/daphne-server/tests/e2e/e2e.rs @@ -1402,9 +1402,6 @@ async fn leader_collect_taskprov_ok(version: DapVersion) { let t = TestRunner::default_with_version(version).await; let batch_interval = t.batch_interval(); - let client = t.http_client(); - let hpke_config_list = t.get_hpke_configs(version, client).await.unwrap(); - let (task_config, task_id, taskprov_advertisement) = DapTaskParameters { version, min_batch_size: 10, @@ -1423,6 +1420,13 @@ async fn leader_collect_taskprov_ok(version: DapVersion) { ) .unwrap(); + t.setup_endpoints(&task_id, version).await; + + let client = &t.http_client(); + let hpke_config_list = t.get_hpke_configs_task_id(version, client, &task_id).await.unwrap(); + println!("Generated TaskID: {}",t.task_id); + + println!("TaskID: {:?}", task_id); println!("Now - not_before: {}", t.now - task_config.not_before); println!( "Now - batch_interval.start: {}", @@ -1559,7 +1563,7 @@ async fn leader_collect_taskprov_ok(version: DapVersion) { assert_eq!(resp.status(), 200); assert_eq!( resp.bytes().await.unwrap(), - collection.get_encoded_with_param(&t.version).unwrap() + collection.get_encoded_with_param(&version).unwrap() ); } diff --git a/crates/daphne-server/tests/e2e/test_runner.rs b/crates/daphne-server/tests/e2e/test_runner.rs index 6bacbc4d..cbf118b0 100644 --- a/crates/daphne-server/tests/e2e/test_runner.rs +++ b/crates/daphne-server/tests/e2e/test_runner.rs @@ -148,7 +148,7 @@ impl TestRunner { let leader_bearer_token = hex::encode(rng.gen::<[u8; 16]>()); let collector_bearer_token = hex::encode(rng.gen::<[u8; 16]>()); let t = Self { - global_config, + global_config, task_id, now, task_config, @@ -224,6 +224,8 @@ impl TestRunner { // // First, delete the data from the previous test. t.internal_delete_all(&t.batch_interval()).await.unwrap(); + println!("Task_config.not_before: {}", t.task_config.not_before); + println!("t.task_id: {:}", t.task_id); // Configure the Leader with the task. let leader_add_task_cmd = json!({ @@ -314,9 +316,127 @@ impl TestRunner { ); println!("############ starting test ############"); + t } + pub async fn setup_endpoints(&self, task_id: &TaskId, version: DapVersion){ + // Configure the endpoints. + // + // First, delete the data from the previous test. + self.internal_delete_all(&self.batch_interval()).await.unwrap(); + println!("Task_config.not_before: {}", self.task_config.not_before); + println!("self.task_id: {:}", self.task_id); + + let vdaf_verify_key_base64url = encode_base64url(self.task_config.vdaf_verify_key.as_ref()); + + let (batch_mode, max_batch_size) = match self.task_config.query { + DapBatchMode::TimeInterval => (1, None), + DapBatchMode::LeaderSelected { max_batch_size } => (2, Some(max_batch_size)), + }; + + let collector_hpke_config_base64url = + encode_base64url(self.collector_hpke_receiver.config.get_encoded().unwrap()); + + let vdaf = json!({ + "type": "Prio2", + "dimension": assert_matches!( + self.task_config.vdaf, + VdafConfig::Prio2{ dimension } => format!("{dimension}") + ), + }); + // Configure the Leader with the task. + let leader_add_task_cmd = json!({ + "task_id": task_id.to_base64url(), + "leader": self.leader_url, + "helper": self.helper_url, + "vdaf": vdaf.clone(), + "leader_authentication_token": self.leader_bearer_token.clone(), + "collector_authentication_token": self.collector_bearer_token.clone(), + "role": "leader", + "vdaf_verify_key": vdaf_verify_key_base64url, + "batch_mode": batch_mode, + "min_batch_size": self.task_config.min_batch_size, + "max_batch_size": max_batch_size, + "time_precision": self.task_config.time_precision, + "collector_hpke_config": collector_hpke_config_base64url.clone(), + "task_expiration": self.task_config.not_after, + "task_commencement": self.task_config.not_before, + }); + let add_task_path = format!("{}/internal/test/add_task", version.as_ref()); + let res: InternalTestCommandResult = self + .leader_post_internal(&add_task_path, &leader_add_task_cmd) + .await + .unwrap(); + assert_eq!( + res.status, "success", + "response status: {}, error: {:?}", + res.status, res.error + ); + + // Configure the Helper with the task. + let helper_add_task_cmd = json!({ + "task_id": task_id.to_base64url(), + "leader": self.leader_url, + "helper": self.helper_url, + "vdaf": vdaf.clone(), + "leader_authentication_token": self.leader_bearer_token.clone(), + "role": "helper", + "vdaf_verify_key": vdaf_verify_key_base64url, + "batch_mode": batch_mode, + "min_batch_size": self.task_config.min_batch_size, + "max_batch_size": max_batch_size, + "time_precision": self.task_config.time_precision, + "collector_hpke_config": collector_hpke_config_base64url.clone(), + "task_expiration": self.task_config.not_after, + "task_commencement": self.task_config.not_before, + }); + let res: InternalTestCommandResult = self + .helper_post_internal(&add_task_path, &helper_add_task_cmd) + .await + .unwrap(); + assert_eq!( + res.status, "success", + "response status: {}, error: {:?}", + res.status, res.error + ); + + let gen_config = || { + HpkeReceiverConfig::gen(0, HpkeKemId::X25519HkdfSha256) + .expect("failed to generate receiver config") + }; + let res: InternalTestCommandResult = self + .helper_post_internal( + &format!("{version}/internal/test/add_hpke_config"), + &gen_config(), + ) + .await + .unwrap(); + + assert_eq!( + res.status, "success", + "response status: {}, error {:?}", + res.status, res.error + ); + + let res: InternalTestCommandResult = self + .leader_post_internal( + &format!("{version}/internal/test/add_hpke_config"), + &gen_config(), + ) + .await + .unwrap(); + + assert_eq!( + res.status, "success", + "response status: {}, error {:?}", + res.status, res.error + ); + + println!("############ starting test ############"); + } + + pub fn http_client(&self) -> &reqwest::Client { &self.http_client } @@ -357,6 +477,29 @@ impl TestRunner { ]) } + pub async fn get_hpke_configs_task_id( + &self, + _version: DapVersion, + client: &reqwest::Client, + task_id: &TaskId + ) -> anyhow::Result<[HpkeConfig; 2]> { + let raw_leader_hpke_config = get_raw_hpke_config(client, task_id.as_ref(), &self.leader_url, "leader", &self.version).await?; + let raw_helper_hpke_config = get_raw_hpke_config(client, task_id.as_ref(), &self.helper_url, "helper", &self.version).await?; + + let mut leader_hpke_config_list = HpkeConfigList::get_decoded(&raw_leader_hpke_config)?; + let mut helper_hpke_config_list = HpkeConfigList::get_decoded(&raw_helper_hpke_config)?; + if leader_hpke_config_list.hpke_configs.len() != 1 + || helper_hpke_config_list.hpke_configs.len() != 1 + { + panic!("only a length 1 HpkeConfList is currently supported by the test suite") + } + Ok([ + leader_hpke_config_list.hpke_configs.pop().unwrap(), + helper_hpke_config_list.hpke_configs.pop().unwrap(), + ]) + } + + pub async fn leader_get_raw_hpke_config( &self, client: &reqwest::Client, diff --git a/crates/daphne/src/lib.rs b/crates/daphne/src/lib.rs index 7a87ebe0..38d1f630 100644 --- a/crates/daphne/src/lib.rs +++ b/crates/daphne/src/lib.rs @@ -586,6 +586,7 @@ impl DapTaskParameters { }; let task_id = taskprov_advertisement.compute_task_id(self.version); + println!("Computed TaskID: {}", task_id); // Compute the DAP task config. let task_config = taskprov::DapTaskConfigNeedsOptIn::try_from_taskprov_advertisement( diff --git a/crates/daphne/src/protocol/aggregator.rs b/crates/daphne/src/protocol/aggregator.rs index b169f366..ea4911a2 100644 --- a/crates/daphne/src/protocol/aggregator.rs +++ b/crates/daphne/src/protocol/aggregator.rs @@ -100,6 +100,7 @@ impl DapTaskConfig { where S: Iterator, { + tracing::warn!("DapTaskConfig times:{}..{}", self.not_before, self.not_after); let (report_count_hint, _upper_bound) = reports.size_hint(); let mut states = Vec::with_capacity(report_count_hint); diff --git a/crates/daphne/src/protocol/report_init.rs b/crates/daphne/src/protocol/report_init.rs index 8a21036c..2294cc66 100644 --- a/crates/daphne/src/protocol/report_init.rs +++ b/crates/daphne/src/protocol/report_init.rs @@ -63,11 +63,13 @@ impl InitializedReport<()> { report_share: ReportShare, agg_param: &DapAggregationParam, ) -> Result { + let tc: PartialDapTaskConfigForReportInit = task_config.into().clone(); + tracing::warn!("DapTaskConfig times:{}..{}", tc.not_before, tc.not_after); Self::initialize( decrypter, valid_report_range, task_id, - task_config, + tc.clone(), report_share, (), agg_param, @@ -123,6 +125,7 @@ impl<'s> From<&'s PartialDapTaskConfigForReportInit<'_>> for PartialDapTaskConfi } } +#[derive(Clone)] pub struct PartialDapTaskConfigForReportInit<'s> { pub not_before: messages::Time, pub not_after: messages::Time, @@ -156,7 +159,10 @@ impl

InitializedReport

{ }; } + tracing::info!("report timestamp: {}", report_share.report_metadata.time); tracing::info!("valid_report_range: {}..{}", valid_report_range.start, valid_report_range.end); + tracing::info!("task_config.range: {}..{}", task_config.not_before, task_config.not_after); + match report_share.report_metadata.time { t if t >= task_config.not_after => reject!(TaskExpired), t if t < task_config.not_before => {tracing::warn!("Reject TaskNotStarted"); reject!(TaskNotStarted)}, diff --git a/crates/daphne/src/roles/leader/mod.rs b/crates/daphne/src/roles/leader/mod.rs index 6547c113..38621c8c 100644 --- a/crates/daphne/src/roles/leader/mod.rs +++ b/crates/daphne/src/roles/leader/mod.rs @@ -543,6 +543,7 @@ pub async fn process( tracing::debug!("RUNNING read_work_stream"); + tracing::warn!("Aggregator valid time range: {:?}", aggregator.valid_report_time_range()); let mut agg_jobs = HashMap::new(); let mut pending_coll_jobs = Vec::new(); for work_item in aggregator.dequeue_work(num_items).await? { @@ -565,6 +566,7 @@ pub async fn process( return Ok(0); } + tracing::warn!("Retrieved time range: {}..{}", task_config.not_before, task_config.not_after); tracing::debug!( "RUNNING run_agg_job FOR TID {task_id} AND {part_batch_sel:?} AND {host}" ); diff --git a/crates/daphne/src/testing/mod.rs b/crates/daphne/src/testing/mod.rs index d8cb2a4d..c99ee1f4 100644 --- a/crates/daphne/src/testing/mod.rs +++ b/crates/daphne/src/testing/mod.rs @@ -689,6 +689,7 @@ impl DapAggregator for InMemoryAggregator { task_id: &TaskId, task_config: DapTaskConfig, ) -> Result<(), DapError> { + tracing::warn!("Put taskprov config"); let mut tasks = self.tasks.lock().expect("tasks: lock failed"); tasks.deref_mut().insert(*task_id, task_config); Ok(())