#!/usr/bin/env python3
import os, sys, math, unicodedata
from collections import defaultdict, Counter
from ctypes import c_uint32
# Length of a document for training, in code points, or in bytes if BYTE_NGRAMS
# (see below) is true. This only includes characters that are effectively used
# in ngrams, not whitespace, etc.
DOCUMENT_LEN = 30
# Maximum number of documents to use for training, per language. We discard
# remaining data if need be.
MAX_DOCUMENTS = 100000
NGRAM_SIZE = 4
NUM_FOLDS = 10 # For cross-validation. We start counting at 1.
# Trainers:
# * dictionary: Use a real dictionary for storing features.
# * vector: Hash features and store them into a single array. Entails hashing
#   collisions, which cost us about 0.1 of accuracy; not much.
# * multi_vector: like "vector", but use one array per language. Its accuracy
#   is much worse than that of "vector".
TRAINER = "vector"
LOWERCASE_CHUNK = False # Slightly negative effect.
BYTE_NGRAMS = True # Positive effect.
STRIP_NON_ALNUM = True # Positive effect.
STRIP_NUMERIC = True # Positive effect.
PAD_CHUNK = True # Positive effect, important.
assert NGRAM_SIZE <= DOCUMENT_LEN
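# Illustration (assuming the defaults above): iter_corpus() below keeps adding
# n-grams to a document until it spans DOCUMENT_LEN units, which amounts to
# DOCUMENT_LEN - NGRAM_SIZE + 1 = 27 n-grams per training document.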
def utf8(x):
    return x.encode("UTF-8")

def normalize_chunk(chunk):
    if LOWERCASE_CHUNK:
        chunk = chunk.lower()
    if STRIP_NON_ALNUM:
        chunk = "".join(c if c.isalnum() else " " for c in chunk)
    if STRIP_NUMERIC:
        chunk = "".join(" " if c.isnumeric() else c for c in chunk)
    chunks = chunk.split()
    if BYTE_NGRAMS:
        chunks = (utf8(chunk) for chunk in chunks)
    if PAD_CHUNK:
        if BYTE_NGRAMS:
            # 0xff is not valid in UTF-8.
            chunks = (b"\xff" + chunk + b"\xff" for chunk in chunks)
        else:
            # Surrogates cannot be found in UTF-8-encoded strings.
            chunks = ("\ud800" + chunk + "\ud800" for chunk in chunks)
    return (chunk for chunk in chunks if len(chunk) >= NGRAM_SIZE)
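# Worked example with the default settings (illustrative, not part of the
# original code): punctuation and digits are blanked out, the remaining chunk
# is UTF-8 encoded and padded with 0xff markers.
#   >>> list(normalize_chunk("Hello,42!"))
#   [b'\xffHello\xff']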
def split_ngrams(text):
    return (text[i:i + NGRAM_SIZE] for i in range(len(text) - NGRAM_SIZE + 1))
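# For instance, with NGRAM_SIZE = 4 (illustrative):
#   >>> list(split_ngrams(b"\xffHello\xff"))
#   [b'\xffHel', b'Hell', b'ello', b'llo\xff']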
def iter_ngrams(file):
    fp = open(file) if isinstance(file, str) else file
    for line in fp:
        line = line.strip()
        if not line:
            continue
        line = unicodedata.normalize("NFC", line)
        for chunk in line.split():
            for sub_chunk in normalize_chunk(chunk):
                yield from split_ngrams(sub_chunk)
    if fp is not file:
        fp.close()
def iter_corpus(path, lang, max_documents=MAX_DOCUMENTS):
    def closure(fold_no=0, test_docs=None):
        # If "fold_no" is > 0, we reserve some documents for testing while
        # reading the corpus, by adding them to "test_docs", which must be a
        # dictionary: {"lang1": [], "lang2": []...}
        document = []
        rotation = 0
        num_docs = 0
        for ngram in iter_ngrams(path):
            document.append(ngram)
            if len(document) + NGRAM_SIZE - 1 < DOCUMENT_LEN:
                continue
            rotation += 1
            num_docs += 1
            if fold_no == rotation:
                test_docs[lang].append(document)
            else:
                yield document
            if num_docs == max_documents:
                return
            if rotation == NUM_FOLDS:
                rotation = 0
            document = []
    return closure
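# Note on the fold rotation above: documents are numbered 1..NUM_FOLDS
# cyclically, and a document goes to the test set when its number matches
# "fold_no", so each fold reserves roughly 1/NUM_FOLDS of the corpus (10% with
# the defaults) for testing. With fold_no == 0, nothing is reserved.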
# Emulate C unsigned overflow.
U32 = lambda n: c_uint32(n).value

def hash_feature(ngram, lang_no):
    # Must match the function used in the C source file!
    h = 1315423911
    for c in ngram + bytes([lang_no]):
        n = U32(U32(h << 5) + c + U32(h >> 2))
        h = U32(h ^ n)
    return h
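# Presumably a 32-bit rendition of Justin Sobel's bitwise hash (the 1315423911
# seed and the shift pattern match it). Appending the language index as a
# final byte makes the same n-gram land in different buckets for different
# languages.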
# When several labels have the same probability, choose the smallest one,
# lexicographically speaking (we must maintain stability for testing).
def find_best(probs):
    best = max(probs, key=probs.get)
    for lang in sorted(probs):
        if probs[lang] == probs[best]:
            return lang
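# Illustrative tie-break (made-up log-probabilities):
#   >>> find_best({"fr": -3.0, "en": -3.0, "de": -5.0})
#   'en'

# train_dictionary() below amounts to a multinomial Naive Bayes classifier
# over character n-grams, with uniform priors and constant smoothing terms
# dropped, as explained in the inline comments.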
def train_dictionary(corpora, fold_no=0, test_docs=None):
    class_fd = defaultdict(int)
    cond_fd = defaultdict(int)
    ngrams_frq = defaultdict(int)
    ngrams_set = set()
    for lang, itor in corpora.items():
        for document in itor(fold_no, test_docs):
            class_fd[lang] += 1
            for ngram in document:
                cond_fd[ngram, lang] += 1
                ngrams_set.add(ngram)
                ngrams_frq[lang] += 1
    def classify(document):
        # Should have:
        #   for lang, num_docs in class_fd.items():
        #       probs[lang] = math.log(num_docs / sum(class_fd.values()))
        # but we assume that priors are uniform, so start everything at 0.
        probs = {lang: 0. for lang in class_fd}
        for ngram in document:
            for lang in class_fd:
                cond_frq = cond_fd.get((ngram, lang), 0)
                frq = ngrams_frq.get(lang, 0)
                # The real formula is:
                #   math.log((cond_frq + 1) / (frq + len(ngrams_set)))
                # but we assume that all documents have the same length.
                probs[lang] += math.log(cond_frq + 1)
        return find_best(probs), []
    langs = sorted(class_fd)
    return classify, cond_fd, langs
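# The "vector" trainer below applies the hashing trick: instead of a real
# dictionary, every (ngram, language) feature is hashed into a single flat
# array whose size is rounded up to a power of two, so collisions simply add
# their counts together (hence the small accuracy loss mentioned at the top).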
def train_vector(corpora, fold_no=0, test_docs=None):
    _, cond_fd, langs = train_dictionary(corpora, fold_no, test_docs)
    # Round the table size up to a power of two.
    cond_frq_vec = [0] * (2 ** math.ceil(math.log(len(cond_fd), 2)))
    for (ngram, lang), freq in cond_fd.items():
        idx = hash_feature(ngram, langs.index(lang)) % len(cond_frq_vec)
        cond_frq_vec[idx] += freq
    def classify(document):
        probs = {lang: 0. for lang in langs}
        infos = [] # [(ngram, lang, hash, prob), ...]
        for ngram in document:
            for lang in langs:
                h = hash_feature(ngram, langs.index(lang))
                cond_frq = cond_frq_vec[h % len(cond_frq_vec)]
                prob = math.log(cond_frq + 1)
                probs[lang] += prob
                infos.append((ngram, lang, h, prob))
        return find_best(probs), infos
    return classify, cond_frq_vec, langs
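# The "multi_vector" variant below keeps one array per language, sized to that
# language's feature count rather than a power of two; as noted at the top of
# the file, its accuracy is much worse than that of "vector".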
def train_multi_vector(corpora, fold_no=0, test_docs=None):
    _, cond_fd, langs = train_dictionary(corpora, fold_no, test_docs)
    ngrams_per_lang = Counter(lang for (ngram, lang) in cond_fd)
    cond_frq_vecs = {lang: [0] * num_ngrams
                     for lang, num_ngrams in ngrams_per_lang.items()}
    for (ngram, lang), freq in cond_fd.items():
        idx = hash_feature(ngram, langs.index(lang)) % ngrams_per_lang[lang]
        cond_frq_vecs[lang][idx] += freq
    def classify(document):
        probs = {lang: 0. for lang in langs}
        for ngram in document:
            for lang in langs:
                idx = hash_feature(ngram, langs.index(lang)) % ngrams_per_lang[lang]
                probs[lang] += math.log(cond_frq_vecs[lang][idx] + 1)
        return find_best(probs), []
    return classify, cond_frq_vecs, langs
TRAINERS = {
    "dictionary": train_dictionary,
    "vector": train_vector,
    "multi_vector": train_multi_vector,
}
def load_corpora(paths):
    # We use a single byte for hashing the language index into features.
    assert len(paths) <= 0xff, "too many languages"
    # Count the number of documents per corpus so that we can truncate all
    # corpora to the same number of documents if one of them turns out to be
    # too small. Needed because we assume that priors are uniform.
    max_docs = MAX_DOCUMENTS
    for corpus in paths:
        itor = iter_corpus(corpus, "foobar")
        num_docs = 0
        for doc in itor():
            num_docs += 1
        max_docs = min(max_docs, num_docs)
    # Now create iterators that will stop at the right number of documents.
    corpora = {}
    for corpus in paths:
        lang = os.path.splitext(os.path.basename(corpus))[0]
        assert lang not in corpora
        corpora[lang] = iter_corpus(corpus, lang, max_docs)
    return corpora
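# Hypothetical example: load_corpora(["corpora/en.txt", "corpora/fr.txt"])
# returns {"en": <generator factory>, "fr": <generator factory>}, the factories
# being the closures produced by iter_corpus(), with both corpora truncated to
# the document count of the smaller one.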
def run_eval(corpora):
    conf_mat = {lang: {"tp": 0, "fp": 0, "fn": 0} for lang in corpora}
    for fold_no in range(1, NUM_FOLDS + 1):
        print("*** %d/%d" % (fold_no, NUM_FOLDS), file=sys.stderr)
        test_docs = defaultdict(list)
        classifier, _, _ = TRAINERS[TRAINER](corpora, fold_no, test_docs)
        for lang, documents in test_docs.items():
            for doc in documents:
                guess, _ = classifier(doc)
                if guess == lang:
                    conf_mat[lang]["tp"] += 1
                else:
                    conf_mat[guess]["fp"] += 1
                    conf_mat[lang]["fn"] += 1
    precision = sum(mat["tp"] / (mat["tp"] + mat["fp"])
                    for mat in conf_mat.values()) / len(conf_mat)
    recall = sum(mat["tp"] / (mat["tp"] + mat["fn"])
                 for mat in conf_mat.values()) / len(conf_mat)
    F1 = 2 * precision * recall / (precision + recall)
    print("macro-precision: %.3f" % (precision * 100))
    print("macro-recall: %.3f" % (recall * 100))
    print("macro-F1: %.3f" % (F1 * 100))
def mkmodel(corpora, fp=sys.stdout):
    _, features, langs = train_vector(corpora)
    print("@ sabir 1", file=fp)
    print("> %d %d %d" % (len(langs), sum(len(utf8(lang)) for lang in langs),
                          len(features)), file=fp)
    for lang in sorted(langs):
        print(lang, file=fp)
    for f in features:
        print(f, file=fp)
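# Illustrative first lines of a dumped model (made-up values):
#   @ sabir 1
#   > 2 4 1048576
#   de
#   en
#   0
#   3
#   ...
# That is: a magic line, then the number of languages, the total byte length
# of the language names, and the size of the feature table, followed by the
# sorted language names and one feature count per line.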
USAGE = """\
Usage: %s <command> <text_file> [<text_file>..]
Train a language detection model for Sabir.
Commands:
dump
Train a model and write it on the standard output. The generated model can
then be used with the corresponding C library.
eval
Train a model, test its accuracy using cross-validation, and display a
performance summary on the standard output.
Arguments after the command must be a list of text files to use for training.
There should be one file per language. The name of the language corresponding to
a file is derived from the file path by stripping its extension and its leading
directories, e.g.:
/home/foobar/en.txt -> en
"""
def usage(ret):
    fp = sys.stdout if ret == 0 else sys.stderr
    print(USAGE.strip() % os.path.basename(sys.argv[0]), file=fp)
    sys.exit(ret)
if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] in ("-h", "--help"):
        usage(0)
    if len(sys.argv) < 3:
        usage(1)
    if sys.argv[1] == "dump":
        mkmodel(load_corpora(sys.argv[2:]))
    elif sys.argv[1] == "eval":
        run_eval(load_corpora(sys.argv[2:]))
    else:
        usage(1)