-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathdu.py
88 lines (79 loc) · 2.7 KB
/
du.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from typing import List
from loader import Inventory
DU_STMT = """
select
common_prefix,
sum(size) as size
from (
select
key,
CASE cardinality(split(substr(key, length('{prefix}') + 1), '{delimiter}'))
WHEN 1 THEN split_part(substr(key, length('{prefix}') + 1), '{delimiter}', 1)
ELSE split_part(substr(key, length('{prefix}') + 1), '{delimiter}', 1) || '{delimiter}'
END as common_prefix,
size
from {table_name}
where
dt = '{date}'
and key like '{prefix}%'
and (is_latest = true or is_latest is null)
and (is_delete_marker = false or is_delete_marker is null)
)
group by common_prefix
order by size desc
"""
DU_DIFF_STMT = """
with one_day as (select
common_prefix,
sum(size) as total,
dt
from (
select
key,
CASE cardinality(split(substr(key, length('{prefix}') + 1), '{delimiter}'))
WHEN 1 THEN split_part(substr(key, length('{prefix}') + 1), '{delimiter}', 1)
ELSE split_part(substr(key, length('{prefix}') + 1), '{delimiter}', 1) || '{delimiter}'
END as common_prefix,
size,dt
from {table_name}
where
key like '{prefix}%'
and (is_latest = true or is_latest is null)
and (is_delete_marker = false or is_delete_marker is null)
)
group by common_prefix,dt)
select
common_prefix,
d1.total as size_left,
d2.total as size_right,
d1.total - d2.total as diff
from
(select * from one_day where dt='{date_left}') d1 full outer join
(select * from one_day where dt='{date_right}') d2 using (common_prefix)
order by
abs(diff) desc,
size_left desc,
size_right desc
"""
def size_for(db: Inventory, date: str, prefix: str = '', delimiter: str = '/') -> List[dict]:
results = db.query(DU_STMT, prefix=prefix, delimiter=delimiter, date=date)
return [{'common_prefix': r.get('common_prefix'), 'size': int(r.get('size'))} for r in results]
def diff_for(db: Inventory, date_left: str, date_right: str, prefix: str = '', delimiter: str = '/') -> List[dict]:
results = db.query(DU_DIFF_STMT, prefix=prefix, delimiter=delimiter, date_left=date_left, date_right=date_right)
adjusted = []
for r in results:
size_left = r.get('size_left')
size_right = r.get('size_right')
if not size_right:
diff = size_left
elif not size_left:
diff = f'-{size_right}'
else:
diff = int(r.get('diff') or '0')
adjusted.append({
'common_prefix': r.get('common_prefix'),
'size_left': int(r.get('size_left') or '0'),
'size_right': int(r.get('size_right') or '0'),
'diff': diff,
})
return adjusted