-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathts_alphatest.py
145 lines (131 loc) · 4.4 KB
/
ts_alphatest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""
@File : ts_alphatest.py
@Time : 2024/11/28 16:06:52
@Author : David Jin
@Version : 1.0
@Contact : jinxyyy@qq.com
@License : (C)Copyright 2024-2024, David Jin
@Desc : IC (information coefficient) test for a single factor
"""
import numba as nb
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from AlphaEngine.operators import numba as op
def ic(
    X: pd.DataFrame,
    y: pd.DataFrame,
    forward: int = 1,
    method: str = "pearson",
    aggsize: str = "Y",
    period: int = 252,
) -> pd.DataFrame:
    """
    Compute the time-series IC of every factor column, one value per group.

    Both inputs are copied, so the caller's frames are never modified.
    ``X`` is passed through ``_preprocess`` with a lag of ``forward + 1``,
    a ``group`` column derived from the index is appended to both frames,
    and the correlation between each column of ``X`` and the column at the
    same position in ``y`` is computed inside every group.

    Args:
        X (pd.DataFrame): Feature matrix. Its index must expose
            ``.year`` / ``.month`` / ``.day`` (e.g. a ``DatetimeIndex``).
        y (pd.DataFrame): Label matrix. Columns are matched to ``X`` by
            position and rows are paired within each group, so it is
            expected to share ``X``'s index and column layout.
        forward (int, optional): Lag setting; ``X`` is delayed by
            ``forward + 1`` periods. Defaults to 1.
        method (str, optional): Correlation type, ``'pearson'`` or
            ``'spearman'``. Defaults to 'pearson'.
        aggsize (str, optional): Grouping key taken from the index:
            ``'Y'`` -> ``index.year``, ``'M'`` -> ``index.month``,
            ``'D'`` -> ``index.day``. Defaults to 'Y'.
            NOTE: ``'M'`` and ``'D'`` use the calendar month number / day
            of month, so the same month (or day) of different years falls
            into the same group.
        period (int, optional): Rolling window size, forwarded to
            ``_preprocess``. Defaults to 252.

    Returns:
        pd.DataFrame: IC table indexed by the sorted group keys, with one
        column per factor column of ``X``. Cells for which the kernel
        produced no value are NaN.

    Raises:
        ValueError: If ``aggsize`` or ``method`` is not a supported value.
    """
    # Fail fast on bad arguments, before any data is copied or shifted.
    if aggsize not in ("Y", "M", "D"):
        raise ValueError("aggsize must be 'Y', 'M' or 'D'")
    if method not in ("pearson", "spearman"):
        raise ValueError("method must be 'pearson' or 'spearman'")

    group_attr = {"Y": "year", "M": "month", "D": "day"}[aggsize]
    kernel = _ts_corr_pearson if method == "pearson" else _ts_corr_spearman

    lag = forward + 1
    X = _preprocess(X.copy(), lag, period)
    y = y.copy()

    # The kernels read the group key from the last column of each matrix.
    X["group"] = getattr(X.index, group_attr)
    y["group"] = getattr(y.index, group_attr)

    values = kernel(X.to_numpy(), y.to_numpy())

    # The kernels order their output rows by np.unique (ascending), so the
    # row labels must be sorted as well. Series.unique() keeps order of
    # appearance and would mislabel rows whenever the keys are not first
    # seen in ascending order (e.g. monthly groups on data starting mid-year).
    labels = np.unique(X["group"].to_numpy())

    return pd.DataFrame(values, index=labels, columns=X.columns[:-1])
def _preprocess(X: pd.DataFrame, forward: int, p: int) -> pd.DataFrame:
    """
    Preprocess the feature matrix before the IC computation.

    Delegates to ``op.ts_delay(X, forward)`` and returns its result
    unchanged (presumably a copy of ``X`` lagged by ``forward`` periods;
    confirm against ``AlphaEngine.operators``).

    Args:
        X (pd.DataFrame): Feature matrix.
        forward (int): Lag handed to ``op.ts_delay``.
        p (int): Window size; accepted for interface compatibility but not
            used by this function.

    Returns:
        pd.DataFrame: Whatever ``op.ts_delay`` returns for ``X``.
    """
    delayed = op.ts_delay(X, forward)
    return delayed
@nb.njit(nogil=True, cache=True, parallel=True)
def _ts_corr_pearson(X: np.ndarray, Y: np.ndarray) -> np.ndarray:
    """
    Group-wise Pearson correlation between matching columns of X and Y.

    The last column of each matrix holds the group key; every other column
    of ``X`` is correlated with the column at the same position in ``Y``.

    Args:
        X (np.ndarray): 2-D array, factor columns followed by the group key.
        Y (np.ndarray): 2-D array with the same layout as ``X``.

    Returns:
        np.ndarray: Array of shape ``(n_groups, n_factor_columns)``. Rows
        follow the ascending order of the distinct keys in ``X[:, -1]``.
        A cell stays NaN when the group has two or fewer rows that are
        finite in both series, or when either series has zero variance.
    """
    group = np.unique(X[:, -1])
    n_cols = X.shape[1] - 1
    n_groups = len(group)
    out = np.full((n_groups, n_cols), np.nan)
    for col in nb.prange(n_cols):
        for t in range(n_groups):
            x_array = X[X[:, -1] == group[t], col]
            y_array = Y[Y[:, -1] == group[t], col]
            # Keep only the rows where both observations are finite.
            valid = np.isfinite(x_array) & np.isfinite(y_array)
            if np.sum(valid) > 2:
                x_valid = x_array[valid]
                y_valid = y_array[valid]
                x_mean = np.mean(x_valid)
                y_mean = np.mean(y_valid)
                cov = np.mean((x_valid - x_mean) * (y_valid - y_mean))
                denom = np.std(x_valid) * np.std(y_valid)
                # A constant series gives a zero denominator. With Numba's
                # default error model that division raises
                # ZeroDivisionError, so leave the cell as NaN instead.
                if denom > 0.0:
                    out[t, col] = cov / denom
    return out
@nb.njit(nogil=True, cache=True)
def _avg_rank(data: np.ndarray) -> np.ndarray:
    """
    Return the 1-based ranks of ``data``, giving ties their average rank.

    Args:
        data (np.ndarray): 1-D array of values to rank.

    Returns:
        np.ndarray: float64 array of the same length as ``data``; element
        ``i`` is the rank of ``data[i]``. Equal values share the mean of
        the ranks they occupy.
    """
    n = len(data)
    ranks = np.zeros(n, dtype=np.float64)
    order = np.argsort(data)
    start = 0
    while start < n:
        # Each run of equal values contains at least its first element.
        # Starting the scan at start + 1 guarantees progress even for NaN,
        # which never compares equal to itself and would otherwise make
        # this loop spin forever.
        pivot = data[order[start]]
        end = start + 1
        while end < n and data[order[end]] == pivot:
            end += 1
        # The run occupies ranks start+1 .. end; their mean is assigned
        # to every member.
        avg_rank = (start + 1 + end) / 2
        for j in range(start, end):
            ranks[order[j]] = avg_rank
        start = end
    return ranks
@nb.njit(nogil=True, cache=True, parallel=True)
def _ts_corr_spearman(X: np.ndarray, Y: np.ndarray) -> np.ndarray:
    """
    Group-wise Spearman rank correlation between matching columns of X and Y.

    The last column of each matrix holds the group key; every other column
    of ``X`` is rank-correlated with the column at the same position in
    ``Y``. Ranks are computed with ``_avg_rank`` (ties share their average
    rank) and then fed into the Pearson formula.

    Args:
        X (np.ndarray): 2-D array, factor columns followed by the group key.
        Y (np.ndarray): 2-D array with the same layout as ``X``.

    Returns:
        np.ndarray: Array of shape ``(n_groups, n_factor_columns)``. Rows
        follow the ascending order of the distinct keys in ``X[:, -1]``.
        A cell stays NaN when the group has two or fewer rows that are
        finite in both series, or when the ranks of either series have
        zero variance (all values tied).
    """
    group = np.unique(X[:, -1])
    n_cols = X.shape[1] - 1
    n_groups = len(group)
    out = np.full((n_groups, n_cols), np.nan)
    for col in nb.prange(n_cols):
        for t in range(n_groups):
            x_array = X[X[:, -1] == group[t], col]
            y_array = Y[Y[:, -1] == group[t], col]
            # Keep only the rows where both observations are finite.
            valid = np.isfinite(x_array) & np.isfinite(y_array)
            if np.sum(valid) > 2:
                x_rank = _avg_rank(x_array[valid])
                y_rank = _avg_rank(y_array[valid])
                x_mean = np.mean(x_rank)
                y_mean = np.mean(y_rank)
                cov = np.mean((x_rank - x_mean) * (y_rank - y_mean))
                denom = np.std(x_rank) * np.std(y_rank)
                # Fully tied values give identical ranks and a zero
                # denominator. With Numba's default error model that
                # division raises ZeroDivisionError, so leave the cell
                # as NaN instead.
                if denom > 0.0:
                    out[t, col] = cov / denom
    return out