From bb45c7436f583a7ed2e408beffbf809b70142848 Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Wed, 18 Dec 2024 17:12:48 +0200 Subject: [PATCH] feat: Create `PushId` type (#2286) Fixes #785 --- neqo-http3/src/client_events.rs | 22 +- neqo-http3/src/connection_client.rs | 476 +++++++++++++++++--------- neqo-http3/src/frames/hframe.rs | 18 +- neqo-http3/src/frames/tests/hframe.rs | 8 +- neqo-http3/src/frames/tests/reader.rs | 16 +- neqo-http3/src/lib.rs | 2 + neqo-http3/src/push_controller.rs | 50 +-- neqo-http3/src/push_id.rs | 55 +++ neqo-http3/src/recv_message.rs | 6 +- neqo-http3/src/stream_type_reader.rs | 12 +- 10 files changed, 438 insertions(+), 227 deletions(-) create mode 100644 neqo-http3/src/push_id.rs diff --git a/neqo-http3/src/client_events.rs b/neqo-http3/src/client_events.rs index 61aba8f9f1..b55f31c55d 100644 --- a/neqo-http3/src/client_events.rs +++ b/neqo-http3/src/client_events.rs @@ -16,7 +16,7 @@ use crate::{ connection::Http3State, features::extended_connect::{ExtendedConnectEvents, ExtendedConnectType, SessionCloseReason}, settings::HSettingType, - CloseType, Http3StreamInfo, HttpRecvStreamEvents, RecvStreamEvents, SendStreamEvents, + CloseType, Http3StreamInfo, HttpRecvStreamEvents, PushId, RecvStreamEvents, SendStreamEvents, }; #[derive(Debug, PartialEq, Eq, Clone)] @@ -68,24 +68,24 @@ pub enum Http3ClientEvent { }, /// A new push promise. PushPromise { - push_id: u64, + push_id: PushId, request_stream_id: StreamId, headers: Vec
, }, /// A push response headers are ready. PushHeaderReady { - push_id: u64, + push_id: PushId, headers: Vec
, interim: bool, fin: bool, }, /// New bytes are available on a push stream for reading. - PushDataReadable { push_id: u64 }, + PushDataReadable { push_id: PushId }, /// A push has been canceled. - PushCanceled { push_id: u64 }, + PushCanceled { push_id: PushId }, /// A push stream was been reset due to a `HttpGeneralProtocol` error. /// Most common case are malformed response headers. - PushReset { push_id: u64, error: AppError }, + PushReset { push_id: PushId, error: AppError }, /// New stream can be created RequestsCreatable, /// Cert authentication needed @@ -240,7 +240,7 @@ impl ExtendedConnectEvents for Http3ClientEvents { } impl Http3ClientEvents { - pub fn push_promise(&self, push_id: u64, request_stream_id: StreamId, headers: Vec
) { + pub fn push_promise(&self, push_id: PushId, request_stream_id: StreamId, headers: Vec
) { self.insert(Http3ClientEvent::PushPromise { push_id, request_stream_id, @@ -248,12 +248,12 @@ impl Http3ClientEvents { }); } - pub fn push_canceled(&self, push_id: u64) { + pub fn push_canceled(&self, push_id: PushId) { self.remove_events_for_push_id(push_id); self.insert(Http3ClientEvent::PushCanceled { push_id }); } - pub fn push_reset(&self, push_id: u64, error: AppError) { + pub fn push_reset(&self, push_id: PushId, error: AppError) { self.remove_events_for_push_id(push_id); self.insert(Http3ClientEvent::PushReset { push_id, error }); } @@ -336,7 +336,7 @@ impl Http3ClientEvents { }); } - pub fn has_push(&self, push_id: u64) -> bool { + pub fn has_push(&self, push_id: PushId) -> bool { for iter in &*self.events.borrow() { if matches!(iter, Http3ClientEvent::PushPromise{push_id:x, ..} if *x == push_id) { return true; @@ -345,7 +345,7 @@ impl Http3ClientEvents { false } - pub fn remove_events_for_push_id(&self, push_id: u64) { + pub fn remove_events_for_push_id(&self, push_id: PushId) { self.remove(|evt| { matches!(evt, Http3ClientEvent::PushPromise{ push_id: x, .. } diff --git a/neqo-http3/src/connection_client.rs b/neqo-http3/src/connection_client.rs index dfb7ef59b5..f23abee831 100644 --- a/neqo-http3/src/connection_client.rs +++ b/neqo-http3/src/connection_client.rs @@ -33,7 +33,7 @@ use crate::{ recv_message::{RecvMessage, RecvMessageInfo}, request_target::AsRequestTarget, settings::HSettings, - Error, Http3Parameters, Http3StreamType, NewStreamType, Priority, PriorityHandler, + Error, Http3Parameters, Http3StreamType, NewStreamType, Priority, PriorityHandler, PushId, ReceiveOutput, Res, }; @@ -667,7 +667,7 @@ impl Http3Client { /// # Errors /// /// `InvalidStreamId` if the stream does not exist. - pub fn cancel_push(&mut self, push_id: u64) -> Res<()> { + pub fn cancel_push(&mut self, push_id: PushId) -> Res<()> { self.push_handler .borrow_mut() .cancel(push_id, &mut self.conn, &mut self.base_handler) @@ -683,7 +683,7 @@ impl Http3Client { pub fn push_read_data( &mut self, now: Instant, - push_id: u64, + push_id: PushId, buf: &mut [u8], ) -> Res<(usize, bool)> { let stream_id = self @@ -1131,7 +1131,7 @@ impl Http3Client { } } - fn handle_new_push_stream(&mut self, stream_id: StreamId, push_id: u64) -> Res<()> { + fn handle_new_push_stream(&mut self, stream_id: StreamId, push_id: PushId) -> Res<()> { if !self.push_handler.borrow().can_receive_push() { return Err(Error::HttpId); } @@ -1307,7 +1307,7 @@ mod tests { frames::{HFrame, H3_FRAME_TYPE_SETTINGS, H3_RESERVED_FRAME_TYPES}, qpack_encoder_receiver::EncoderRecvStream, settings::{HSetting, HSettingType, H3_RESERVED_SETTINGS}, - Http3Server, Priority, RecvStream, + Http3Server, Priority, PushId, RecvStream, }; fn assert_closed(client: &Http3Client, expected: &Error) { @@ -1914,7 +1914,7 @@ mod tests { } // Send a push promise with push_id and request_stream_id. - fn send_push_promise(conn: &mut Connection, stream_id: StreamId, push_id: u64) { + fn send_push_promise(conn: &mut Connection, stream_id: StreamId, push_id: PushId) { let frame = HFrame::PushPromise { push_id, header_block: PUSH_PROMISE_DATA.to_vec(), @@ -1927,7 +1927,7 @@ mod tests { fn send_push_data_and_exchange_packets( client: &mut Http3Client, server: &mut TestServer, - push_id: u8, + push_id: PushId, close_push_stream: bool, ) -> StreamId { let push_stream_id = send_push_data(&mut server.conn, push_id, close_push_stream); @@ -1943,7 +1943,7 @@ mod tests { client: &mut Http3Client, server: &mut TestServer, stream_id: StreamId, - push_id: u64, + push_id: PushId, ) { send_push_promise(&mut server.conn, stream_id, push_id); @@ -1955,7 +1955,7 @@ mod tests { fn send_cancel_push_and_exchange_packets( client: &mut Http3Client, server: &mut TestServer, - push_id: u64, + push_id: PushId, ) { let frame = HFrame::CancelPush { push_id }; let mut d = Encoder::default(); @@ -1997,13 +1997,15 @@ mod tests { fn send_data_on_push( conn: &mut Connection, push_stream_id: StreamId, - push_id: u8, + push_id: PushId, data: impl AsRef<[u8]>, close_push_stream: bool, ) { // send data _ = conn.stream_send(push_stream_id, PUSH_STREAM_TYPE).unwrap(); - _ = conn.stream_send(push_stream_id, &[push_id]).unwrap(); + _ = conn + .stream_send(push_stream_id, &[u8::try_from(u64::from(push_id)).unwrap()]) + .unwrap(); _ = conn.stream_send(push_stream_id, data.as_ref()).unwrap(); if close_push_stream { conn.stream_close_send(push_stream_id).unwrap(); @@ -2015,7 +2017,7 @@ mod tests { // 2) push_id // 3) PUSH_DATA that contains encoded headers and a data frame. // This function can only handle small push_id numbers that fit in a varint of length 1 byte. - fn send_push_data(conn: &mut Connection, push_id: u8, close_push_stream: bool) -> StreamId { + fn send_push_data(conn: &mut Connection, push_id: PushId, close_push_stream: bool) -> StreamId { send_push_with_data(conn, push_id, PUSH_DATA, close_push_stream) } @@ -2026,7 +2028,7 @@ mod tests { // This function can only handle small push_id numbers that fit in a varint of length 1 byte. fn send_push_with_data( conn: &mut Connection, - push_id: u8, + push_id: PushId, data: &[u8], close_push_stream: bool, ) -> StreamId { @@ -2038,7 +2040,7 @@ mod tests { } struct PushPromiseInfo { - pub push_id: u64, + pub push_id: PushId, pub ref_stream_id: StreamId, } @@ -2051,7 +2053,7 @@ mod tests { fn read_response_and_push_events( client: &mut Http3Client, push_promises: &[PushPromiseInfo], - push_streams: &[u64], + push_streams: &[PushId], response_stream_id: StreamId, ) { let mut num_push_promises = 0; @@ -2302,7 +2304,7 @@ mod tests { fn test_wrong_frame_on_push_stream(v: &[u8]) { let (mut client, mut server, request_stream_id) = connect_and_send_request(false); - send_push_promise(&mut server.conn, request_stream_id, 0); + send_push_promise(&mut server.conn, request_stream_id, PushId::new(0)); // Create a push stream let push_stream_id = server.conn.stream_create(StreamType::UniDi).unwrap(); @@ -5034,10 +5036,10 @@ mod tests { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send a push promise. - send_push_promise(&mut server.conn, request_stream_id, 0); + send_push_promise(&mut server.conn, request_stream_id, PushId::new(0)); // create a push stream. - _ = send_push_data(&mut server.conn, 0, true); + _ = send_push_data(&mut server.conn, PushId::new(0), true); server_send_response_and_exchange_packet( &mut client, @@ -5050,10 +5052,10 @@ mod tests { read_response_and_push_events( &mut client, &[PushPromiseInfo { - push_id: 0, + push_id: PushId::new(0), ref_stream_id: request_stream_id, }], - &[0], + &[PushId::new(0)], request_stream_id, ); @@ -5061,7 +5063,10 @@ mod tests { // Check that the push has been closed, e.g. calling cancel_push should return // InvalidStreamId. - assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + assert_eq!( + client.cancel_push(PushId::new(0)), + Err(Error::InvalidStreamId) + ); } /// We can't keep the connection alive on the basis of a push promise, @@ -5074,7 +5079,7 @@ mod tests { let idle_timeout = ConnectionParameters::default().get_idle_timeout(); // Promise a push and deliver, but don't close the stream. - send_push_promise(&mut server.conn, request_stream_id, 0); + send_push_promise(&mut server.conn, request_stream_id, PushId::new(0)); server_send_response_and_exchange_packet( &mut client, &mut server, @@ -5085,7 +5090,7 @@ mod tests { read_response_and_push_events( &mut client, &[PushPromiseInfo { - push_id: 0, + push_id: PushId::new(0), ref_stream_id: request_stream_id, }], &[], // No push streams yet. @@ -5097,12 +5102,14 @@ mod tests { assert_eq!(client.process_output(now()).callback(), idle_timeout); // Reading push data will stop the client from being idle. - _ = send_push_data(&mut server.conn, 0, false); + _ = send_push_data(&mut server.conn, PushId::new(0), false); let out = server.conn.process_output(now()); client.process_input(out.dgram().unwrap(), now()); let mut buf = [0; 16]; - let (read, fin) = client.push_read_data(now(), 0, &mut buf).unwrap(); + let (read, fin) = client + .push_read_data(now(), PushId::new(0), &mut buf) + .unwrap(); assert!(read < buf.len()); assert!(!fin); @@ -5116,14 +5123,14 @@ mod tests { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send a push promise. - send_push_promise(&mut server.conn, request_stream_id, 0); - send_push_promise(&mut server.conn, request_stream_id, 1); + send_push_promise(&mut server.conn, request_stream_id, PushId::new(0)); + send_push_promise(&mut server.conn, request_stream_id, PushId::new(1)); // create a push stream. - _ = send_push_data(&mut server.conn, 0, true); + _ = send_push_data(&mut server.conn, PushId::new(0), true); // create a second push stream. - _ = send_push_data(&mut server.conn, 1, true); + _ = send_push_data(&mut server.conn, PushId::new(1), true); server_send_response_and_exchange_packet( &mut client, @@ -5137,15 +5144,15 @@ mod tests { &mut client, &[ PushPromiseInfo { - push_id: 0, + push_id: PushId::new(0), ref_stream_id: request_stream_id, }, PushPromiseInfo { - push_id: 1, + push_id: PushId::new(1), ref_stream_id: request_stream_id, }, ], - &[0, 1], + &[PushId::new(0), PushId::new(1)], request_stream_id, ); @@ -5153,8 +5160,14 @@ mod tests { // Check that the push has been closed, e.g. calling cancel_push should return // InvalidStreamId. - assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); - assert_eq!(client.cancel_push(1), Err(Error::InvalidStreamId)); + assert_eq!( + client.cancel_push(PushId::new(0)), + Err(Error::InvalidStreamId) + ); + assert_eq!( + client.cancel_push(PushId::new(1)), + Err(Error::InvalidStreamId) + ); } #[test] @@ -5169,10 +5182,10 @@ mod tests { .unwrap(); // Send a push promise. - send_push_promise(&mut server.conn, request_stream_id, 0); + send_push_promise(&mut server.conn, request_stream_id, PushId::new(0)); // create a push stream. - _ = send_push_data(&mut server.conn, 0, true); + _ = send_push_data(&mut server.conn, PushId::new(0), true); // Send response data server_send_response_and_exchange_packet( @@ -5186,10 +5199,10 @@ mod tests { read_response_and_push_events( &mut client, &[PushPromiseInfo { - push_id: 0, + push_id: PushId::new(0), ref_stream_id: request_stream_id, }], - &[0], + &[PushId::new(0)], request_stream_id, ); @@ -5208,17 +5221,17 @@ mod tests { .unwrap(); // Send a push promise. - send_push_promise(&mut server.conn, request_stream_id, 0); + send_push_promise(&mut server.conn, request_stream_id, PushId::new(0)); // create a push stream. - send_push_data_and_exchange_packets(&mut client, &mut server, 0, true); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(0), true); read_response_and_push_events( &mut client, &[PushPromiseInfo { - push_id: 0, + push_id: PushId::new(0), ref_stream_id: request_stream_id, }], - &[0], + &[PushId::new(0)], request_stream_id, ); @@ -5263,13 +5276,18 @@ mod tests { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // create a push stream. - send_push_data_and_exchange_packets(&mut client, &mut server, 0, true); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(0), true); // Assert that we do not have any push event. assert!(!check_push_events(&mut client)); // Now send push_promise - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id, + PushId::new(0), + ); server_send_response_and_exchange_packet( &mut client, @@ -5282,10 +5300,10 @@ mod tests { read_response_and_push_events( &mut client, &[PushPromiseInfo { - push_id: 0, + push_id: PushId::new(0), ref_stream_id: request_stream_id, }], - &[0], + &[PushId::new(0)], request_stream_id, ); @@ -5300,11 +5318,21 @@ mod tests { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id, + PushId::new(5), + ); - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 3); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id, + PushId::new(3), + ); // Start a push stream with push_id 3. - send_push_data_and_exchange_packets(&mut client, &mut server, 3, true); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(3), true); assert_eq!(client.state(), Http3State::Connected); @@ -5312,15 +5340,15 @@ mod tests { &mut client, &[ PushPromiseInfo { - push_id: 5, + push_id: PushId::new(5), ref_stream_id: request_stream_id, }, PushPromiseInfo { - push_id: 3, + push_id: PushId::new(3), ref_stream_id: request_stream_id, }, ], - &[3], + &[PushId::new(3)], request_stream_id, ); assert_eq!(client.state(), Http3State::Connected); @@ -5334,24 +5362,34 @@ mod tests { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id, + PushId::new(5), + ); - send_push_data_and_exchange_packets(&mut client, &mut server, 3, true); - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 3); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(3), true); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id, + PushId::new(3), + ); read_response_and_push_events( &mut client, &[ PushPromiseInfo { - push_id: 5, + push_id: PushId::new(5), ref_stream_id: request_stream_id, }, PushPromiseInfo { - push_id: 3, + push_id: PushId::new(3), ref_stream_id: request_stream_id, }, ], - &[3], + &[PushId::new(3)], request_stream_id, ); assert_eq!(client.state(), Http3State::Connected); @@ -5366,31 +5404,41 @@ mod tests { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5); - send_push_data_and_exchange_packets(&mut client, &mut server, 5, true); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id, + PushId::new(5), + ); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(5), true); assert_eq!(client.state(), Http3State::Connected); // Read push stream with push_id 5 to make it change to closed state. read_response_and_push_events( &mut client, &[PushPromiseInfo { - push_id: 5, + push_id: PushId::new(5), ref_stream_id: request_stream_id, }], - &[5], + &[PushId::new(5)], request_stream_id, ); - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 3); - send_push_data_and_exchange_packets(&mut client, &mut server, 3, true); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id, + PushId::new(3), + ); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(3), true); read_response_and_push_events( &mut client, &[PushPromiseInfo { - push_id: 3, + push_id: PushId::new(3), ref_stream_id: request_stream_id, }], - &[3], + &[PushId::new(3)], request_stream_id, ); assert_eq!(client.state(), Http3State::Connected); @@ -5402,7 +5450,12 @@ mod tests { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id, + PushId::new(5), + ); // make a second request. let request_stream_id_2 = make_request(&mut client, false, &[]); @@ -5411,17 +5464,22 @@ mod tests { let out = client.process_output(now()); mem::drop(server.conn.process(out.dgram(), now())); - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id_2, 5); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id_2, + PushId::new(5), + ); read_response_and_push_events( &mut client, &[ PushPromiseInfo { - push_id: 5, + push_id: PushId::new(5), ref_stream_id: request_stream_id, }, PushPromiseInfo { - push_id: 5, + push_id: PushId::new(5), ref_stream_id: request_stream_id_2, }, ], @@ -5437,8 +5495,13 @@ mod tests { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5); - send_push_data_and_exchange_packets(&mut client, &mut server, 5, true); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id, + PushId::new(5), + ); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(5), true); // make a second request. let request_stream_id_2 = make_request(&mut client, false, &[]); @@ -5447,21 +5510,26 @@ mod tests { let out = client.process_output(now()); mem::drop(server.conn.process(out.dgram(), now())); - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id_2, 5); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id_2, + PushId::new(5), + ); read_response_and_push_events( &mut client, &[ PushPromiseInfo { - push_id: 5, + push_id: PushId::new(5), ref_stream_id: request_stream_id, }, PushPromiseInfo { - push_id: 5, + push_id: PushId::new(5), ref_stream_id: request_stream_id_2, }, ], - &[5], + &[PushId::new(5)], request_stream_id, ); assert_eq!(client.state(), Http3State::Connected); @@ -5474,17 +5542,22 @@ mod tests { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id, + PushId::new(5), + ); // Start a push stream with push_id 5. - send_push_data_and_exchange_packets(&mut client, &mut server, 5, true); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(5), true); read_response_and_push_events( &mut client, &[PushPromiseInfo { - push_id: 5, + push_id: PushId::new(5), ref_stream_id: request_stream_id, }], - &[5], + &[PushId::new(5)], request_stream_id, ); @@ -5495,7 +5568,12 @@ mod tests { let out = client.process_output(now()); mem::drop(server.conn.process(out.dgram(), now())); - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id_2, 5); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id_2, + PushId::new(5), + ); // Check that we do not have a Http3ClientEvent::PushPromise. let push_event = |e| matches!(e, Http3ClientEvent::PushPromise { .. }); @@ -5509,7 +5587,12 @@ mod tests { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send a push promise. max_push_id is set to 5, to trigger an error we send push_id=6. - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 6); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id, + PushId::new(6), + ); assert_closed(&client, &Error::HttpId); } @@ -5521,7 +5604,7 @@ mod tests { let (mut client, mut server) = connect(); // Send a push stream. max_push_id is set to 5, to trigger an error we send push_id=6. - send_push_data_and_exchange_packets(&mut client, &mut server, 6, true); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(6), true); assert_closed(&client, &Error::HttpId); } @@ -5533,7 +5616,7 @@ mod tests { let (mut client, mut server, _request_stream_id) = connect_and_send_request(true); // Send CANCEL_PUSH for push_id 6. - send_cancel_push_and_exchange_packets(&mut client, &mut server, 6); + send_cancel_push_and_exchange_packets(&mut client, &mut server, PushId::new(6)); assert_closed(&client, &Error::HttpId); } @@ -5544,7 +5627,7 @@ mod tests { // Connect and send a request let (mut client, _, _) = connect_and_send_request(true); - assert_eq!(client.cancel_push(6), Err(Error::HttpId)); + assert_eq!(client.cancel_push(PushId::new(6)), Err(Error::HttpId)); assert_eq!(client.state(), Http3State::Connected); } @@ -5556,32 +5639,32 @@ mod tests { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send 3 push promises. - send_push_promise(&mut server.conn, request_stream_id, 0); - send_push_promise(&mut server.conn, request_stream_id, 1); - send_push_promise(&mut server.conn, request_stream_id, 2); + send_push_promise(&mut server.conn, request_stream_id, PushId::new(0)); + send_push_promise(&mut server.conn, request_stream_id, PushId::new(1)); + send_push_promise(&mut server.conn, request_stream_id, PushId::new(2)); // create 3 push streams. - send_push_data(&mut server.conn, 0, true); - send_push_data(&mut server.conn, 1, true); - send_push_data_and_exchange_packets(&mut client, &mut server, 2, true); + send_push_data(&mut server.conn, PushId::new(0), true); + send_push_data(&mut server.conn, PushId::new(1), true); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(2), true); read_response_and_push_events( &mut client, &[ PushPromiseInfo { - push_id: 0, + push_id: PushId::new(0), ref_stream_id: request_stream_id, }, PushPromiseInfo { - push_id: 1, + push_id: PushId::new(1), ref_stream_id: request_stream_id, }, PushPromiseInfo { - push_id: 2, + push_id: PushId::new(2), ref_stream_id: request_stream_id, }, ], - &[0, 1, 2], + &[PushId::new(0), PushId::new(1), PushId::new(2)], request_stream_id, ); @@ -5600,8 +5683,8 @@ mod tests { assert_eq!(&buf[..3], MAX_PUSH_ID_FRAME); // Check that we can send push_id=8 now - send_push_promise(&mut server.conn, request_stream_id, 8); - send_push_data(&mut server.conn, 8, true); + send_push_promise(&mut server.conn, request_stream_id, PushId::new(8)); + send_push_data(&mut server.conn, PushId::new(8), true); let out = server.conn.process_output(now()); let out = client.process(out.dgram(), now()); @@ -5612,10 +5695,10 @@ mod tests { read_response_and_push_events( &mut client, &[PushPromiseInfo { - push_id: 8, + push_id: PushId::new(8), ref_stream_id: request_stream_id, }], - &[8], + &[PushId::new(8)], request_stream_id, ); @@ -5629,10 +5712,10 @@ mod tests { let (mut client, mut server, _request_stream_id) = connect_and_send_request(true); // Start a push stream with push_id 0. - send_push_data_and_exchange_packets(&mut client, &mut server, 0, true); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(0), true); // Send it again - send_push_data_and_exchange_packets(&mut client, &mut server, 0, true); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(0), true); assert_closed(&client, &Error::HttpId); } @@ -5643,11 +5726,11 @@ mod tests { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); - send_push_promise(&mut server.conn, request_stream_id, 0); - send_push_data_and_exchange_packets(&mut client, &mut server, 0, true); + send_push_promise(&mut server.conn, request_stream_id, PushId::new(0)); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(0), true); // Now the push_stream is in the PushState::Active state - send_push_data_and_exchange_packets(&mut client, &mut server, 0, true); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(0), true); assert_closed(&client, &Error::HttpId); } @@ -5673,19 +5756,22 @@ mod tests { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); - send_cancel_push_and_exchange_packets(&mut client, &mut server, 0); + send_cancel_push_and_exchange_packets(&mut client, &mut server, PushId::new(0)); - send_push_promise(&mut server.conn, request_stream_id, 0); + send_push_promise(&mut server.conn, request_stream_id, PushId::new(0)); // Start a push stream with push_id 0. let push_stream_id = - send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(0), false); // Assert that we do not have any push event. assert!(!check_push_events(&mut client)); // Check that the push has been closed, e.g. calling cancel_push should return // InvalidStreamId. - assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + assert_eq!( + client.cancel_push(PushId::new(0)), + Err(Error::InvalidStreamId) + ); // Check that the push has been canceled by the client. assert_stop_sending_event( @@ -5704,18 +5790,21 @@ mod tests { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); - send_push_promise(&mut server.conn, request_stream_id, 0); + send_push_promise(&mut server.conn, request_stream_id, PushId::new(0)); let push_stream_id = - send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(0), false); - send_cancel_push_and_exchange_packets(&mut client, &mut server, 0); + send_cancel_push_and_exchange_packets(&mut client, &mut server, PushId::new(0)); // Assert that we do not have any push event. assert!(!check_push_events(&mut client)); // Check that the push has been closed, e.g. calling cancel_push should return // InvalidStreamId. - assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + assert_eq!( + client.cancel_push(PushId::new(0)), + Err(Error::InvalidStreamId) + ); // Check that the push has been canceled by the client. assert_stop_sending_event( @@ -5735,16 +5824,19 @@ mod tests { // Start a push stream with push_id 0. let push_stream_id = - send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(0), false); - send_cancel_push_and_exchange_packets(&mut client, &mut server, 0); + send_cancel_push_and_exchange_packets(&mut client, &mut server, PushId::new(0)); // Assert that we do not have any push event. assert!(!check_push_events(&mut client)); // Check that the push has been closed, e.g. calling cancel_push should return // InvalidStreamId. - assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + assert_eq!( + client.cancel_push(PushId::new(0)), + Err(Error::InvalidStreamId) + ); // Check that the push has been canceled by the client. assert_stop_sending_event( @@ -5763,10 +5855,10 @@ mod tests { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); - send_push_promise(&mut server.conn, request_stream_id, 0); + send_push_promise(&mut server.conn, request_stream_id, PushId::new(0)); // Start a push stream with push_id 0. let push_stream_id = - send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(0), false); server .conn @@ -5780,7 +5872,10 @@ mod tests { // Check that the push has been closed, e.g. calling cancel_push should return // InvalidStreamId. - assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + assert_eq!( + client.cancel_push(PushId::new(0)), + Err(Error::InvalidStreamId) + ); assert_eq!(client.state(), Http3State::Connected); } @@ -5793,7 +5888,7 @@ mod tests { // Start a push stream with push_id 0. let push_stream_id = - send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(0), false); server .conn @@ -5802,14 +5897,22 @@ mod tests { let out = server.conn.process_output(now()); client.process(out.dgram(), now()); - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id, + PushId::new(0), + ); // Assert that we do not have any push event. assert!(!check_push_events(&mut client)); // Check that the push has been closed, e.g. calling cancel_push should return // InvalidStreamId. - assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + assert_eq!( + client.cancel_push(PushId::new(0)), + Err(Error::InvalidStreamId) + ); assert_eq!(client.state(), Http3State::Connected); } @@ -5820,16 +5923,24 @@ mod tests { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id, + PushId::new(0), + ); - assert!(client.cancel_push(0).is_ok()); + assert!(client.cancel_push(PushId::new(0)).is_ok()); // Assert that we do not have any push event. assert!(!check_push_events(&mut client)); // Check that the push has been closed, e.g. calling cancel_push should return // InvalidStreamId. - assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + assert_eq!( + client.cancel_push(PushId::new(0)), + Err(Error::InvalidStreamId) + ); assert_eq!(client.state(), Http3State::Connected); } @@ -5841,11 +5952,16 @@ mod tests { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id, + PushId::new(0), + ); let push_stream_id = - send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(0), false); - assert!(client.cancel_push(0).is_ok()); + assert!(client.cancel_push(PushId::new(0)).is_ok()); let out = client.process_output(now()); mem::drop(server.conn.process(out.dgram(), now())); @@ -5854,7 +5970,10 @@ mod tests { // Check that the push has been closed, e.g. calling cancel_push should return // InvalidStreamId. - assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + assert_eq!( + client.cancel_push(PushId::new(0)), + Err(Error::InvalidStreamId) + ); // Check that the push has been canceled by the client. assert_stop_sending_event( @@ -5872,22 +5991,35 @@ mod tests { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id, + PushId::new(0), + ); let push_stream_id = - send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); + send_push_data_and_exchange_packets(&mut client, &mut server, PushId::new(0), false); - assert!(client.cancel_push(0).is_ok()); + assert!(client.cancel_push(PushId::new(0)).is_ok()); let out = client.process_output(now()); mem::drop(server.conn.process(out.dgram(), now())); - send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0); + send_push_promise_and_exchange_packets( + &mut client, + &mut server, + request_stream_id, + PushId::new(0), + ); // Assert that we do not have any push event. assert!(!check_push_events(&mut client)); // Check that the push has been closed, e.g. calling cancel_push should return // InvalidStreamId. - assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + assert_eq!( + client.cancel_push(PushId::new(0)), + Err(Error::InvalidStreamId) + ); // Check that the push has been canceled by the client. assert_stop_sending_event( @@ -5931,7 +6063,7 @@ mod tests { client: &mut Http3Client, server: &mut TestServer, stream_id: StreamId, - push_id: u64, + push_id: PushId, ) -> Option { send_push_promise_using_encoder_with_custom_headers( client, @@ -5946,7 +6078,7 @@ mod tests { client: &mut Http3Client, server: &mut TestServer, stream_id: StreamId, - push_id: u64, + push_id: PushId, additional_header: Header, ) -> Option { let mut headers = vec![ @@ -5986,8 +6118,12 @@ mod tests { setup_server_side_encoder(&mut client, &mut server); - let encoder_inst_pkt = - send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0); + let encoder_inst_pkt = send_push_promise_using_encoder( + &mut client, + &mut server, + request_stream_id, + PushId::new(0), + ); // PushPromise is blocked watching for encoder instructions. assert!(!check_push_events(&mut client)); @@ -6015,8 +6151,12 @@ mod tests { false, ); - let encoder_inst_pkt = - send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0); + let encoder_inst_pkt = send_push_promise_using_encoder( + &mut client, + &mut server, + request_stream_id, + PushId::new(0), + ); // PushPromise is blocked watching for encoder instructions. assert!(!check_push_events(&mut client)); @@ -6057,8 +6197,12 @@ mod tests { setup_server_side_encoder(&mut client, &mut server); - let encoder_inst_pkt = - send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0); + let encoder_inst_pkt = send_push_promise_using_encoder( + &mut client, + &mut server, + request_stream_id, + PushId::new(0), + ); // PushPromise is blocked watching for encoder instructions. assert!(!check_push_events(&mut client)); @@ -6099,8 +6243,12 @@ mod tests { let _out = client.process(encoder_inst_pkt1, now()); // Send a PushPromise that is blocked until encoder_inst_pkt2 is process by the client. - let encoder_inst_pkt2 = - send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0); + let encoder_inst_pkt2 = send_push_promise_using_encoder( + &mut client, + &mut server, + request_stream_id, + PushId::new(0), + ); // PushPromise is blocked watching for encoder instructions. assert!(!check_push_events(&mut client)); @@ -6159,7 +6307,7 @@ mod tests { &mut client, &mut server, request_stream_id, - 0, + PushId::new(0), Header::new("myn1", "myv1"), ); @@ -6170,7 +6318,7 @@ mod tests { &mut client, &mut server, request_stream_id, - 1, + PushId::new(1), Header::new("myn2", "myv2"), ); @@ -6224,8 +6372,13 @@ mod tests { setup_server_side_encoder(&mut client, &mut server); mem::drop( - send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0) - .unwrap(), + send_push_promise_using_encoder( + &mut client, + &mut server, + request_stream_id, + PushId::new(0), + ) + .unwrap(), ); server_send_response_and_exchange_packet( @@ -6471,8 +6624,13 @@ mod tests { // Send PushPromise that will be blocked waiting for decoder instructions. mem::drop( - send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0) - .unwrap(), + send_push_promise_using_encoder( + &mut client, + &mut server, + request_stream_id, + PushId::new(0), + ) + .unwrap(), ); // Send response @@ -6731,7 +6889,7 @@ mod tests { // Client: receive a push stream #[test] fn push_single_with_1xx() { - const FIRST_PUSH_ID: u64 = 0; + const FIRST_PUSH_ID: PushId = PushId::new(0); // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); @@ -6752,13 +6910,7 @@ mod tests { server.encode_headers(push_stream_id, headers200, &mut d); // create a push stream. - send_data_on_push( - &mut server.conn, - push_stream_id, - u8::try_from(FIRST_PUSH_ID).unwrap(), - &d, - true, - ); + send_data_on_push(&mut server.conn, push_stream_id, FIRST_PUSH_ID, &d, true); server_send_response_and_exchange_packet( &mut client, @@ -6799,7 +6951,7 @@ mod tests { // Client: receive a push stream #[test] fn push_single_wo_status() { - const FIRST_PUSH_ID: u64 = 0; + const FIRST_PUSH_ID: PushId = PushId::new(0); // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); @@ -6815,13 +6967,7 @@ mod tests { ]; server.encode_headers(request_stream_id, &headers, &mut d); - send_data_on_push( - &mut server.conn, - push_stream_id, - u8::try_from(FIRST_PUSH_ID).unwrap(), - &d, - false, - ); + send_data_on_push(&mut server.conn, push_stream_id, FIRST_PUSH_ID, &d, false); server_send_response_and_exchange_packet( &mut client, diff --git a/neqo-http3/src/frames/hframe.rs b/neqo-http3/src/frames/hframe.rs index 707242ae48..05c3e240fe 100644 --- a/neqo-http3/src/frames/hframe.rs +++ b/neqo-http3/src/frames/hframe.rs @@ -10,7 +10,7 @@ use neqo_common::{Decoder, Encoder}; use neqo_crypto::random; use neqo_transport::StreamId; -use crate::{frames::reader::FrameDecoder, settings::HSettings, Error, Priority, Res}; +use crate::{frames::reader::FrameDecoder, settings::HSettings, Error, Priority, PushId, Res}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct HFrameType(pub u64); @@ -48,20 +48,20 @@ pub enum HFrame { header_block: Vec, }, CancelPush { - push_id: u64, + push_id: PushId, }, Settings { settings: HSettings, }, PushPromise { - push_id: u64, + push_id: PushId, header_block: Vec, }, Goaway { stream_id: StreamId, }, MaxPushId { - push_id: u64, + push_id: PushId, }, Grease, PriorityUpdateRequest { @@ -117,7 +117,9 @@ impl HFrame { push_id, header_block, } => { - enc.encode_varint((header_block.len() + (Encoder::varint_len(*push_id))) as u64); + enc.encode_varint( + (header_block.len() + (Encoder::varint_len(u64::from(*push_id)))) as u64, + ); enc.encode_varint(*push_id); enc.encode(header_block); } @@ -177,7 +179,7 @@ impl FrameDecoder for HFrame { header_block: dec.decode_remainder().to_vec(), }), H3_FRAME_TYPE_CANCEL_PUSH => Some(Self::CancelPush { - push_id: dec.decode_varint().ok_or(Error::HttpFrame)?, + push_id: dec.decode_varint().ok_or(Error::HttpFrame)?.into(), }), H3_FRAME_TYPE_SETTINGS => { let mut settings = HSettings::default(); @@ -191,14 +193,14 @@ impl FrameDecoder for HFrame { Some(Self::Settings { settings }) } H3_FRAME_TYPE_PUSH_PROMISE => Some(Self::PushPromise { - push_id: dec.decode_varint().ok_or(Error::HttpFrame)?, + push_id: dec.decode_varint().ok_or(Error::HttpFrame)?.into(), header_block: dec.decode_remainder().to_vec(), }), H3_FRAME_TYPE_GOAWAY => Some(Self::Goaway { stream_id: StreamId::new(dec.decode_varint().ok_or(Error::HttpFrame)?), }), H3_FRAME_TYPE_MAX_PUSH_ID => Some(Self::MaxPushId { - push_id: dec.decode_varint().ok_or(Error::HttpFrame)?, + push_id: dec.decode_varint().ok_or(Error::HttpFrame)?.into(), }), H3_FRAME_TYPE_PRIORITY_UPDATE_REQUEST | H3_FRAME_TYPE_PRIORITY_UPDATE_PUSH => { let element_id = dec.decode_varint().ok_or(Error::HttpFrame)?; diff --git a/neqo-http3/src/frames/tests/hframe.rs b/neqo-http3/src/frames/tests/hframe.rs index e62e6c43f0..3b8bc42bb8 100644 --- a/neqo-http3/src/frames/tests/hframe.rs +++ b/neqo-http3/src/frames/tests/hframe.rs @@ -12,7 +12,7 @@ use super::enc_dec_hframe; use crate::{ frames::HFrame, settings::{HSetting, HSettingType, HSettings}, - Priority, + Priority, PushId, }; #[test] @@ -31,7 +31,9 @@ fn headers_frame() { #[test] fn cancel_push_frame4() { - let f = HFrame::CancelPush { push_id: 5 }; + let f = HFrame::CancelPush { + push_id: PushId::new(5), + }; enc_dec_hframe(&f, "030105", 0); } @@ -46,7 +48,7 @@ fn settings_frame4() { #[test] fn push_promise_frame4() { let f = HFrame::PushPromise { - push_id: 4, + push_id: PushId::new(4), header_block: vec![0x61, 0x62, 0x63, 0x64], }; enc_dec_hframe(&f, "05050461626364", 0); diff --git a/neqo-http3/src/frames/tests/reader.rs b/neqo-http3/src/frames/tests/reader.rs index dce2e0cf39..5e0596d5d3 100644 --- a/neqo-http3/src/frames/tests/reader.rs +++ b/neqo-http3/src/frames/tests/reader.rs @@ -15,7 +15,7 @@ use crate::{ reader::FrameDecoder, FrameReader, HFrame, StreamReaderConnectionWrapper, WebTransportFrame, }, settings::{HSetting, HSettingType, HSettings}, - Error, + Error, PushId, }; struct FrameReaderTest { @@ -112,7 +112,7 @@ fn frame_reading_with_stream_push_promise() { header_block, } = frame.unwrap() { - assert_eq!(push_id, 257); + assert_eq!(push_id, PushId::new(257)); assert_eq!(header_block, &[0x1, 0x2, 0x3]); } else { panic!("wrong frame type"); @@ -154,7 +154,7 @@ fn unknown_frame() { let frame = fr.process(&[0x03, 0x01, 0x05]); assert!(frame.is_some()); if let HFrame::CancelPush { push_id } = frame.unwrap() { - assert!(push_id == 5); + assert!(push_id == PushId::new(5)); } else { panic!("wrong frame type"); } @@ -417,7 +417,9 @@ fn complete_and_incomplete_frames() { test_complete_and_incomplete_frame::(&buf, buf.len()); // H3_FRAME_TYPE_CANCEL_PUSH - let f = HFrame::CancelPush { push_id: 5 }; + let f = HFrame::CancelPush { + push_id: PushId::new(5), + }; let mut enc = Encoder::default(); f.encode(&mut enc); let buf: Vec<_> = enc.into(); @@ -434,7 +436,7 @@ fn complete_and_incomplete_frames() { // H3_FRAME_TYPE_PUSH_PROMISE let f = HFrame::PushPromise { - push_id: 4, + push_id: PushId::new(4), header_block: HEADER_BLOCK.to_vec(), }; let mut enc = Encoder::default(); @@ -452,7 +454,9 @@ fn complete_and_incomplete_frames() { test_complete_and_incomplete_frame::(&buf, buf.len()); // H3_FRAME_TYPE_MAX_PUSH_ID - let f = HFrame::MaxPushId { push_id: 5 }; + let f = HFrame::MaxPushId { + push_id: PushId::new(5), + }; let mut enc = Encoder::default(); f.encode(&mut enc); let buf: Vec<_> = enc.into(); diff --git a/neqo-http3/src/lib.rs b/neqo-http3/src/lib.rs index a01b00e1a7..5f566549ea 100644 --- a/neqo-http3/src/lib.rs +++ b/neqo-http3/src/lib.rs @@ -145,6 +145,7 @@ mod frames; mod headers_checks; mod priority; mod push_controller; +mod push_id; mod qlog; mod qpack_decoder_receiver; mod qpack_encoder_receiver; @@ -174,6 +175,7 @@ use neqo_transport::{ AppError, Connection, Error as TransportError, RecvStreamStats, SendStreamStats, }; pub use priority::Priority; +pub use push_id::PushId; pub use server::Http3Server; pub use server_events::{ Http3OrWebTransportStream, Http3ServerEvent, WebTransportRequest, WebTransportServerEvent, diff --git a/neqo-http3/src/push_controller.rs b/neqo-http3/src/push_controller.rs index 6efe010c5b..564b149c2b 100644 --- a/neqo-http3/src/push_controller.rs +++ b/neqo-http3/src/push_controller.rs @@ -20,7 +20,7 @@ use crate::{ client_events::{Http3ClientEvent, Http3ClientEvents}, connection::Http3Connection, frames::HFrame, - CloseType, Error, Http3StreamInfo, HttpRecvStreamEvents, RecvStreamEvents, Res, + CloseType, Error, Http3StreamInfo, HttpRecvStreamEvents, PushId, RecvStreamEvents, Res, }; /// `PushStates`: @@ -57,27 +57,27 @@ enum PushState { #[derive(Debug)] struct ActivePushStreams { push_streams: VecDeque, - first_push_id: u64, + first_push_id: PushId, } impl ActivePushStreams { pub const fn new() -> Self { Self { push_streams: VecDeque::new(), - first_push_id: 0, + first_push_id: PushId::new(0), } } /// Returns None if a stream has been closed already. pub fn get_mut( &mut self, - push_id: u64, + push_id: PushId, ) -> Option<&mut >::Output> { if push_id < self.first_push_id { return None; } - let inx = usize::try_from(push_id - self.first_push_id).unwrap(); + let inx = usize::try_from(u64::from(push_id - self.first_push_id)).unwrap(); if inx >= self.push_streams.len() { self.push_streams.resize(inx + 1, PushState::Init); } @@ -88,19 +88,19 @@ impl ActivePushStreams { } /// Returns None if a stream has been closed already. - pub fn get(&mut self, push_id: u64) -> Option<&mut PushState> { + pub fn get(&mut self, push_id: PushId) -> Option<&mut PushState> { self.get_mut(push_id) } /// Returns the State of a closed push stream or None for already closed streams. - pub fn close(&mut self, push_id: u64) -> Option { + pub fn close(&mut self, push_id: PushId) -> Option { match self.get_mut(push_id) { None | Some(PushState::Closed) => None, Some(s) => { let res = mem::replace(s, PushState::Closed); while self.push_streams.front() == Some(&PushState::Closed) { self.push_streams.pop_front(); - self.first_push_id += 1; + self.first_push_id.next(); } Some(res) } @@ -108,7 +108,7 @@ impl ActivePushStreams { } #[must_use] - pub fn number_done(&self) -> u64 { + pub fn number_done(&self) -> PushId { self.first_push_id + u64::try_from( self.push_streams @@ -120,7 +120,7 @@ impl ActivePushStreams { } pub fn clear(&mut self) { - self.first_push_id = 0; + self.first_push_id = PushId::new(0); self.push_streams.clear(); } } @@ -146,7 +146,7 @@ impl ActivePushStreams { #[derive(Debug)] pub struct PushController { max_concurent_push: u64, - current_max_push_id: u64, + current_max_push_id: PushId, // push_streams holds the states of push streams. // We keep a stream until the stream has been closed. push_streams: ActivePushStreams, @@ -160,7 +160,7 @@ impl PushController { pub const fn new(max_concurent_push: u64, conn_events: Http3ClientEvents) -> Self { Self { max_concurent_push, - current_max_push_id: 0, + current_max_push_id: PushId::new(0), push_streams: ActivePushStreams::new(), conn_events, } @@ -181,7 +181,7 @@ impl PushController { /// `HttpId` if `push_id` greater than it is allowed has been received. pub fn new_push_promise( &mut self, - push_id: u64, + push_id: PushId, ref_stream_id: StreamId, new_headers: Vec
, ) -> Res<()> { @@ -236,7 +236,7 @@ impl PushController { } } - pub fn add_new_push_stream(&mut self, push_id: u64, stream_id: StreamId) -> Res { + pub fn add_new_push_stream(&mut self, push_id: PushId, stream_id: StreamId) -> Res { qtrace!( "A new push stream with push_id={} stream_id={}", push_id, @@ -276,7 +276,7 @@ impl PushController { ) } - fn check_push_id(&self, push_id: u64) -> Res<()> { + fn check_push_id(&self, push_id: PushId) -> Res<()> { // Check if push id is greater than what we allow. if push_id > self.current_max_push_id { qerror!("Push id is greater than current_max_push_id."); @@ -288,7 +288,7 @@ impl PushController { pub fn handle_cancel_push( &mut self, - push_id: u64, + push_id: PushId, conn: &mut Connection, base_handler: &mut Http3Connection, ) -> Res<()> { @@ -324,7 +324,7 @@ impl PushController { } } - pub fn close(&mut self, push_id: u64) { + pub fn close(&mut self, push_id: PushId) { qtrace!("Push stream has been closed."); if let Some(push_state) = self.push_streams.close(push_id) { debug_assert!(matches!(push_state, PushState::Active { .. })); @@ -335,7 +335,7 @@ impl PushController { pub fn cancel( &mut self, - push_id: u64, + push_id: PushId, conn: &mut Connection, base_handler: &mut Http3Connection, ) -> Res<()> { @@ -377,7 +377,7 @@ impl PushController { } } - pub fn push_stream_reset(&mut self, push_id: u64, close_type: CloseType) { + pub fn push_stream_reset(&mut self, push_id: PushId, close_type: CloseType) { qtrace!("Push stream has been reset, push_id={}", push_id); if let Some(push_state) = self.push_streams.get(push_id) { @@ -404,7 +404,7 @@ impl PushController { } } - pub fn get_active_stream_id(&mut self, push_id: u64) -> Option { + pub fn get_active_stream_id(&mut self, push_id: PushId) -> Option { match self.push_streams.get(push_id) { Some(PushState::Active { stream_id, .. }) => Some(*stream_id), _ => None, @@ -414,7 +414,7 @@ impl PushController { pub fn maybe_send_max_push_id_frame(&mut self, base_handler: &mut Http3Connection) { let push_done = self.push_streams.number_done(); if self.max_concurent_push > 0 - && (self.current_max_push_id - push_done) <= (self.max_concurent_push / 2) + && (self.current_max_push_id - push_done) <= (self.max_concurent_push / 2).into() { self.current_max_push_id = push_done + self.max_concurent_push; base_handler.queue_control_frame(&HFrame::MaxPushId { @@ -424,7 +424,7 @@ impl PushController { } pub fn handle_zero_rtt_rejected(&mut self) { - self.current_max_push_id = 0; + self.current_max_push_id = PushId::new(0); } pub fn clear(&mut self) { @@ -435,7 +435,7 @@ impl PushController { self.max_concurent_push > 0 } - pub fn new_stream_event(&mut self, push_id: u64, event: Http3ClientEvent) { + pub fn new_stream_event(&mut self, push_id: PushId, event: Http3ClientEvent) { match self.push_streams.get_mut(push_id) { None => { debug_assert!(false, "Push has been closed already."); @@ -460,12 +460,12 @@ impl PushController { /// a `push_promise` has not been yet received for the stream. #[derive(Debug)] pub struct RecvPushEvents { - push_id: u64, + push_id: PushId, push_handler: Rc>, } impl RecvPushEvents { - pub const fn new(push_id: u64, push_handler: Rc>) -> Self { + pub const fn new(push_id: PushId, push_handler: Rc>) -> Self { Self { push_id, push_handler, diff --git a/neqo-http3/src/push_id.rs b/neqo-http3/src/push_id.rs new file mode 100644 index 0000000000..72398f65f2 --- /dev/null +++ b/neqo-http3/src/push_id.rs @@ -0,0 +1,55 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::ops::{Add, Sub}; + +#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd)] +pub struct PushId(u64); + +impl PushId { + #[must_use] + pub const fn new(id: u64) -> Self { + Self(id) + } + + pub fn next(&mut self) { + self.0 += 1; + } +} + +impl From for PushId { + fn from(id: u64) -> Self { + Self(id) + } +} + +impl From for u64 { + fn from(id: PushId) -> Self { + id.0 + } +} + +impl ::std::fmt::Display for PushId { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl Sub for PushId { + type Output = Self; + + fn sub(self, rhs: Self) -> Self { + Self(self.0 - rhs.0) + } +} + +impl Add for PushId { + type Output = Self; + + fn add(self, rhs: u64) -> Self { + Self(self.0 + rhs) + } +} diff --git a/neqo-http3/src/recv_message.rs b/neqo-http3/src/recv_message.rs index de8dfcec66..406bca0466 100644 --- a/neqo-http3/src/recv_message.rs +++ b/neqo-http3/src/recv_message.rs @@ -16,7 +16,7 @@ use crate::{ priority::PriorityHandler, push_controller::PushController, qlog, CloseType, Error, Http3StreamInfo, Http3StreamType, HttpRecvStream, HttpRecvStreamEvents, - MessageType, Priority, ReceiveOutput, RecvStream, Res, Stream, + MessageType, Priority, PushId, ReceiveOutput, RecvStream, Res, Stream, }; #[allow(clippy::module_name_repetitions)] @@ -61,7 +61,7 @@ enum RecvMessageState { #[derive(Debug)] struct PushInfo { - push_id: u64, + push_id: PushId, header_block: Vec, } @@ -220,7 +220,7 @@ impl RecvMessage { Ok(()) } - fn handle_push_promise(&mut self, push_id: u64, header_block: Vec) -> Res<()> { + fn handle_push_promise(&mut self, push_id: PushId, header_block: Vec) -> Res<()> { if self.push_handler.is_none() { return Err(Error::HttpFrameUnexpected); } diff --git a/neqo-http3/src/stream_type_reader.rs b/neqo-http3/src/stream_type_reader.rs index 8ab8e404b3..5c5fe75943 100644 --- a/neqo-http3/src/stream_type_reader.rs +++ b/neqo-http3/src/stream_type_reader.rs @@ -11,7 +11,7 @@ use neqo_transport::{Connection, StreamId, StreamType}; use crate::{ control_stream_local::HTTP3_UNI_STREAM_TYPE_CONTROL, frames::{hframe::HFrameType, reader::FrameDecoder, HFrame, H3_FRAME_TYPE_HEADERS}, - CloseType, Error, Http3StreamType, ReceiveOutput, RecvStream, Res, Stream, + CloseType, Error, Http3StreamType, PushId, ReceiveOutput, RecvStream, Res, Stream, }; pub const HTTP3_UNI_STREAM_TYPE_PUSH: u64 = 0x1; @@ -23,7 +23,7 @@ pub enum NewStreamType { Control, Decoder, Encoder, - Push(u64), + Push(PushId), WebTransportStream(u64), Http(u64), Unknown, @@ -183,7 +183,7 @@ impl NewStreamHeadReader { return Err(Error::HttpGeneralProtocol); } return if is_push { - Ok(Some(NewStreamType::Push(output))) + Ok(Some(NewStreamType::Push(PushId::new(output)))) } else { Ok(Some(NewStreamType::WebTransportStream(output))) }; @@ -253,7 +253,7 @@ mod tests { use crate::{ control_stream_local::HTTP3_UNI_STREAM_TYPE_CONTROL, frames::{H3_FRAME_TYPE_HEADERS, H3_FRAME_TYPE_SETTINGS}, - CloseType, Error, NewStreamType, ReceiveOutput, RecvStream, Res, + CloseType, Error, NewStreamType, PushId, ReceiveOutput, RecvStream, Res, }; struct Test { @@ -366,7 +366,7 @@ mod tests { &[HTTP3_UNI_STREAM_TYPE_PUSH, 0xaaaa_aaaa], false, &Ok(( - ReceiveOutput::NewStream(NewStreamType::Push(0xaaaa_aaaa)), + ReceiveOutput::NewStream(NewStreamType::Push(PushId::new(0xaaaa_aaaa))), true, )), true, @@ -429,7 +429,7 @@ mod tests { * HTTP3_UNI_STREAM_TYPE_PUSH */ false, &Ok(( - ReceiveOutput::NewStream(NewStreamType::Push(0xaaaa_aaaa)), + ReceiveOutput::NewStream(NewStreamType::Push(PushId::new(0xaaaa_aaaa))), true, )), true,