Skip to content

Commit

Permalink
Move time from DataPoint to Histogram/ExpoHistogram (#2411)
Browse files Browse the repository at this point in the history
  • Loading branch information
fraillt authored Dec 11, 2024
1 parent 1a4e931 commit e4cba94
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 82 deletions.
8 changes: 4 additions & 4 deletions opentelemetry-proto/src/transform/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ pub mod tonic {
.iter()
.map(|dp| TonicHistogramDataPoint {
attributes: dp.attributes.iter().map(Into::into).collect(),
start_time_unix_nano: to_nanos(dp.start_time),
time_unix_nano: to_nanos(dp.time),
start_time_unix_nano: to_nanos(hist.start_time),
time_unix_nano: to_nanos(hist.time),
count: dp.count,
sum: Some(dp.sum.into_f64()),
bucket_counts: dp.bucket_counts.clone(),
Expand All @@ -258,8 +258,8 @@ pub mod tonic {
.iter()
.map(|dp| TonicExponentialHistogramDataPoint {
attributes: dp.attributes.iter().map(Into::into).collect(),
start_time_unix_nano: to_nanos(dp.start_time),
time_unix_nano: to_nanos(dp.time),
start_time_unix_nano: to_nanos(hist.start_time),
time_unix_nano: to_nanos(hist.time),
count: dp.count as u64,
sum: Some(dp.sum.into_f64()),
scale: dp.scale.into(),
Expand Down
26 changes: 8 additions & 18 deletions opentelemetry-sdk/src/metrics/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Sum<T> {
pub struct Histogram<T> {
/// Individual aggregated measurements with unique attributes.
pub data_points: Vec<HistogramDataPoint<T>>,
/// The time when the time series was started.
pub start_time: SystemTime,
/// The time when the time series was recorded.
pub time: SystemTime,
/// Describes if the aggregation is reported as the change from the last report
/// time, or the cumulative changes since a fixed start time.
pub temporality: Temporality,
Expand All @@ -166,11 +170,6 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Histogram<T> {
pub struct HistogramDataPoint<T> {
/// The set of key value pairs that uniquely identify the time series.
pub attributes: Vec<KeyValue>,
/// The time when the time series was started.
pub start_time: SystemTime,
/// The time when the time series was recorded.
pub time: SystemTime,

/// The number of updates this histogram has been calculated with.
pub count: u64,
/// The upper bounds of the buckets of the histogram.
Expand All @@ -195,8 +194,6 @@ impl<T: Copy> Clone for HistogramDataPoint<T> {
fn clone(&self) -> Self {
Self {
attributes: self.attributes.clone(),
start_time: self.start_time,
time: self.time,
count: self.count,
bounds: self.bounds.clone(),
bucket_counts: self.bucket_counts.clone(),
Expand All @@ -213,7 +210,10 @@ impl<T: Copy> Clone for HistogramDataPoint<T> {
pub struct ExponentialHistogram<T> {
/// The individual aggregated measurements with unique attributes.
pub data_points: Vec<ExponentialHistogramDataPoint<T>>,

/// When the time series was started.
pub start_time: SystemTime,
/// The time when the time series was recorded.
pub time: SystemTime,
/// Describes if the aggregation is reported as the change from the last report
/// time, or the cumulative changes since a fixed start time.
pub temporality: Temporality,
Expand All @@ -233,10 +233,6 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for ExponentialHistogram
pub struct ExponentialHistogramDataPoint<T> {
/// The set of key value pairs that uniquely identify the time series.
pub attributes: Vec<KeyValue>,
/// When the time series was started.
pub start_time: SystemTime,
/// The time when the time series was recorded.
pub time: SystemTime,

/// The number of updates this histogram has been calculated with.
pub count: usize,
Expand Down Expand Up @@ -281,8 +277,6 @@ impl<T: Copy> Clone for ExponentialHistogramDataPoint<T> {
fn clone(&self) -> Self {
Self {
attributes: self.attributes.clone(),
start_time: self.start_time,
time: self.time,
count: self.count,
min: self.min,
max: self.max,
Expand Down Expand Up @@ -375,8 +369,6 @@ mod tests {

let histogram_data_point = HistogramDataPoint {
attributes: vec![KeyValue::new("key", "value")],
start_time: std::time::SystemTime::now(),
time: std::time::SystemTime::now(),
count: 0,
bounds: vec![],
bucket_counts: vec![],
Expand All @@ -395,8 +387,6 @@ mod tests {

let exponential_histogram_data_point = ExponentialHistogramDataPoint {
attributes: vec![KeyValue::new("key", "value")],
start_time: std::time::SystemTime::now(),
time: std::time::SystemTime::now(),
count: 0,
min: None,
max: None,
Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,6 @@ mod tests {
let mut a = Histogram {
data_points: vec![HistogramDataPoint {
attributes: vec![KeyValue::new("a1", 1)],
start_time: SystemTime::now(),
time: SystemTime::now(),
count: 2,
bounds: vec![1.0, 2.0],
bucket_counts: vec![0, 1, 1],
Expand All @@ -324,6 +322,8 @@ mod tests {
sum: 3u64,
exemplars: vec![],
}],
start_time: SystemTime::now(),
time: SystemTime::now(),
temporality: if temporality == Temporality::Delta {
Temporality::Cumulative
} else {
Expand Down Expand Up @@ -357,8 +357,6 @@ mod tests {
let mut a = ExponentialHistogram {
data_points: vec![ExponentialHistogramDataPoint {
attributes: vec![KeyValue::new("a1", 1)],
start_time: SystemTime::now(),
time: SystemTime::now(),
count: 2,
min: None,
max: None,
Expand All @@ -376,6 +374,8 @@ mod tests {
zero_threshold: 1.0,
exemplars: vec![],
}],
start_time: SystemTime::now(),
time: SystemTime::now(),
temporality: if temporality == Temporality::Delta {
Temporality::Cumulative
} else {
Expand Down
56 changes: 30 additions & 26 deletions opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,33 +389,34 @@ impl<T: Number> ExpoHistogram<T> {
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now();
let time = SystemTime::now();
let start_time = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), time))
.unwrap_or(time);

let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::ExponentialHistogram {
data_points: vec![],
start_time,
time,
temporality: Temporality::Delta,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Delta;

let prev_start = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), t))
.unwrap_or(t);
h.start_time = start_time;
h.time = time;

self.value_map
.collect_and_reset(&mut h.data_points, |attributes, attr| {
let b = attr.into_inner().unwrap_or_else(|err| err.into_inner());
data::ExponentialHistogramDataPoint {
attributes,
start_time: prev_start,
time: t,
count: b.count,
min: if self.record_min_max {
Some(b.min)
Expand Down Expand Up @@ -450,33 +451,34 @@ impl<T: Number> ExpoHistogram<T> {
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now();
let time = SystemTime::now();
let start_time = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());

let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::ExponentialHistogram {
data_points: vec![],
start_time,
time,
temporality: Temporality::Cumulative,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Cumulative;

let prev_start = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());
h.start_time = start_time;
h.time = time;

self.value_map
.collect_readonly(&mut h.data_points, |attributes, attr| {
let b = attr.lock().unwrap_or_else(|err| err.into_inner());
data::ExponentialHistogramDataPoint {
attributes,
start_time: prev_start,
time: t,
count: b.count,
min: if self.record_min_max {
Some(b.min)
Expand Down Expand Up @@ -1270,8 +1272,6 @@ mod tests {
min: Some(1.into()),
max: Some(16.into()),
sum: 31.into(),
start_time: SystemTime::now(),
time: SystemTime::now(),
scale: -1,
positive_bucket: data::ExponentialBucket {
offset: -1,
Expand All @@ -1285,6 +1285,8 @@ mod tests {
zero_threshold: 0.0,
zero_count: 0,
}],
start_time: SystemTime::now(),
time: SystemTime::now(),
},
want_count: 1,
},
Expand Down Expand Up @@ -1318,8 +1320,6 @@ mod tests {
offset: -1,
counts: vec![1, 4, 1],
},
start_time: SystemTime::now(),
time: SystemTime::now(),
negative_bucket: data::ExponentialBucket {
offset: 0,
counts: vec![],
Expand All @@ -1328,6 +1328,8 @@ mod tests {
zero_threshold: 0.0,
zero_count: 0,
}],
start_time: SystemTime::now(),
time: SystemTime::now(),
},
want_count: 1,
},
Expand Down Expand Up @@ -1364,8 +1366,6 @@ mod tests {
offset: -1,
counts: vec![1, 4, 1],
},
start_time: SystemTime::now(),
time: SystemTime::now(),
negative_bucket: data::ExponentialBucket {
offset: 0,
counts: vec![],
Expand All @@ -1374,6 +1374,8 @@ mod tests {
zero_threshold: 0.0,
zero_count: 0,
}],
start_time: SystemTime::now(),
time: SystemTime::now(),
},
want_count: 1,
},
Expand Down Expand Up @@ -1410,8 +1412,6 @@ mod tests {
counts: vec![1, 6, 2],
},
attributes: vec![],
start_time: SystemTime::now(),
time: SystemTime::now(),
negative_bucket: data::ExponentialBucket {
offset: 0,
counts: vec![],
Expand All @@ -1420,6 +1420,8 @@ mod tests {
zero_threshold: 0.0,
zero_count: 0,
}],
start_time: SystemTime::now(),
time: SystemTime::now(),
},
want_count: 1,
},
Expand All @@ -1430,6 +1432,8 @@ mod tests {

let mut got: Box<dyn data::Aggregation> = Box::new(data::ExponentialHistogram::<T> {
data_points: vec![],
start_time: SystemTime::now(),
time: SystemTime::now(),
temporality: Temporality::Delta,
});
let mut count = 0;
Expand Down
39 changes: 21 additions & 18 deletions opentelemetry-sdk/src/metrics/internal/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,32 +109,33 @@ impl<T: Number> Histogram<T> {
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now();
let time = SystemTime::now();
let start_time = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), time))
.unwrap_or(time);
let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::Histogram {
data_points: vec![],
start_time,
time,
temporality: Temporality::Delta,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Delta;

let prev_start = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), t))
.unwrap_or(t);
h.start_time = start_time;
h.time = time;

self.value_map
.collect_and_reset(&mut h.data_points, |attributes, aggr| {
let b = aggr.into_inner().unwrap_or_else(|err| err.into_inner());
HistogramDataPoint {
attributes,
start_time: prev_start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bucket_counts: b.counts,
Expand Down Expand Up @@ -164,32 +165,34 @@ impl<T: Number> Histogram<T> {
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now();
let time = SystemTime::now();
let start_time = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());

let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::Histogram {
data_points: vec![],
start_time,
time,
temporality: Temporality::Cumulative,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Cumulative;

let prev_start = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());
h.start_time = start_time;
h.time = time;

self.value_map
.collect_readonly(&mut h.data_points, |attributes, aggr| {
let b = aggr.lock().unwrap_or_else(|err| err.into_inner());
HistogramDataPoint {
attributes,
start_time: prev_start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bucket_counts: b.counts.clone(),
Expand Down
Loading

0 comments on commit e4cba94

Please sign in to comment.