utils.py
"""This module encapsulates utilities for all projects"""
from math import log
import os
import pandas as pd
IRIS_COLS = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species']
IRIS_ATTRIBUTES = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
IRIS_TARGET = 'species'
IRIS_NAME = 'iris'
CONGRESS_COLS = ['class', *[f'c{n}' for n in range(1, 17)]]
CONGRESS_TARGET = 'class'
CONGRESS_ATTRIBUTES = [f'c{n}' for n in range(1, 17)]
CONGRESS_NAME = 'congress'
WINE_COLS = ["Class", "Alcohol", "Malic acid",
"Ash", "Alcalinity of ash",
"Magnesium", "Total phenols", "Flavanoids",
"Nonflavanoid phenols", "Proanthocyanins",
"Color intensity", "Hue", "OD280/OD315 of diluted wines", "Proline"]
WINE_TARGET = 'Class'
WINE_ATTRIBUTES = [attr for attr in WINE_COLS if attr != WINE_TARGET]
WINE_NAME = 'wine'
BREAST_TARGET = 'diagnosis'
BREAST_NAME = 'breast'


def get_data(name: str, cols=None) -> pd.DataFrame:
    """Reads a CSV file from the sibling input_data directory into a DataFrame."""
    return pd.read_csv(os.path.join(os.getcwd(), '..', 'input_data', name),
                       names=cols)
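
# Hypothetical usage sketch (assumes an 'iris.csv' file exists under
# ../input_data relative to the working directory):
#   iris_df = get_data('iris.csv', cols=IRIS_COLS)
#   print(iris_df.head())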


def split_train_test(dataframe: pd.DataFrame, fraction: float = 0.7) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Splits a Pandas DataFrame into train and test sets.

    Args:
        dataframe: The Pandas DataFrame to split.
        fraction: The fraction of the data to use for the train set.

    Returns:
        A tuple containing the train and test DataFrames.
    """
    num_train = int(len(dataframe) * fraction)
    train_df = dataframe.iloc[:num_train].copy()
    test_df = dataframe.iloc[num_train:].copy()
    return train_df, test_df
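
# Illustrative sketch (hypothetical 100-row frame): the default fraction keeps
# the first 70 rows for training and the remaining 30 for testing:
#   train, test = split_train_test(some_df)  # len(train) == 70, len(test) == 30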


def split_train_validation_test(dataframe: pd.DataFrame,
                                fraction_train: float = 0.7,
                                fraction_validation: float = 0.15
                                ) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Splits a Pandas DataFrame into train, validation and test sets.

    Args:
        dataframe: The Pandas DataFrame to split.
        fraction_train: The fraction of the data to use for the train set.
        fraction_validation: The fraction of the data to use for the validation set.

    Returns:
        A tuple containing the train, validation and test DataFrames.
    """
    num_train = int(len(dataframe) * fraction_train)
    # End index of the validation slice: train rows plus validation rows.
    num_validate = int(len(dataframe) * (fraction_train + fraction_validation))
    train_df = dataframe.iloc[:num_train].copy()
    validate_df = dataframe.iloc[num_train:num_validate].copy()
    test_df = dataframe.iloc[num_validate:].copy()
    return train_df, validate_df, test_df
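
# With the defaults this yields a 70/15/15 split (frame name is hypothetical):
#   train, validate, test = split_train_validation_test(some_df)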


def shuffle_dataframe(dataframe: pd.DataFrame) -> pd.DataFrame:
    """Returns a copy of the DataFrame with its rows in random order."""
    return dataframe.sample(frac=1)
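
# Because the splits above slice by position, shuffling first avoids any bias
# from the file's row order (hypothetical frame):
#   train, test = split_train_test(shuffle_dataframe(some_df))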


def get_column_max_gain(data_frame: pd.DataFrame, target_col: str, attributes: list[str]) -> str:
    """Given data_frame, selects the attribute column with the maximum information gain."""
    entropy_general = entropy(data_frame, target_col)
    instances = data_frame.shape[0]
    attributes_gains = {}
    for attribute in attributes:
        attr_value = entropy_general
        for value in data_frame[attribute].unique():
            new_frame = data_frame.loc[data_frame[attribute] == value]
            attr_value -= (new_frame.shape[0] / instances) * entropy(new_frame, target_col)
        attributes_gains[attribute] = attr_value
    return max(attributes_gains, key=attributes_gains.get)
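
# Sketch of an ID3-style root split, assuming the iris data has already been
# loaded and discretized so every attribute is categorical:
#   root_attribute = get_column_max_gain(iris_df, IRIS_TARGET, IRIS_ATTRIBUTES)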


def entropy(data: pd.DataFrame, target_col: str) -> float:
    """
    Calculates the entropy of a dataset based on the distribution of a target variable.

    Args:
        data (pd.DataFrame): The input dataset.
        target_col (str): The name of the target column.

    Returns:
        float: The entropy of the dataset.
    """
    counts = data[target_col].value_counts()
    total = counts.sum()
    entropy_value = 0
    for count in counts:
        prob = count / total
        entropy_value += -prob * log(prob, 2)
    return entropy_value
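
# Worked example: a perfectly balanced binary target has entropy 1 bit, since
# -0.5 * log2(0.5) - 0.5 * log2(0.5) = 1.0 (the frame below is illustrative):
#   balanced = pd.DataFrame({'label': ['a', 'a', 'b', 'b']})
#   entropy(balanced, 'label')  # -> 1.0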


def get_columns_gain(df: pd.DataFrame, target_col: str, non_target_cols: list[str]) -> dict:
    """Returns a mapping of each non-target column to its information gain."""
    entropy_before = entropy(df, target_col)
    ig_dict = {}
    for col in non_target_cols:
        ig_dict[col] = entropy_before - calculate_weighted_entropy(df, col, target_col)
    return ig_dict
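
# Illustrative call (hypothetical frame): rank every attribute in one pass
# instead of scoring columns one at a time:
#   gains = get_columns_gain(iris_df, IRIS_TARGET, IRIS_ATTRIBUTES)
#   best = max(gains, key=gains.get)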


def calculate_weighted_entropy(df: pd.DataFrame, col: str, target_col: str) -> float:
    """Returns the entropy of target_col weighted by the value distribution of col."""
    # value_counts(normalize=True) already yields each value's probability,
    # so the weighted sum needs no further scaling.
    probabilities = df[col].value_counts(normalize=True).to_dict()
    entropy_list = []
    for value in df[col].unique():
        subset = df[df[col] == value]
        entropy_list.append(probabilities[value] * entropy(subset, target_col))
    return sum(entropy_list)


def information_gain_col(data: pd.DataFrame, target_col: str, feature_col: str) -> float:
    """Returns the information gain of feature_col with respect to target_col."""
    total_entropy = entropy(data, target_col)
    feature_values = data[feature_col].unique()
    feature_entropy = 0
    for value in feature_values:
        subset_indices = data.index[data[feature_col] == value]
        subset_data = data.loc[subset_indices]
        subset_entropy = entropy(subset_data, target_col)
        prob = len(subset_indices) / len(data)
        feature_entropy += prob * subset_entropy
    return total_entropy - feature_entropy
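
# Sanity check relating the two formulations (hypothetical frame): a column's
# gain is the prior entropy minus its weighted conditional entropy, so
#   information_gain_col(df, 'class', 'c1')
# should equal
#   entropy(df, 'class') - calculate_weighted_entropy(df, 'c1', 'class')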


def replace_nulls(data_frame: pd.DataFrame, target_col: str, default_null='?'):
    """Replaces nulls in the dataset using the most common value within each class."""
    non_class_columns = [col for col in data_frame.columns if col != target_col]
    for column in non_class_columns:
        for value in data_frame[target_col].unique():
            # Exclude the null marker itself when computing the mode; otherwise
            # '?' could win and the nulls would simply be kept.
            class_values = data_frame.loc[
                (data_frame[target_col] == value) &
                (data_frame[column] != default_null), column]
            if class_values.empty:
                continue  # every value in this class is null; nothing to impute from
            most_common_value = class_values.mode()[0]
            data_frame.loc[
                (data_frame[column] == default_null) &
                (data_frame[target_col] == value), column
            ] = most_common_value
    return data_frame
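
# Typical use with the congress votes data, where missing votes are encoded
# as '?' (the file name below is an assumption):
#   congress_df = get_data('house-votes-84.data', cols=CONGRESS_COLS)
#   congress_df = replace_nulls(congress_df, CONGRESS_TARGET)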


def discretize_info_gain(dataframe: pd.DataFrame, target_col: str) -> pd.DataFrame:
    """
    Discretizes all continuous attributes in dataframe based on info gain for each column.

    :param dataframe: the DataFrame to discretize
    :param target_col: target column of dataframe
    :returns: dataframe with discretized values
    """
    dataframe_copy = dataframe.copy()
    numeric_cols = dataframe_copy.select_dtypes(include=['number']).columns.tolist()
    numeric_cols = [col for col in numeric_cols if col != target_col]
    # With k classes, k - 1 cut points are enough to separate them.
    num_thresholds = len(dataframe[target_col].unique()) - 1
    total_instances = dataframe_copy.shape[0]
    for col in numeric_cols:
        df = dataframe_copy[[col, target_col]].copy()
        df = df.sort_values([col, target_col], ascending=True)
        # Candidate thresholds: values where the class label changes along the
        # sorted column (the first row is skipped; it always counts as a change).
        thresholds = set(df.loc[df[target_col].ne(df[target_col].shift())][col][1:].tolist())
        thresholds_mapping = {}
        for threshold in thresholds:
            left, right = df.loc[df[col] <= threshold], df.loc[df[col] > threshold]
            inf_gain = entropy(df, target_col) - ((len(left) / total_instances) * entropy(left, target_col)
                                                  + (len(right) / total_instances) * entropy(right, target_col))
            thresholds_mapping[threshold] = inf_gain
        best_thresholds = sorted(thresholds_mapping.items(),
                                 key=lambda x: x[1],
                                 reverse=True)[:num_thresholds]
        prev_threshold = min(df[col])
        max_threshold = max(df[col])
        # Cast to object first so string interval labels can replace the numbers.
        dataframe_copy[col] = dataframe_copy[col].astype(object)
        # Bin boundaries must be applied in ascending value order, not in
        # gain order, or the resulting intervals overlap.
        for threshold in sorted(t for t, _ in best_thresholds):
            dataframe_copy.loc[(dataframe[col] < threshold) &
                               (dataframe[col] >= prev_threshold), col] = f'{prev_threshold}-{threshold}'
            prev_threshold = threshold
        dataframe_copy.loc[dataframe[col] >= prev_threshold, col] = f'{prev_threshold}-{max_threshold}'
    return dataframe_copy
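

# Minimal end-to-end sketch, guarded so it only runs when the module is
# executed directly. The 'iris.csv' file name and this pipeline are
# illustrative assumptions, not part of the module's contract.
if __name__ == '__main__':
    iris_df = get_data('iris.csv', cols=IRIS_COLS)
    iris_df = discretize_info_gain(iris_df, IRIS_TARGET)
    iris_df = shuffle_dataframe(iris_df)
    train, test = split_train_test(iris_df)
    print('Best first split:', get_column_max_gain(train, IRIS_TARGET, IRIS_ATTRIBUTES))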