Algo: time series forecasting tweaks #86

Merged
merged 2 commits into from
Jan 9, 2024
Merged
algorithm/kapacity/portrait/horizontal/predictive/main.py (13 changes: 5 additions & 8 deletions)

@@ -72,19 +72,19 @@ def parse_args():
parser.add_argument('--tsf-model-path',
help='dir path containing related files of the time series forecasting model',
required=True, default='/opt/kapacity/timeseries/forcasting/model')
parser.add_argument('--tsf-freq', help='frequency (precision) of the time series forecasting model,'
'should be the same as set for training', required=True)
parser.add_argument('--tsf-dataloader-num-workers', help='number of worker subprocesses to use for data loading'
'of the time series forecasting model',
required=False, default=0)
parser.add_argument('--re-history-len', help='history length of training data for replicas estimation',
parser.add_argument('--re-history-len', help='history length of training data for replicas estimation,'
'should be longer than the context duration of the time series forecasting model',
required=True)
parser.add_argument('--re-time-delta-hours', help='time zone offset for replicas estimation model',
required=True)
parser.add_argument('--re-test-dataset-size-in-seconds',
help='size of test dataset in seconds for replicas estimation model',
required=False, default=86400)
parser.add_argument('--scaling-freq', help='frequency of scaling, the duration should be larger than tsf-freq',
parser.add_argument('--scaling-freq', help='frequency of scaling, the duration should be larger than the frequency'
'of the time series forecasting model',
required=True)
args = parser.parse_args()
return args
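The help texts above tie the two frequency flags together: --scaling-freq must describe a longer duration than --tsf-freq. A minimal sketch of how that constraint could be checked with pandas (this validation helper is an illustration, not code from this PR):

    import pandas as pd
    from pandas.tseries.frequencies import to_offset

    def check_scaling_freq(tsf_freq, scaling_freq):
        # Turn frequency aliases such as 'min', '10min' or '1H' into fixed durations.
        tsf_duration = pd.Timedelta(to_offset(tsf_freq))
        scaling_duration = pd.Timedelta(to_offset(scaling_freq))
        if scaling_duration <= tsf_duration:
            raise ValueError(f'--scaling-freq {scaling_freq!r} must be a longer '
                             f'duration than --tsf-freq {tsf_freq!r}')

    check_scaling_freq('10min', '1H')  # passes: one hour is longer than ten minutes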
@@ -105,11 +105,8 @@ def predict_traffics(args, metric_ctx):
df = None
for key in metric_ctx.traffics_history_dict:
traffic_history = resample_by_freq(metric_ctx.traffics_history_dict[key], freq, {'value': 'mean'})
traffic_history = traffic_history[len(traffic_history) - context_length:len(traffic_history)]
traffic_history = traffic_history[len(traffic_history)-context_length:len(traffic_history)]
df_traffic = model.predict(df=traffic_history,
freq=args.tsf_freq,
target_col='value',
time_col='timestamp',
series_cols_dict={'workload': metric_ctx.workload_identifier, 'metric': key})
df_traffic.rename(columns={'value': key}, inplace=True)
if df is None:
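In the loop above, each metric's history is resampled to the model frequency and then cut down to the most recent context_length points before prediction. A small illustration of that slice (resample_by_freq is assumed to be a pandas-resample wrapper; the data here is synthetic):

    import pandas as pd

    context_length = 12
    # Hypothetical 3-hour history already resampled to 10-minute buckets (18 rows).
    traffic_history = pd.DataFrame({
        'timestamp': pd.date_range('2024-01-01', periods=18, freq='10min'),
        'value': range(18),
    })
    # Same effect as traffic_history[len(traffic_history)-context_length:len(traffic_history)].
    traffic_history = traffic_history.tail(context_length)
    assert len(traffic_history) == context_length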
algorithm/kapacity/timeseries/forecasting/forecaster.py (47 changes: 20 additions & 27 deletions)

@@ -54,11 +54,11 @@ def __init__(self,
self.seq_len = self.config['context_length'] + self.config['prediction_length']
self.config = config
self.features_map = {
'min': ['minute', 'hour', 'dayofweek'],
'1min': ['minute', 'hour', 'dayofweek'],
'10min': ['minute', 'hour', 'dayofweek', 'day'],
'H': ['hour', 'dayofweek', 'day'],
'1H': ['hour', 'dayofweek', 'day'],
'min': ['minute', 'hour', 'dayofweek', 'day'],
'1min': ['minute', 'hour', 'dayofweek', 'day'],
'10min': ['minute', 'hour', 'dayofweek', 'day'],
'D': ['dayofweek', 'day'],
'1D': ['dayofweek', 'day']
}
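The entries in features_map are calendar features keyed by model frequency. A hedged sketch of how such features are typically derived from the timestamp column with pandas (the helper name and signature are illustrative, not taken from this file):

    import pandas as pd

    def add_time_features(df, time_col, feature_names):
        # feature_names comes from features_map[freq],
        # e.g. ['minute', 'hour', 'dayofweek', 'day'].
        ts = pd.to_datetime(df[time_col])
        for name in feature_names:
            # Each name matches a pandas .dt accessor attribute:
            # minute (0-59), hour (0-23), dayofweek (0-6), day (1-31).
            df[name] = getattr(ts.dt, name)
        return df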
@@ -319,15 +319,15 @@ def __init__(self,
"""
super(Estimator, self).__init__()
self.config = config
assert self.config['freq'] in ['H', '1H', 'min', '1min', '10min', 'D',
'1D'], "freq must be in ['H','1H','min','1min','10min','D','1D']"
assert self.config['freq'] in ['min', '1min', '10min', 'H', '1H', 'D', '1D'], \
"freq must be in ['min','1min','10min','H','1H','D','1D']"

self.feat_cardinality_map = {
'min': [60, 24, 8],
'1min': [60, 24, 8],
'10min': [60, 24, 8, 32],
'H': [24, 8, 32],
'1H': [24, 8, 32],
'min': [60, 24, 8, 32],
'1min': [60, 24, 8, 32],
'10min': [60, 24, 8, 32],
'D': [8, 32],
'1D': [8, 32]
}
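Each list in feat_cardinality_map gives, position by position, the number of distinct values of the corresponding feature in features_map for the same freq (60 minutes, 24 hours, 8 for day-of-week and 32 for day-of-month, apparently with a slot of headroom). A hedged PyTorch sketch of how such cardinalities are commonly turned into embedding tables (the real model layers are not part of this diff):

    import torch.nn as nn

    feat_cardinalities = [60, 24, 8, 32]  # e.g. the '10min' entry above
    embedding_dim = 8                     # illustrative size, not from this PR

    # One embedding table per categorical time feature.
    time_feature_embeddings = nn.ModuleList(
        [nn.Embedding(num_embeddings=card, embedding_dim=embedding_dim)
         for card in feat_cardinalities]
    )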
@@ -546,33 +546,25 @@ def test(self, test_loader):

def predict(self,
df,
freq,
target_col,
time_col,
series_cols_dict):
query_df, ctx_end_date = self.pre_processing_query(query=df,
time_col=time_col,
target_col=target_col,
series_cols_dict=series_cols_dict,
freq=freq)
series_cols_dict=series_cols_dict)
test_data, test_loader = self.get_data(
flag='test',
df=query_df,
df_path=None)
pred = self.test(test_loader=test_loader)
result = self.post_processing_result(pred=pred,
ctx_end_date=ctx_end_date,
time_col=time_col,
target_col=target_col,
freq=freq)
ctx_end_date=ctx_end_date)
return result

def pre_processing_query(self,
query,
time_col,
target_col,
series_cols_dict,
freq):
series_cols_dict):
time_col = self.config['time_col']
target_col = self.config['target_col']
freq = self.config['freq']

test_df = query.sort_values([time_col]).reset_index(drop=True)
ctx_end_date = test_df[time_col].iat[-1]
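With time_col, target_col and freq now read from self.config rather than passed on every call, callers only supply the input frame and the series identifiers, as in the updated main.py call above. A hedged sketch of the assumed config shape (key names taken from this diff, values illustrative):

    config = {
        'freq': '10min',          # one of ['min','1min','10min','H','1H','D','1D']
        'time_col': 'timestamp',
        'target_col': 'value',
        'context_length': 72,
        'prediction_length': 6,
        # ... other training/model settings not shown in this diff
    }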

@@ -598,10 +590,11 @@ def pre_processing_query(self,

def post_processing_result(self,
pred,
ctx_end_date,
time_col,
target_col,
freq):
ctx_end_date):
time_col = self.config['time_col']
target_col = self.config['target_col']
freq = self.config['freq']

result_df = pd.DataFrame()
pred_len = self.config['prediction_length']
for i in range(1, pred_len + 1):
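The post-processing loop emits one row per forecast step after ctx_end_date (the loop body is collapsed in this view). A minimal sketch of building those future timestamps for a fixed frequency, which is one plausible shape of the result index:

    import pandas as pd

    ctx_end_date = pd.Timestamp('2024-01-09 12:00:00')
    freq, pred_len = '10min', 6

    # One timestamp per prediction step, starting one step after the context end.
    future_index = pd.date_range(start=ctx_end_date, periods=pred_len + 1, freq=freq)[1:]
    # -> 12:10, 12:20, ..., 13:00 on Jan 9, 2024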