-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreader.py
162 lines (142 loc) · 6.52 KB
/
reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import csv
from common import TippeData, Scraper, Team, Contestant
import os
class CsvReader24():
def __init__(self, filename: str, teams, debug=False):
self.debug = debug
self.csv = filename
self.team_names = [t.name for t in teams]
self.n_teams = len(self.team_names)
if not os.path.exists(filename):
raise FileNotFoundError(f"The file {filename} does not exist.")
def get_team_entry(self, team_name, round_number, input_fname=""):
if input_fname == "":
input_fname = self.csv
with open(input_fname, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
next(reader) # Skip header row
for row in reader:
if row[0] == team_name and int(row[1]) == round_number:
if row[2] == "-":
print(f"WARN: entry for {team_name} round {round_number} is empty")
return row
print(f"could not get entry for {team_name} round {round_number}")
return None
# n_matches for a specific team
def get_n_matches_played(self, team_name, input_fname=""):
n_matches_played = 0
if input_fname == "":
input_fname = self.csv
with open(input_fname, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
next(reader) # Skip header row
for row in reader:
if row[0] == team_name and row[2] != '-':
n_matches_played += 1
if self.debug:
print(f"{team_name} has {n_matches_played} matches saved")
return n_matches_played
def get_min_matches_played(self, input_fname=""):
print("checking number of rounds played...")
n_min = 2*(self.n_teams-1)
for name in self.team_names:
n_played = self.get_n_matches_played(name, input_fname)
n_min = min(n_min, n_played)
print(f"->all teams have played at least {n_min} rounds")
return n_min
# checks file, and sees how many "pos" rows have been written via fcn write_team_pos()
# i.e. how many standings
def get_n_pos_rows_written(self, input_fname=""):
n_rows = None
if input_fname == "":
input_fname = self.csv
with open(input_fname, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
next(reader)
current_team = None
current_n_rows = 0
for row in reader:
team = row[0]
pos = row[4]
# every time we reach a new team
if team != current_team:
# for all teams except first
if current_team is not None:
#if self.debug:
#print(f"{current_team} has {current_n_rows} 'pos' rows saved")
if n_rows is None:
n_rows = current_n_rows
elif n_rows != current_n_rows:
print(f"ERROR! {current_team} has {current_n_rows} but previous team had {n_rows}!")
current_n_rows = 0
current_team = team
# check if pos has been updated
if pos.isdigit():
current_n_rows += 1
# Check the last team's 'pos' count
if current_n_rows != n_rows and n_rows is not None:
print(f"ERROR! {current_team} has {current_n_rows} 'pos' rows, but expected {n_rows}!")
if self.debug and n_rows is not None:
print(f"->{n_rows} 'pos' rows already written for each team")
return n_rows if n_rows is not None else 0
# get sorted list of standings at round number
def get_simple_standings_at_round_number(self, round_number, input_fname=""):
round_standings = []
if input_fname == "":
input_fname = self.csv
with open(input_fname, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
next(reader) # skip header
for row in reader:
if int(row[1]) == round_number:
if row[4].isdigit():
round_standings.append([row[0], int(row[4])])
else:
raise ValueError(f"ERROR: round {round_number} not found for team {row[0]}")
# sort and return
round_standings.sort(key=lambda x: (x[1]))
return round_standings
def write_team_entry(self, team_name, round_number, new_points, new_gd,
output_fname="", input_fname=""):
updated_rows = []
new_row = None
if input_fname == "":
input_fname = self.csv
with open(input_fname, 'r', newline='', encoding='utf-8') as f:
reader = csv.reader(f)
for row in reader:
if row[0] == team_name and int(row[1]) == round_number:
row[2] = str(new_points) # Update points
row[3] = str(new_gd) # Update goal difference
new_row = row
updated_rows.append(row)
if new_row is None:
raise ValueError(f"ERROR: did not find row {round_number} for {team_name}!")
# write
if output_fname == "":
output_fname = self.csv
with open(output_fname, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerows(updated_rows)
print(f"->wrote round {round_number} for {team_name}: {new_row}\n")
# updates the "pos" column of a team
def write_team_pos(self, team_name, round_number, new_position, output_fname="", input_fname=""):
updated_rows = []
if input_fname == "":
input_fname = self.csv
with open(input_fname, 'r', newline='', encoding='utf-8') as f:
reader = csv.reader(f)
for row in reader:
if row[0] == team_name and int(row[1]) == round_number:
if row[4] != "-":
raise ValueError(f"ERROR: {team_name} pos already set")
row[4] = new_position
updated_rows.append(row)
# write
if output_fname == "":
output_fname = self.csv
with open(output_fname, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerows(updated_rows)
print(f"->wrote pos at round {round_number} for {team_name}\n")
return True