-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
324 lines (231 loc) · 8.63 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
import mlflow
import pandas as pd
import os, sys
from dotenv import load_dotenv
from verispy import VERIS
from sklearn.model_selection import train_test_split
import itertools
from mlflow.tracking import MlflowClient
import urllib.request
from zipfile import ZipFile
import logging
# Runtime configuration: load a local .env file (if any) into the process
# environment, then read the settings this module works with. Every value
# is None when the corresponding variable is not set.
load_dotenv()
JSON_DIR = os.getenv('JSON_DIR')
CSV_DIR = os.getenv('CSV_DIR')
RCOLLAPSED = os.getenv('R_COLLAPSED_CSV_NAME')
VERIS_DF = os.getenv('BOOLEAN_CSV_NAME')
VERIS_DF_URL = os.getenv('BOOLEAN_CSV_URL')

# Module-level VERIS handle built on the configured json directory.
v = VERIS(json_dir=JSON_DIR)
def download_veris_csv(url=VERIS_DF_URL,
                       csv_dir=CSV_DIR,
                       veris_df_csv=VERIS_DF):
    """
    Downloads the official veris_df dataset (boolean VCDB dataset)
    from the vz-risk github and returns it as DataFrame.

    Parameters
    ----------
    url: str
        The remote path of the zipped folder containing the csv file
        (e.g. 'https://raw.githubusercontent.com/vz-risk/VCDB/master/data/csv/vcdb.csv.zip')
    csv_dir: str / path
        The local path where the archive is saved (as 'vcdb.csv.zip')
        and where its contents are extracted
    veris_df_csv: str
        The name of the extracted csv file to read from csv_dir

    Returns
    ----------
    DataFrame
        The veris_df dataset as Pandas DataFrame
    """
    zip_path = os.path.join(csv_dir, 'vcdb.csv.zip')

    logging.info(f'Downloading from: {url}')
    urllib.request.urlretrieve(url, zip_path)

    with ZipFile(zip_path, 'r') as zip_obj:
        # ZipFile.extract() takes a *member name* as its first argument, so
        # passing the target directory there fails; extractall() is the
        # call that unpacks the archive into a destination directory.
        zip_obj.extractall(csv_dir)

    return pd.read_csv(os.path.join(csv_dir, veris_df_csv),
                       index_col=0,
                       low_memory=False)
def create_veris_csv(json_dir=JSON_DIR,
                     csv_dir=CSV_DIR,
                     veris_df_csv=VERIS_DF):
    """
    Creates veris_df type dataset (boolean VCDB dataset) and stores as csv.

    Parameters
    ----------
    json_dir: str / path
        The path of validated vcdb json files
    csv_dir: str / path
        The path to store the produced csv
    veris_df_csv: str
        The name of the csv file

    Returns
    ----------
    DataFrame
        The veris_df dataset as Pandas DataFrame

    Raises
    ----------
    SystemExit
        If json_dir or csv_dir is None (exit status 1)
    """
    if json_dir is None or csv_dir is None:
        # Logged as an error so the reason is visible with the default
        # logging configuration; sys.exit is used because the bare exit()
        # helper is only injected by the site module. A non-zero status
        # marks the abort as a failure for whoever launched the process.
        logging.error("Need json and collapsed csv directories")
        sys.exit(1)

    veris = VERIS(json_dir=json_dir)
    veris_df = veris.json_to_df(verbose=False)
    veris_df.to_csv(os.path.join(csv_dir, veris_df_csv))
    return veris_df
def load_dataset(csv_dir=CSV_DIR,
                 veris_df_csv=VERIS_DF,
                 nrows=None):
    """
    Reads a veris_df type csv (boolean VCDB dataset) from disk into a
    Pandas DataFrame, using the first column as index.

    Parameters
    ----------
    csv_dir: str / path
        The directory that holds the csv
    veris_df_csv: str
        The file name of the csv inside csv_dir
    nrows: int or None
        Passed through to pandas.read_csv as its nrows argument

    Returns
    ----------
    DataFrame
        The loaded veris_df dataset as Pandas DataFrame
    """
    csv_path = os.path.join(csv_dir, veris_df_csv)
    read_options = {'index_col': 0, 'low_memory': False, 'nrows': nrows}
    return pd.read_csv(csv_path, **read_options)
def check_y_statistics(y):
    """
    Checks if output variable is statistically capable for training.

    Two conditions are checked, in this order:
    fewer than 30 samples, then fewer positives than 1/100 of the samples.

    Parameters
    ----------
    y: Pandas Series
        The output Series of the supervised task
        (summed to count positives, so presumably 0/1 valued - confirm
        against callers)

    Returns
    ----------
    Dict
        Dictionary with keys 'error_class' ('-' when no anomaly was found,
        otherwise 'few samples' or 'few positive instances'),
        'n_samples' and 'n_positive'
    """
    # Computed once and reused for both the checks and the returned tags.
    n_samples = len(y)
    n_positive = sum(y)

    if n_samples < 30:
        logging.info("Less than 30 samples")
        error_class = "few samples"
    elif n_positive < n_samples / 100:
        # The message states the threshold that is actually applied (1/100).
        logging.info("Class imbalance: positives < 1/100 of samples")
        error_class = "few positive instances"
    else:
        error_class = '-'

    return {'error_class': error_class,
            'n_samples': n_samples,
            'n_positive': n_positive}
def _log_csv_artifact(data, filename):
    """Writes data to filename as csv and logs that file as an MLflow artifact."""
    # pd.DataFrame() accepts DataFrames, Series and numpy arrays alike, so the
    # X and y parts go through the same code path. The file is fully written
    # and closed by to_csv before MLflow copies it.
    pd.DataFrame(data).to_csv(filename, encoding='utf-8')
    mlflow.log_artifact(filename)


def train_test_split_and_log(X, y, train_size, random_state, shuffle=True):
    """ Performs train / test split and logs the datasets using the
    MLflow tracking API

    The csv files (X_train.csv, y_train.csv and, when a test set exists,
    X_test.csv, y_test.csv) are written to the current working directory
    and then logged as artifacts.

    Parameters
    ----------
    X: DataFrame
        Input dataset
    y: Pandas Series or numpy array
        Output vector
    train_size: float
        Size of the training set as percentage of the whole dataset (0 < train_size <= 1)
    random_state: int
        Random state for the split
    shuffle: bool
        Whether to shuffle or not during splitting
        (the split is stratified on y only when shuffling)

    Returns
    ----------
    Case 1 (train_size < 1): X_train, X_test, y_train, y_test
        The four parts returned by sklearn's train_test_split
    Case 2 (train_size >= 1): X, str, y, str
        The untouched inputs as training set; X_test and y_test are the
        placeholder string "all data was used as training set"
    """
    # Train / Test split
    stratify = y if shuffle else None
    if train_size < 1:
        X_train, X_test, y_train, y_test = train_test_split(
            X,
            y,
            train_size=train_size,
            stratify=stratify,
            random_state=random_state,
            shuffle=shuffle,
        )
    else:
        X_train, y_train = X, y
        X_test = "all data was used as training set"
        y_test = "all data was used as training set"

    # Log datasets
    _log_csv_artifact(X_train, 'X_train.csv')
    _log_csv_artifact(y_train, 'y_train.csv')
    # X_test is a placeholder string when no test set was produced.
    if isinstance(X_test, pd.DataFrame):
        _log_csv_artifact(X_test, 'X_test.csv')
        _log_csv_artifact(y_test, 'y_test.csv')

    return X_train, X_test, y_train, y_test
def mlflow_register_model(model_name):
    """ Compares the current model to the production model that is registered
    in mlflow models and registers / promotes the current one if appropriate.

    Must be called while an MLflow run is active: the run id of the active
    run identifies the candidate model ("runs:/<run_id>/model").

    If no version of model_name is in stage 'Production', the current model
    is registered and moved to 'Production' right away. Otherwise the current
    model replaces the production one only when all of the following hold
    for the logged run metrics:
      * 'test_f1-macro' is strictly higher than the production model's
      * 'test_hl' is >= -0.05 or >= the production model's 'test_hl'
      * 'test_hl' is <= 0
    On replacement the previous production version(s) are archived.

    Parameters
    ----------
    model_name: str
        The model name (same with target name)

    Returns
    ----------
    None
    """
    logging.info('\nChecking whether to register model...\n')
    client = MlflowClient()
    current_run = mlflow.active_run().info.run_id
    new_metrics = client.get_run(current_run).data.metrics
    model_uri = f"runs:/{current_run}/model"

    # Find the production model by its stage instead of relying on the
    # position of a version inside the search results.
    production_versions = [
        model_version
        for model_version in client.search_model_versions(
            f"name='{model_name}'"
        )
        if model_version.current_stage == 'Production'
    ]

    if not production_versions:
        # in case there is no other model in production immediately
        # register current model
        logging.info('\n')
        new_registered_model = mlflow.register_model(model_uri, model_name)
        logging.info('\n')
        # Promote the version that was just created; it is not necessarily
        # version 1 (older, non-production versions may already exist).
        client.transition_model_version_stage(
            name=model_name,
            version=new_registered_model.version,
            stage="Production"
        )
        return

    # there is a previous model in production: compare, then update or not
    logging.info("\n Comparing with previous production model...\n ")
    old_model = max(production_versions,
                    key=lambda model_version: int(model_version.version))
    old_metrics = client.get_run(old_model.run_id).data.metrics

    # Kept as one short-circuiting expression so metrics are only looked up
    # when the preceding conditions require them.
    new_model_accepted = (
        old_metrics['test_f1-macro'] < new_metrics['test_f1-macro']
        and (new_metrics['test_hl'] >= -0.05
             or new_metrics['test_hl'] >= old_metrics['test_hl'])
        and new_metrics['test_hl'] <= 0
    )

    if not new_model_accepted:
        logging.info('New model is rejected. The previous one is kept in production! \n')
        return

    # Register new model and transit it to production
    new_registered_model = mlflow.register_model(model_uri, model_name)
    client.transition_model_version_stage(
        name=model_name,
        version=new_registered_model.version,
        stage="Production"
    )
    # Archive the version(s) that actually were in production, rather than
    # assuming it is the one numbered just below the new version.
    for model_version in production_versions:
        client.transition_model_version_stage(
            name=model_name,
            version=model_version.version,
            stage="Archived"
        )
    logging.info('New model is accepted and put to production! \n')
    return