From ad417e9690dc7d0b658538a55a5cae46c05422b6 Mon Sep 17 00:00:00 2001 From: Zach Zhu Date: Thu, 14 Dec 2023 16:10:26 +0800 Subject: [PATCH] algo: tweaks Signed-off-by: Zach Zhu --- algorithm/kapacity/metric/query.py | 19 ++++++------------- .../portrait/horizontal/predictive/main.py | 16 ++++++---------- .../predictive/replicas_estimator.py | 18 +++++++++--------- 3 files changed, 21 insertions(+), 32 deletions(-) diff --git a/algorithm/kapacity/metric/query.py b/algorithm/kapacity/metric/query.py index a36afa4..43d2ddf 100644 --- a/algorithm/kapacity/metric/query.py +++ b/algorithm/kapacity/metric/query.py @@ -171,19 +171,12 @@ def query_metrics(addr, query, start, end): def convert_metric_series_to_dataframe(series): - dataframe = None - for item in series: - array = [] - for point in item.points: - array.append([point.timestamp, point.value]) - df = pd.DataFrame(array, columns=['timestamp', 'value'], dtype=float) - df['timestamp'] = df['timestamp'].map(lambda x: x / 1000).astype('int64') - if dataframe is not None: - # TODO: consider if it's possible to have multiple series - pd.merge(dataframe, df, how='left', on='timestamp') - else: - dataframe = df - return dataframe + df_list = [] + for point in series[0].points: + df_list.append([point.timestamp, point.value]) + df = pd.DataFrame(df_list, columns=['timestamp', 'value'], dtype=float) + df['timestamp'] = df['timestamp'].map(lambda x: x / 1000).astype('int64') + return df def time_period_to_minutes(time_period): diff --git a/algorithm/kapacity/portrait/horizontal/predictive/main.py b/algorithm/kapacity/portrait/horizontal/predictive/main.py index ad8804f..a3c68f3 100644 --- a/algorithm/kapacity/portrait/horizontal/predictive/main.py +++ b/algorithm/kapacity/portrait/horizontal/predictive/main.py @@ -33,7 +33,6 @@ class EnvInfo: class MetricsContext: workload_identifier = None - resource_name = None resource_target = 0 resource_history = None replicas_history = None @@ -130,7 +129,7 @@ def predict_replicas(args, metric_ctx, pred_traffics): pred = estimator.estimate(history, pred_traffics, 'timestamp', - metric_ctx.resource_name, + 'resource', 'replicas', traffic_col, metric_ctx.resource_target, @@ -155,12 +154,10 @@ def merge_history_dict(history_dict): def resample_by_freq(old_df, freq, agg_funcs): - df = old_df.copy() - df = df.sort_values(by='timestamp', ascending=True) + df = old_df.sort_values(by='timestamp', ascending=True) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s') - df = df.resample(rule=freq, on='timestamp').agg(agg_funcs) - df = df.rename_axis('timestamp').reset_index() - df['timestamp'] = df['timestamp'].astype('int64') // 10 ** 9 + df = df.resample(rule=freq, on='timestamp').agg(agg_funcs).reset_index() + df['timestamp'] = df['timestamp'].astype('int64') // 1e9 return df @@ -185,10 +182,9 @@ def fetch_metrics_history(args, env, hp_cr): resource = metric['containerResource'] else: raise RuntimeError('MetricTypeError') - resource_history = query.fetch_metrics(env.metrics_server_addr, env.namespace, metric, scale_target, start, end) - metric_ctx.resource_name = resource['name'] metric_ctx.resource_target = compute_resource_target(env.namespace, resource, scale_target) - metric_ctx.resource_history = resource_history.rename(columns={'value': resource['name']}) + resource_history = query.fetch_metrics(env.metrics_server_addr, env.namespace, metric, scale_target, start, end) + metric_ctx.resource_history = resource_history.rename(columns={'value': 'resource'}) elif i == 1: if metric_type != 'Pods': raise RuntimeError('MetricTypeError') diff --git a/algorithm/kapacity/portrait/horizontal/predictive/replicas_estimator.py b/algorithm/kapacity/portrait/horizontal/predictive/replicas_estimator.py index 6170913..f4b0902 100644 --- a/algorithm/kapacity/portrait/horizontal/predictive/replicas_estimator.py +++ b/algorithm/kapacity/portrait/horizontal/predictive/replicas_estimator.py @@ -631,15 +631,15 @@ class EstimationException(Exception): pass -def estimate(data, - data_pred, - time_col, - resource_col, - replicas_col, - traffic_cols, - resource_target, - time_delta_hours, - test_dataset_size_in_seconds=86400): +def estimate(data: pd.DataFrame, + data_pred: pd.DataFrame, + time_col: str, + resource_col: str, + replicas_col: str, + traffic_cols: list[str], + resource_target: float, + time_delta_hours: int, + test_dataset_size_in_seconds: int = 86400) -> pd.DataFrame: logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s') logger = logging.getLogger()