Skip to content

Commit

Permalink
refactor: Update DynamicMessage to consolidate fields into a single v…
Browse files Browse the repository at this point in the history
…alue by field number #73
  • Loading branch information
rholshausen committed Nov 11, 2024
1 parent e2e08c9 commit 3d60553
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 65 deletions.
111 changes: 52 additions & 59 deletions src/dynamic_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,43 +68,54 @@ impl Codec for PactCodec {
}
}

/// Dynamic message support based on a vector of ProtobufField field values
/// Dynamic message support based on a vector of ProtobufField field values. Internally, it will
/// consolidate all fields with the same field number.
#[derive(Debug, Clone)]
pub struct DynamicMessage {
fields: HashMap<u32, Vec<ProtobufField>>,
descriptors: FileDescriptorSet,
message_descriptor: DescriptorProto
fields: HashMap<u32, ProtobufField>,
descriptors: FileDescriptorSet
}

impl DynamicMessage {
/// Create a new message from the slice of fields
pub fn new(
message_descriptor: &DescriptorProto,
field_data: &[ProtobufField],
descriptors: &FileDescriptorSet
) -> DynamicMessage {
let fields = field_data.iter()
.map(|f| (f.field_num, f.clone()))
.into_group_map();
let fields = fields.iter()
.map(|(field_num, fields)| {
let mut fields = fields.clone();
let field = fields.iter_mut()
.reduce(|field, f| {
field.additional_data.push(f.data.clone());
field
} )
.unwrap(); // safe to unwrap, the group by above can't create an empty vector.
(*field_num, field.clone())
})
.collect();
DynamicMessage {
fields: field_data.iter().map(|f| (f.field_num, f.clone())).into_group_map(),
message_descriptor: message_descriptor.clone(),
fields,
descriptors: descriptors.clone()
}
}

/// Return a vector of the fields
pub fn proto_fields(&self) -> Vec<ProtobufField> {
self.fields.values().flatten().cloned().collect()
self.fields.values().cloned().collect()
}

/// Encode this message to the provided buffer
pub fn write_to<B>(&self, buffer: &mut B) -> anyhow::Result<()> where B: BufMut {
for (field_num, values) in self.fields.iter()
for (field_num, field) in self.fields.iter()
.sorted_by(|(a, _), (b, _)| Ord::cmp(a, b)) {
for field in values {
Self::write_field(buffer, *field_num, field, &field.data)?;
if field.repeated_field() && !field.additional_data.is_empty() {
for data in &field.additional_data {
Self::write_field(buffer, *field_num, field, data)?;
}
Self::write_field(buffer, *field_num, field, &field.data)?;
if field.repeated_field() && !field.additional_data.is_empty() {
for data in &field.additional_data {
Self::write_field(buffer, *field_num, field, data)?;
}
}
}
Expand Down Expand Up @@ -245,7 +256,7 @@ impl DynamicMessage {
let mut buffer = Bytes::copy_from_slice(data);
match decode_message(&mut buffer, descriptor, &descriptors) {
Ok(fields) => {
let mut message = DynamicMessage::new(descriptor, fields.as_slice(), &descriptors);
let mut message = DynamicMessage::new(fields.as_slice(), &descriptors);
message.match_path(path_tokens, callback)?;
data.clear();
message.write_to(data).map_err(|err| {
Expand Down Expand Up @@ -376,19 +387,12 @@ fn as_array(data: &DataValue) -> anyhow::Result<Vec<DataValue>> {
}

fn find_field_value<'a>(
fields: &'a mut HashMap<u32, Vec<ProtobufField>>,
fields: &'a mut HashMap<u32, ProtobufField>,
field_name: &str
) -> Option<&'a mut ProtobufField> {
fields.iter_mut()
.find(|(_, fields)| fields.iter().any(|field| field.field_name == field_name))
.map(|(_, fields)| {
if fields.len() > 1 {
warn!("There is more than one field value, additional field values should be encoded \
into the field's additional_values attribute otherwise they are ignored.");
}
fields.first_mut()
})
.flatten()
.find(|(_, field)| field.field_name == field_name)
.map(|(_, field)| field)
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -435,7 +439,7 @@ impl Decoder for DynamicMessageDecoder {
#[instrument]
fn decode(&mut self, src: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
match decode_message(src, &self.descriptor, &self.file_descriptor_set) {
Ok(fields) => Ok(Some(DynamicMessage::new(&self.descriptor, fields.as_slice(), &self.file_descriptor_set))),
Ok(fields) => Ok(Some(DynamicMessage::new(fields.as_slice(), &self.file_descriptor_set))),
Err(err) => {
error!("Failed to decode the message - {err}");
Err(Status::invalid_argument(format!("Failed to decode the message - {err}")))
Expand Down Expand Up @@ -467,8 +471,7 @@ mod tests {
let descriptors = FileDescriptorSet {
file: vec![]
};
let descriptor = DescriptorProto::default();
let mut message = DynamicMessage::new(&descriptor, fields.as_slice(), &descriptors);
let mut message = DynamicMessage::new(fields.as_slice(), &descriptors);
let path = DocPath::new("$.one.two.three").unwrap();
expect!(message.fetch_field_value(&path)).to(be_none());
}
Expand All @@ -487,8 +490,7 @@ mod tests {
file: vec![]
};
let fields = vec![ field.clone() ];
let descriptor = DescriptorProto::default();
let mut message = DynamicMessage::new(&descriptor, fields.as_slice(), &descriptors);
let mut message = DynamicMessage::new(fields.as_slice(), &descriptors);
let path = DocPath::new("one").unwrap();
expect!(message.fetch_field_value(&path)).to(be_some().value(field));
}
Expand All @@ -506,9 +508,8 @@ mod tests {
let descriptors = FileDescriptorSet {
file: vec![]
};
let descriptor = DescriptorProto::default();
let fields = vec![ field.clone() ];
let mut message = DynamicMessage::new(&descriptor, fields.as_slice(), &descriptors);
let mut message = DynamicMessage::new(fields.as_slice(), &descriptors);
let path = DocPath::new("$.one").unwrap();
expect!(message.fetch_field_value(&path)).to(be_some().value(field));
}
Expand Down Expand Up @@ -575,8 +576,7 @@ mod tests {
let descriptors = FileDescriptorSet {
file: vec![]
};
let descriptor = DescriptorProto::default();
let child_message = DynamicMessage::new(&child_descriptor, &[child_field.clone(), child_field2], &descriptors);
let child_message = DynamicMessage::new(&[child_field.clone(), child_field2], &descriptors);
let mut buffer = BytesMut::new();
child_message.write_to(&mut buffer).unwrap();
let field = ProtobufField {
Expand All @@ -588,7 +588,7 @@ mod tests {
descriptor: child_proto_1.clone()
};
let fields = vec![ field.clone() ];
let mut message = DynamicMessage::new(&descriptor, fields.as_slice(), &descriptors);
let mut message = DynamicMessage::new(fields.as_slice(), &descriptors);
let path = DocPath::new("$.one.two").unwrap();
expect!(message.fetch_field_value(&path)).to(be_some().value(child_field));
}
Expand All @@ -599,8 +599,7 @@ mod tests {
let descriptors = FileDescriptorSet {
file: vec![]
};
let descriptor = DescriptorProto::default();
let mut message = DynamicMessage::new(&descriptor, fields.as_slice(), &descriptors);
let mut message = DynamicMessage::new(fields.as_slice(), &descriptors);
let path = DocPath::new_unwrap("$.one.two.three");
let generators = hashmap!{
path.clone() => RandomInt(1, 10)
Expand All @@ -623,8 +622,7 @@ mod tests {
file: vec![]
};
let fields = vec![ field.clone() ];
let descriptor = DescriptorProto::default();
let mut message = DynamicMessage::new(&descriptor, fields.as_slice(), &descriptors);
let mut message = DynamicMessage::new(fields.as_slice(), &descriptors);
let generators = hashmap!{
DocPath::new_unwrap("$.two") => RandomInt(1, 10)
};
Expand All @@ -646,8 +644,7 @@ mod tests {
file: vec![]
};
let fields = vec![ field.clone() ];
let descriptor = DescriptorProto::default();
let mut message = DynamicMessage::new(&descriptor, fields.as_slice(), &descriptors);
let mut message = DynamicMessage::new(fields.as_slice(), &descriptors);
let generators = hashmap!{
DocPath::new_unwrap("$.one") => RandomInt(1, 10)
};
Expand All @@ -662,13 +659,13 @@ mod tests {
name: Some("two".to_string()),
number: Some(1),
r#type: Some(3),
..FieldDescriptorProto::default()
.. FieldDescriptorProto::default()
};
let child_proto_2 = FieldDescriptorProto {
name: Some("three".to_string()),
number: Some(2),
r#type: Some(3),
..FieldDescriptorProto::default()
.. FieldDescriptorProto::default()
};
let child_descriptor = DescriptorProto {
name: Some("child".to_string()),
Expand Down Expand Up @@ -697,7 +694,7 @@ mod tests {
let descriptors = FileDescriptorSet {
file: vec![]
};
let child_message = DynamicMessage::new(&child_descriptor, &[child_field.clone(), child_field2], &descriptors);
let child_message = DynamicMessage::new(&[child_field.clone(), child_field2], &descriptors);
let mut buffer = BytesMut::new();
child_message.write_to(&mut buffer).unwrap();
let field = ProtobufField {
Expand All @@ -709,8 +706,7 @@ mod tests {
descriptor: child_proto_1.clone()
};
let fields = vec![ field.clone() ];
let descriptor = DescriptorProto::default();
let mut message = DynamicMessage::new(&descriptor, fields.as_slice(), &descriptors);
let mut message = DynamicMessage::new(fields.as_slice(), &descriptors);
let path = DocPath::new_unwrap("$.one.two");
let generators = hashmap!{
path.clone() => RandomInt(1, 10)
Expand Down Expand Up @@ -738,8 +734,7 @@ mod tests {
file: vec![]
};
let fields = vec![ field.clone() ];
let descriptor = DescriptorProto::default();
let mut message = DynamicMessage::new(&descriptor, fields.as_slice(), &descriptors);
let mut message = DynamicMessage::new(fields.as_slice(), &descriptors);
let generators = hashmap!{
DocPath::new_unwrap("$.one") => ProviderStateGenerator("a".to_string(), None)
};
Expand Down Expand Up @@ -778,8 +773,7 @@ mod tests {
file: vec![]
};
let fields = vec![ field.clone() ];
let descriptor = DescriptorProto::default();
let mut message = DynamicMessage::new(&descriptor, fields.as_slice(), &descriptors);
let mut message = DynamicMessage::new(fields.as_slice(), &descriptors);
let generators = hashmap!{
DocPath::new_unwrap("$.one") => ProviderStateGenerator("a".to_string(), None)
};
Expand Down Expand Up @@ -811,8 +805,7 @@ mod tests {
file: vec![]
};
let fields = vec![ field.clone() ];
let descriptor = DescriptorProto::default();
let mut message = DynamicMessage::new(&descriptor, fields.as_slice(), &descriptors);
let mut message = DynamicMessage::new(fields.as_slice(), &descriptors);
let generators = hashmap!{
DocPath::new_unwrap("$.one") => ProviderStateGenerator("a".to_string(), None)
};
Expand Down Expand Up @@ -853,7 +846,7 @@ mod tests {
],
.. DescriptorProto::default()
};
let message = DynamicMessage::new(&descriptor, fields.as_slice(), &descriptors);
let message = DynamicMessage::new(fields.as_slice(), &descriptors);

let mut buffer = BytesMut::new();
message.write_to(&mut buffer).unwrap();
Expand Down Expand Up @@ -924,7 +917,7 @@ mod tests {
],
.. DescriptorProto::default()
};
let message = DynamicMessage::new(&descriptor, fields.as_slice(), &descriptors);
let message = DynamicMessage::new(fields.as_slice(), &descriptors);

let mut buffer = BytesMut::new();
message.write_to(&mut buffer).unwrap();
Expand Down Expand Up @@ -998,7 +991,7 @@ mod tests {
]
};

let child_message = DynamicMessage::new(&child_descriptor, &[child_field.clone(), child_field2], &descriptors);
let child_message = DynamicMessage::new(&[child_field.clone(), child_field2], &descriptors);
let mut child_buffer = BytesMut::new();
child_message.write_to(&mut child_buffer).unwrap();

Expand All @@ -1011,7 +1004,7 @@ mod tests {
descriptor: field_descriptor.clone()
};
let fields = vec![ field.clone() ];
let message = DynamicMessage::new(&descriptor, fields.as_slice(), &descriptors);
let message = DynamicMessage::new(fields.as_slice(), &descriptors);

let mut buffer = BytesMut::new();
message.write_to(&mut buffer).unwrap();
Expand Down Expand Up @@ -1104,7 +1097,7 @@ mod tests {
],
.. DescriptorProto::default()
};
let message = DynamicMessage::new(&descriptor, fields.as_slice(), &descriptors);
let message = DynamicMessage::new(fields.as_slice(), &descriptors);

let mut buffer = BytesMut::new();
message.write_to(&mut buffer).unwrap();
Expand Down Expand Up @@ -1179,7 +1172,7 @@ mod tests {
],
.. DescriptorProto::default()
};
let message = DynamicMessage::new(&descriptor, fields.as_slice(), &descriptors);
let message = DynamicMessage::new(fields.as_slice(), &descriptors);

let mut buffer = BytesMut::new();
message.write_to(&mut buffer).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions src/mock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl MockService {
error!("Failed to encode response message - {}", err);
Status::invalid_argument(err.to_string())
})?;
let mut message = DynamicMessage::new(&response_descriptor, &response_message_fields, &self.file_descriptor_set);
let mut message = DynamicMessage::new(&response_message_fields, &self.file_descriptor_set);
self.apply_generators(&mut message, &response_contents).map_err(|err| {
error!("Failed to generate response message - {}", err);
Status::invalid_argument(err.to_string())
Expand Down Expand Up @@ -364,7 +364,7 @@ mod tests {
let bytes = BASE64.decode("EgoNAABAQBUAAIBA").unwrap();
let mut bytes2 = BytesMut::from(bytes.as_slice());
let fields = decode_message(&mut bytes2, input_message, fds).unwrap();
let request = DynamicMessage::new(input_message, fields.as_slice(), &file_descriptor_set);
let request = DynamicMessage::new(fields.as_slice(), &file_descriptor_set);

let mock_service = MockService {
file_descriptor_set: file_descriptor_set.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ fn generate_protobuf_contents(
all_descriptors: &FileDescriptorSet,
mode: TestMode
) -> anyhow::Result<Body> {
let mut message: DynamicMessage = DynamicMessage::new(message_descriptor, fields, all_descriptors);
let mut message: DynamicMessage = DynamicMessage::new(fields, all_descriptors);
let context = hashmap!{};

let mut generator_map = hashmap!{};
Expand Down Expand Up @@ -953,7 +953,7 @@ impl PactPlugin for ProtobufPactPlugin {
let test_context = config.iter().map(|(k, v)| (k.as_str(), v.clone())).collect();
let decoded_body = match decode_message(&mut raw_request_body, &input_message, &all_file_desc) {
Ok(field_values) => {
let mut message = DynamicMessage::new(&input_message, &field_values, &all_file_desc);
let mut message = DynamicMessage::new(&field_values, &all_file_desc);
if let Err(err) = message.apply_generators(
interaction.request.generators.categories.get(&GeneratorCategory::BODY),
&GeneratorTestMode::Provider,
Expand Down
2 changes: 1 addition & 1 deletion src/verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ fn build_grpc_request(
trace!(?body, ?metadata, ?file_desc, ?input_desc, ">> build_grpc_request");
let mut bytes = body.value().unwrap_or_default();
let message_fields = decode_message(&mut bytes, input_desc, file_desc)?;
let mut request = Request::new(DynamicMessage::new(input_desc, &message_fields, file_desc));
let mut request = Request::new(DynamicMessage::new(&message_fields, file_desc));
let request_metadata = request.metadata_mut();
for (key, md) in metadata {
if key != "request-path" {
Expand Down
2 changes: 1 addition & 1 deletion tests/mock_server_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,6 @@ async fn each_value_matcher() {
descriptor: field_descriptor.clone()
};
let fields = vec![ field, field2 ];
let message = DynamicMessage::new(&input_message, fields.as_slice(), &fds);
let message = DynamicMessage::new(fields.as_slice(), &fds);
grpc.unary(Request::new(message), path, codec).await.unwrap();
}

0 comments on commit 3d60553

Please sign in to comment.