-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcompiler.py
137 lines (128 loc) · 5.44 KB
/
compiler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import csv
import glob
import json
import os
import logging
import gzip
from bdbag import bdbag_api
from jsonpath_rw import jsonpath, parse
from pyld import jsonld
# Module-level logger shared by every class in this module; pinned to DEBUG
# so the metadata-collection trace emitted during bag parsing is visible.
logger = logging.getLogger("app")
logger.setLevel(logging.DEBUG)
class Column:
    """A single table column: a name together with an optional type."""

    def __init__(self, name, column_type=None):
        # column_type defaults to None when the type is not yet known.
        self.name = name
        self.type = column_type

    def __repr__(self):
        return f"{self.name} {self.type}"
class DataSet:
    """A set of columns plus associated metadata for one source database."""

    def __init__(self, db_path, columns):
        # The dataset name is the database path minus its .sqlitedb suffix.
        self.name = db_path.replace(".sqlitedb", "")
        self.db_path = db_path
        self.columns = columns
        # Bookkeeping populated later by downstream compilation stages.
        self.operations = []
        self.jsonld_context = {}
        self.example_rows = []

    def __repr__(self):
        return " ".join([self.name, self.db_path, str(self.columns)])
class CSVFilter:
    """ Implement data set specific filters. Generalize. """

    def filter_data(self, f):
        """Rewrite a CSV file in place, stripping comment lines.

        Applies only to files whose basename starts with ``CTD_`` or
        (case-insensitively) ``bicl``; other files are left untouched.
        Every ``#`` comment line is dropped, except the header row that
        immediately follows a ``# Fields:`` marker, which is kept with its
        ``# `` prefix removed.

        :param f: path of the CSV file to filter in place.
        """
        basename = os.path.basename(f)
        if not (basename.startswith("CTD_") or basename.lower().startswith("bicl")):
            return
        f_new = "{0}.new".format(f)
        with open(f, "r") as stream, open(f_new, "w") as new_stream:
            headers_next = False
            for line in stream:
                if line.startswith("#"):
                    if headers_next:
                        # Header row announced by '# Fields:' — keep it,
                        # minus the comment prefix.
                        out_line = line.replace("# ", "")
                        headers_next = False
                        if out_line:
                            new_stream.write(out_line)
                    elif line.strip() == "# Fields:":
                        headers_next = True
                    # All other comment lines are dropped.
                else:
                    new_stream.write(line)
        # os.replace overwrites atomically and, unlike os.rename, also
        # succeeds on Windows when the destination file already exists.
        os.replace(f_new, f)
class BagCompiler:
    """ Demarcates generic concepts relating to compiling a bag. """

    def __init__(self, bag_archive, output_path="out", generated_path="gen"):
        """Extract the bag archive and parse its manifest.

        :param bag_archive: path of the BDBag archive to compile.
        :param output_path: directory the bag is extracted into.
        :param generated_path: directory for generated artifacts; created
            if it does not already exist.
        """
        self.generated_path = generated_path
        if not os.path.exists(self.generated_path):
            os.makedirs(self.generated_path)
        # Parse bag manifest.
        self.manifest = self.parse(
            bag_archive=bag_archive,
            output_path=output_path)
        self.options = None

    def get_options(self, options_path):
        """Load compiler options from a JSON file.

        :param options_path: path of a JSON options file, or None/empty.
        :return: parsed options dict; empty dict when no path is given.
        """
        options = {}
        if options_path:
            with open(options_path, "r") as stream:
                # json.load parses directly from the stream instead of
                # slurping the whole file into memory first.
                options = json.load(stream)
        return options

    def compile(self, options_path):
        """ Load options. """
        self.options = self.get_options(options_path)

    def parse(self, bag_archive, output_path="out"):
        """ Analyze the bag, consuming BagIt-RO metadata into a structure downstream code emitters can use. """
        manifest = {}
        # Extract the bag.
        bag_path = bdbag_api.extract_bag(bag_archive, output_path=output_path)
        if bdbag_api.is_bag(bag_path):
            logger.debug("Initializing metadata datasets")
            manifest['path'] = bag_path
            manifest['datasets'] = {}
            datasets = manifest['datasets']
            data_path = os.path.join(bag_path, "data")
            # Decompress gzipped CSVs alongside the originals.
            tar_data_files = glob.glob(os.path.join(data_path, "*.csv.gz"))
            for f in tar_data_files:
                extracted = f.replace(".gz", "")
                with gzip.open(f, 'rb') as zipped, open(extracted, "wb") as stream:
                    stream.write(zipped.read())
            # Collect metadata for each file.
            data_files = glob.glob(os.path.join(data_path, "*.csv"))
            csv_filter = CSVFilter()
            for f in data_files:
                csv_filter.filter_data(f)
                logger.debug(f" --collecting metadata for: {f}")
                jsonld_context = self._get_jsonld_context(f)
                datasets[f] = jsonld_context
                # Use the module logger rather than bare print so the
                # diagnostic trace is controlled like the rest of the class.
                logger.debug(f"json-ld: {json.dumps(jsonld_context, indent=2)}")
                context = datasets[f]['@context']
                # Columns are the JSON-LD terms whose values are expanded
                # (dict) term definitions.
                datasets[f]['columns'] = {
                    k: None for k in context if isinstance(context[k], dict)
                }
        return manifest

    def _get_jsonld_context(self, data_file):
        """Locate and load the JSON-LD context document for a data file.

        Looks first for ``<data_file>.jsonld`` next to the data, then under
        the bag's ``metadata/annotations`` directory.

        :raises ValueError: when no JSON-LD context file can be found.
        """
        # Renamed from 'jsonld' to avoid shadowing the module-level
        # pyld.jsonld import.
        context_doc = None
        ro_model_path = data_file.split(os.path.sep)
        ro_model_path.insert(-1, os.path.sep.join(['..', 'metadata', 'annotations']))
        ro_model_path = os.path.sep.join(ro_model_path)
        jsonld_context_files = [
            "{0}.jsonld".format(data_file),
            "{0}.jsonld".format(ro_model_path)
        ]
        for jsonld_context_file in jsonld_context_files:
            logger.debug("testing {}".format(jsonld_context_file))
            if os.path.exists(jsonld_context_file):
                logger.debug("opening {}".format(jsonld_context_file))
                with open(jsonld_context_file, "r") as stream:
                    context_doc = json.load(stream)
                break
        if not context_doc:
            raise ValueError(f"Unable to find JSON-LD file for {data_file}")
        return context_doc
def cleanup_bag(bag_path):
    """Remove the extracted bag by cleaning up its parent directory."""
    parent_dir = os.path.dirname(bag_path)
    bdbag_api.cleanup_bag(parent_dir)