Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reject reports earlier than task_config.not_before #755

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/dapf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,10 @@ async fn handle_test_routes(action: TestAction, http_client: HttpClient) -> anyh
|| 604_800u64,
"task should expire in",
)?,
task_commencement: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs(),
};

print_json(&internal_task);
Expand Down
4 changes: 2 additions & 2 deletions crates/daphne-server/src/roles/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ mod test_utils {
fatal_error,
hpke::{HpkeConfig, HpkeReceiverConfig},
messages::decode_base64url_vec,
roles::DapAggregator,
vdaf::{Prio3Config, VdafConfig},
DapBatchMode, DapError, DapTaskConfig, DapVersion,
};
Expand Down Expand Up @@ -297,6 +296,7 @@ mod test_utils {
}
};

tracing::warn!("Add task: {}..{}", cmd.task_commencement, cmd.task_expiration);
if self
.kv()
.put_if_not_exists_with_expiration::<kv::prefix::TaskConfig>(
Expand All @@ -306,7 +306,7 @@ mod test_utils {
leader_url: cmd.leader,
helper_url: cmd.helper,
time_precision: cmd.time_precision,
not_before: self.get_current_time(),
not_before: cmd.task_commencement,
not_after: cmd.task_expiration,
min_batch_size: cmd.min_batch_size,
query,
Expand Down
5 changes: 4 additions & 1 deletion crates/daphne-server/src/router/test_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use daphne::{
constants::DapAggregatorRole,
hpke::HpkeReceiverConfig,
messages::{Base64Encode, TaskId},
roles::{leader, DapLeader},
roles::{leader, DapAggregator, DapLeader},
DapVersion,
};
use daphne_service_utils::test_route_types::{InternalTestAddTask, InternalTestEndpointForTask};
Expand Down Expand Up @@ -129,6 +129,9 @@ async fn add_task(
Path(version): Path<DapVersion>,
Json(cmd): Json<InternalTestAddTask>,
) -> impl IntoResponse {
tracing::warn!("TaskID: {:?}", cmd.task_id);
tracing::warn!("task conf range: {}..{}", cmd.task_commencement, cmd.task_expiration);
tracing::warn!("Valid time range: {:?}", app.valid_report_time_range());
match app.internal_add_task(version, cmd).await {
Ok(()) => (
StatusCode::OK,
Expand Down
20 changes: 16 additions & 4 deletions crates/daphne-server/tests/e2e/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1402,9 +1402,6 @@ async fn leader_collect_taskprov_ok(version: DapVersion) {
let t = TestRunner::default_with_version(version).await;
let batch_interval = t.batch_interval();

let client = t.http_client();
let hpke_config_list = t.get_hpke_configs(version, client).await.unwrap();

let (task_config, task_id, taskprov_advertisement) = DapTaskParameters {
version,
min_batch_size: 10,
Expand All @@ -1423,6 +1420,19 @@ async fn leader_collect_taskprov_ok(version: DapVersion) {
)
.unwrap();

t.setup_endpoints(&task_id, version).await;

let client = &t.http_client();
let hpke_config_list = t.get_hpke_configs_task_id(version, client, &task_id).await.unwrap();
println!("Generated TaskID: {}",t.task_id);

println!("TaskID: {:?}", task_id);
println!("Now - not_before: {}", t.now - task_config.not_before);
println!(
"Now - batch_interval.start: {}",
t.now - batch_interval.start
);
println!("t.now: {}", t.now);
let path = TestRunner::upload_path_for_task(&task_id);
let method = match version {
DapVersion::Draft09 => &Method::PUT,
Expand All @@ -1433,7 +1443,9 @@ async fn leader_collect_taskprov_ok(version: DapVersion) {
let mut rng = thread_rng();
for _ in 0..t.task_config.min_batch_size {
let extensions = vec![Extension::Taskprov];
println!("Report interval: {:?}", TestRunner::report_interval(&batch_interval));
let now = rng.gen_range(TestRunner::report_interval(&batch_interval));
println!("\tnow: {}", now);
t.leader_request_expect_ok(
client,
&path,
Expand Down Expand Up @@ -1551,7 +1563,7 @@ async fn leader_collect_taskprov_ok(version: DapVersion) {
assert_eq!(resp.status(), 200);
assert_eq!(
resp.bytes().await.unwrap(),
collection.get_encoded_with_param(&t.version).unwrap()
collection.get_encoded_with_param(&version).unwrap()
);
}

Expand Down
149 changes: 147 additions & 2 deletions crates/daphne-server/tests/e2e/test_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl TestRunner {
version,
leader_url: leader_url.clone(),
helper_url: helper_url.clone(),
not_before: now,
not_before: now - (now % TIME_PRECISION) - 1,
not_after: now + 604_800, // one week from now
time_precision: TIME_PRECISION,
min_batch_size: MIN_BATCH_SIZE,
Expand Down Expand Up @@ -148,7 +148,7 @@ impl TestRunner {
let leader_bearer_token = hex::encode(rng.gen::<[u8; 16]>());
let collector_bearer_token = hex::encode(rng.gen::<[u8; 16]>());
let t = Self {
global_config,
global_config,
task_id,
now,
task_config,
Expand Down Expand Up @@ -224,6 +224,8 @@ impl TestRunner {
//
// First, delete the data from the previous test.
t.internal_delete_all(&t.batch_interval()).await.unwrap();
println!("Task_config.not_before: {}", t.task_config.not_before);
println!("t.task_id: {:}", t.task_id);

// Configure the Leader with the task.
let leader_add_task_cmd = json!({
Expand All @@ -241,6 +243,7 @@ impl TestRunner {
"time_precision": t.task_config.time_precision,
"collector_hpke_config": collector_hpke_config_base64url.clone(),
"task_expiration": t.task_config.not_after,
"task_commencement": t.task_config.not_before,
});
let add_task_path = format!("{}/internal/test/add_task", version.as_ref());
let res: InternalTestCommandResult = t
Expand Down Expand Up @@ -268,6 +271,7 @@ impl TestRunner {
"time_precision": t.task_config.time_precision,
"collector_hpke_config": collector_hpke_config_base64url.clone(),
"task_expiration": t.task_config.not_after,
"task_commencement": t.task_config.not_before,
});
let res: InternalTestCommandResult = t
.helper_post_internal(&add_task_path, &helper_add_task_cmd)
Expand Down Expand Up @@ -312,9 +316,127 @@ impl TestRunner {
);

println!("############ starting test ############");

t
}

pub async fn setup_endpoints(&self, task_id: &TaskId, version: DapVersion){
// Configure the endpoints.
//
// First, delete the data from the previous test.
self.internal_delete_all(&self.batch_interval()).await.unwrap();
println!("Task_config.not_before: {}", self.task_config.not_before);
println!("self.task_id: {:}", self.task_id);

let vdaf_verify_key_base64url = encode_base64url(self.task_config.vdaf_verify_key.as_ref());

let (batch_mode, max_batch_size) = match self.task_config.query {
DapBatchMode::TimeInterval => (1, None),
DapBatchMode::LeaderSelected { max_batch_size } => (2, Some(max_batch_size)),
};

let collector_hpke_config_base64url =
encode_base64url(self.collector_hpke_receiver.config.get_encoded().unwrap());

let vdaf = json!({
"type": "Prio2",
"dimension": assert_matches!(
self.task_config.vdaf,
VdafConfig::Prio2{ dimension } => format!("{dimension}")
),
});
// Configure the Leader with the task.
let leader_add_task_cmd = json!({
"task_id": task_id.to_base64url(),
"leader": self.leader_url,
"helper": self.helper_url,
"vdaf": vdaf.clone(),
"leader_authentication_token": self.leader_bearer_token.clone(),
"collector_authentication_token": self.collector_bearer_token.clone(),
"role": "leader",
"vdaf_verify_key": vdaf_verify_key_base64url,
"batch_mode": batch_mode,
"min_batch_size": self.task_config.min_batch_size,
"max_batch_size": max_batch_size,
"time_precision": self.task_config.time_precision,
"collector_hpke_config": collector_hpke_config_base64url.clone(),
"task_expiration": self.task_config.not_after,
"task_commencement": self.task_config.not_before,
});
let add_task_path = format!("{}/internal/test/add_task", version.as_ref());
let res: InternalTestCommandResult = self
.leader_post_internal(&add_task_path, &leader_add_task_cmd)
.await
.unwrap();
assert_eq!(
res.status, "success",
"response status: {}, error: {:?}",
res.status, res.error
);

// Configure the Helper with the task.
let helper_add_task_cmd = json!({
"task_id": task_id.to_base64url(),
"leader": self.leader_url,
"helper": self.helper_url,
"vdaf": vdaf.clone(),
"leader_authentication_token": self.leader_bearer_token.clone(),
"role": "helper",
"vdaf_verify_key": vdaf_verify_key_base64url,
"batch_mode": batch_mode,
"min_batch_size": self.task_config.min_batch_size,
"max_batch_size": max_batch_size,
"time_precision": self.task_config.time_precision,
"collector_hpke_config": collector_hpke_config_base64url.clone(),
"task_expiration": self.task_config.not_after,
"task_commencement": self.task_config.not_before,
});
let res: InternalTestCommandResult = self
.helper_post_internal(&add_task_path, &helper_add_task_cmd)
.await
.unwrap();
assert_eq!(
res.status, "success",
"response status: {}, error: {:?}",
res.status, res.error
);

let gen_config = || {
HpkeReceiverConfig::gen(0, HpkeKemId::X25519HkdfSha256)
.expect("failed to generate receiver config")
};
let res: InternalTestCommandResult = self
.helper_post_internal(
&format!("{version}/internal/test/add_hpke_config"),
&gen_config(),
)
.await
.unwrap();

assert_eq!(
res.status, "success",
"response status: {}, error {:?}",
res.status, res.error
);

let res: InternalTestCommandResult = self
.leader_post_internal(
&format!("{version}/internal/test/add_hpke_config"),
&gen_config(),
)
.await
.unwrap();

assert_eq!(
res.status, "success",
"response status: {}, error {:?}",
res.status, res.error
);

println!("############ starting test ############");
}


pub fn http_client(&self) -> &reqwest::Client {
&self.http_client
}
Expand Down Expand Up @@ -355,6 +477,29 @@ impl TestRunner {
])
}

pub async fn get_hpke_configs_task_id(
&self,
_version: DapVersion,
client: &reqwest::Client,
task_id: &TaskId
) -> anyhow::Result<[HpkeConfig; 2]> {
let raw_leader_hpke_config = get_raw_hpke_config(client, task_id.as_ref(), &self.leader_url, "leader", &self.version).await?;
let raw_helper_hpke_config = get_raw_hpke_config(client, task_id.as_ref(), &self.helper_url, "helper", &self.version).await?;

let mut leader_hpke_config_list = HpkeConfigList::get_decoded(&raw_leader_hpke_config)?;
let mut helper_hpke_config_list = HpkeConfigList::get_decoded(&raw_helper_hpke_config)?;
if leader_hpke_config_list.hpke_configs.len() != 1
|| helper_hpke_config_list.hpke_configs.len() != 1
{
panic!("only a length 1 HpkeConfList is currently supported by the test suite")
}
Ok([
leader_hpke_config_list.hpke_configs.pop().unwrap(),
helper_hpke_config_list.hpke_configs.pop().unwrap(),
])
}


pub async fn leader_get_raw_hpke_config(
&self,
client: &reqwest::Client,
Expand Down
1 change: 1 addition & 0 deletions crates/daphne-service-utils/src/test_route_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,6 @@ pub struct InternalTestAddTask {
pub max_batch_size: Option<NonZeroU32>,
pub time_precision: Duration,
pub collector_hpke_config: String, // base64url
pub task_commencement: Time,
pub task_expiration: Time,
}
3 changes: 2 additions & 1 deletion crates/daphne/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ impl DapTaskParameters {
};

let task_id = taskprov_advertisement.compute_task_id(self.version);
println!("Computed TaskID: {}", task_id);

// Compute the DAP task config.
let task_config = taskprov::DapTaskConfigNeedsOptIn::try_from_taskprov_advertisement(
Expand All @@ -596,7 +597,7 @@ impl DapTaskParameters {
)
.unwrap()
.into_opted_in(&taskprov::OptInParam {
not_before: now,
not_before: now - (now % self.time_precision) - 1,
num_agg_span_shards: self.num_agg_span_shards,
});

Expand Down
1 change: 1 addition & 0 deletions crates/daphne/src/protocol/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl DapTaskConfig {
where
S: Iterator<Item = Report>,
{
tracing::warn!("DapTaskConfig times:{}..{}", self.not_before, self.not_after);
let (report_count_hint, _upper_bound) = reports.size_hint();

let mut states = Vec::with_capacity(report_count_hint);
Expand Down
Loading
Loading