diff --git a/README.md b/README.md
index 01344a1..15169cc 100644
--- a/README.md
+++ b/README.md
@@ -2,11 +2,19 @@
 Reconstruction of WA's Cannabis Data
 ---

-This is a project which I'm using to learn python, mysql, and methods to handle 150M+ lines of raw, TSV data with challenging encoding and types.
+CCRS-Python-MySQL is a set of Python scripts that takes the raw [CCRS](https://lcb.wa.gov/ccrs) .zip download from a public records request and converts a rather dirty dataset into a somewhat usable set of CSV files of up to 1,000,000 lines each.

-The repo is currenly in rough shape, so here's what's going on:
+I'm using this project to learn Python, MySQL, and methods for handling 150M+ lines of raw TSV data with challenging encoding and types. As I learn, the code has been refactored several times, but here's my best shot so far.

-Use the dataCleanup.py file to process the raw files from TSV, UTF-16-LE with many non-ASCII characters to a list of tuples for use with the most recent sql connection,
-StrainsSQLConnect.py.
+## How to use
+---
+
+There are two scripts: `inflate.py`, which inflates the zips and removes the folder hierarchy, and `columnStandardization.py`, which standardizes the row width.
+
+Both read files from one directory and then write to a new directory without compression. The CCRS data in CSV form is 23 GB at this time, so be careful with disk space.
+
+## Known Issues with the scripts and output data
+---
+The scripts write uncompressed output and are slow; they could probably be changed to write compressed files. The data will have issues too: the size of the dataset means that I haven't been able to confirm that my column standardization method is perfect.

-Plan on adding these two files together when done debugging.
+`InventoryAdjustment_0.csv` seems to have particular encoding issues.
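For context outside the diff: a minimal sketch of running the two scripts in order. The driver below is not part of the repo, and it assumes the `src`/`temp`/`dest` constants at the top of each script have already been edited to point at real directories.

```python
# Hypothetical driver, not part of the repo: run the two scripts in sequence.
# Assumes the src/temp/dest constants in each script point at real directories.
import subprocess
import sys

for script in ("inflate.py", "columnStandardization.py"):
    subprocess.run([sys.executable, script], check=True)
```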
diff --git a/SQL/FileMaps.py b/SQL/FileMaps.py
new file mode 100644
index 0000000..57950fa
--- /dev/null
+++ b/SQL/FileMaps.py
@@ -0,0 +1,104 @@
+tableName = 'sale_items'
+fileMap = {
+    'sale_item_id': "int",
+    'sale_id' : "int",
+    'inventory_item_id' : "int",
+    'plant_id' : "int",
+    'quantity' : "int",
+    'price' : "int",
+    'discount' : "int",
+    'sales_tax' : "int",
+    'other_tax' : "int",
+    'external_id' : "varchar",
+    'status' : "varchar",
+    'created_by' : "varchar",
+    'created_date' : "datetime",
+    'updated_by' : "varchar",
+    'updated_date' : "datetime"
+}
+
+tableName = 'areas'
+fileMap = {
+    'area_id' : 'int',
+    'licensee_id' : 'int',
+    'name' : 'varchar',
+    'is_quarantine' : 'varchar',
+    'external_id' : 'varchar',
+    'status' : 'varchar',
+    'created_by' : 'varchar',
+    'created_date' : 'datetime',
+    'updated_by' : 'varchar',
+    'updated_date' : 'datetime'
+}
+
+tableName = 'lab_results'
+fileMap = {
+    'lab_result_id' : "int",
+    'lab_licensee_id' : "int",
+    'licensee_id' : "int",
+    'lab_test_status' : "varchar",
+    'inventory_item_id' : "int",
+    'test_name' : "varchar",
+    'test_date' : "datetime",
+    'test_value' : "varchar",
+    'external_id' : "varchar",
+    'status' : "varchar",
+    'created_by' : "varchar",
+    'created_date' : "datetime",
+    'updated_by' : "varchar",
+    'updated_date' : "datetime"
+}
+
+tableName = 'licensees'
+fileMap = {
+
+    'license_status' : "varchar",
+    'licensee_id' : "int",
+    'UBI' : "varchar",
+    'license_number' : "int",
+    'name' : "varchar",
+    'dba' : "varchar",
+    'license_issue_date' : "datetime",
+    'license_expiration_date' : "datetime",
+    'external_id' : "varchar",
+    'status' : "varchar",
+    'address_1' : "varchar",
+    'address_2' : "varchar",
+    'city' : "varchar",
+    'state' : "varchar",
+    'zip' : "varchar",
+    'county' : "varchar",
+    'email' : "varchar",
+    'phone' : "varchar",
+    'created_by' : "varchar",
+    'created_date' : "datetime",
+    'updated_by' : "varchar",
+    'updated_date' : "datetime"
+}
+
+tableName = 'sales'
+fileMap = {
+    'sale_id' : "varchar",
+    'seller_licensee_id' : "varchar",
+    'purchaser_licensee_id' : "varchar",
+    'sale_type' : "varchar",
+    'sale_date' : "datetime",
+    'external_id' : "varchar",
+    'status' : "varchar",
+    'created_by' : "varchar",
+    'created_date' : "datetime",
+    'updated_by' : "varchar",
+    'updated_date' : "datetime"
+}
+
+tableName = 'strains'
+fileMap = {
+    'strain_id' : 'int',
+    'strain_type' : 'varchar',
+    'name' : 'varchar',
+    'status' : 'varchar',
+    'created_by' : 'varchar',
+    'created_date' : 'datetime',
+    'updated_by' : 'varchar',
+    'updated_date' : 'datetime',
+}
\ No newline at end of file
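SQL/FileMaps.py only declares column-name to SQL-type dictionaries, and each `tableName`/`fileMap` assignment overwrites the previous one, so only the last pair is visible after a plain import. A minimal sketch of how one of these maps might be turned into DDL; the `build_create_table` helper and the `VARCHAR(255)` length are assumptions, not part of the repo:

```python
# Hypothetical helper, not part of the repo: build a CREATE TABLE statement
# from one of the fileMap dictionaries in SQL/FileMaps.py.
def build_create_table(table_name, file_map):
    # Assumed mapping from the fileMap type labels to MySQL column types.
    type_sql = {'int': 'BIGINT', 'varchar': 'VARCHAR(255)', 'datetime': 'DATETIME'}
    columns = ',\n  '.join(f'`{col}` {type_sql[kind]}' for col, kind in file_map.items())
    return f'CREATE TABLE `{table_name}` (\n  {columns}\n);'

# Example using a shortened copy of the 'strains' map.
strains_map = {
    'strain_id': 'int',
    'strain_type': 'varchar',
    'name': 'varchar',
    'status': 'varchar',
}
print(build_create_table('strains', strains_map))
```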
diff --git a/columnStandardization.py b/columnStandardization.py
new file mode 100644
index 0000000..7053f7e
--- /dev/null
+++ b/columnStandardization.py
@@ -0,0 +1,95 @@
+import csv
+import os
+import pandas as pd
+
+# Running this removes the old files to save disk space; comment out the os.remove() line to retain them.
+src = '/src/path/with/inflated/csvs'
+dest = '/dest/path/for/standardized/column/width/csvs'
+
+df_results = pd.DataFrame(columns=['file', 'columns_removed'])
+df_bad_rows = pd.DataFrame(columns=['file', 'row', 'element', 'next_element'])
+
+
+# Dialect for reading the tab-separated files produced by inflate.py.
+class customDialect(csv.Dialect):
+    """Tab-delimited dialect for the inflated CCRS files."""
+    delimiter = '\t'
+    quotechar = '"'
+    doublequote = True
+    skipinitialspace = False
+    lineterminator = '\n'
+    quoting = csv.QUOTE_MINIMAL
+
+
+csv.register_dialect("customDialect", customDialect)
+
+# 'file prefix' : (expected number of columns, index of the field that may contain stray delimiters)
+
+file_types = {
+    'Areas_': (10, 2),
+    'Contacts_': (13, 0),
+    'Integrator_': (7, 1),
+    'Inventory_': (16, 1),
+    'InventoryAdjustment_': (12, 3),
+    'InventoryPlantTransfer_': (17, 0),
+    'LabResult_': (14, 6),
+    'Plant_': (18, 0),
+    'PlantDestructions_': (11, 7),
+    'Product_': (12, 3),  # probably 4 (description) as well; 3 is name
+    'SaleHeader_': (11, 5),
+    'SalesDetail_': (15, 9),
+    'Strains_': (8, 2),
+    'Licensee_': (22, 4),  # maybe 5 (DBA) as well
+}
+
+# Since the problem cells in the Product and Licensee files are adjacent, the merge below should combine them correctly without special handling.
+
+for filename in os.listdir(src):
+    reqColumns = 0
+    badLocation = 0
+    columns_removed = 0
+    for key in file_types:
+        if filename.startswith(key):
+            reqColumns = file_types[key][0]
+            badLocation = file_types[key][1]
+    f = os.path.join(src, filename)
+    filename_csv = filename[:-4] + ".csv"
+    o = os.path.join(dest, filename_csv)
+    print(f + ' is losing columns')
+    with open(f, 'rt', encoding='utf-8') as csvFile:
+        columns_removed = 0
+        with open(
+                o,
+                "a",
+                encoding="utf-8",
+        ) as outputFile:
+            datareader = csv.reader(csvFile, dialect='customDialect')
+            datawriter = csv.writer(outputFile)
+            for row in datareader:
+                valuesList = []
+                for column in row:
+                    valuesList.append(column)
+                if len(valuesList) < reqColumns:
+                    print(valuesList)
+                while reqColumns < len(valuesList):
+                    df_bad_row = pd.DataFrame(
+                        [[
+                            os.path.basename(f), row, valuesList[badLocation],
+                            valuesList[badLocation + 1]
+                        ]],
+                        columns=['file', 'row', 'element', 'next_element'])
+                    valuesList[badLocation] = valuesList[
+                        badLocation] + valuesList[badLocation + 1]
+                    df_bad_rows = pd.concat([df_bad_rows, df_bad_row])
+                    del valuesList[badLocation + 1]
+                    columns_removed += 1
+                datawriter.writerow(valuesList)
+    df_result = pd.DataFrame(
+        [[os.path.basename(f), str(columns_removed)]],
+        columns=['file', 'columns_removed'])
+    df_results = pd.concat([df_results, df_result])
+    print(o + ' lost ' + str(columns_removed) + ' columns')
+    os.remove(f)
+
+df_results.to_csv('df_results.csv', index=False)
+df_bad_rows.to_csv('df_bad_rows.csv', index=False)
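The repair loop in columnStandardization.py merges the field at `badLocation` with its right-hand neighbor until the row shrinks to the expected width; the stray delimiter is simply dropped by the concatenation. A standalone illustration of that merge with an invented `Strains_` row (the expected width of 8 and index 2 come from `file_types`; the row values are made up):

```python
# Standalone illustration of the column-merge repair; the row below is invented.
req_columns, bad_location = 8, 2  # 'Strains_' entry in file_types
row = ['1', 'Hybrid', 'Blue', 'Dream', 'Active', 'user', '2023-01-01', 'user', '2023-01-02']

while req_columns < len(row):
    # Merge the split field with its neighbour, exactly as the script does.
    row[bad_location] = row[bad_location] + row[bad_location + 1]
    del row[bad_location + 1]

print(row)
# ['1', 'Hybrid', 'BlueDream', 'Active', 'user', '2023-01-01', 'user', '2023-01-02']
```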
diff --git a/inflate.py b/inflate.py
index 36300df..495f28b 100644
--- a/inflate.py
+++ b/inflate.py
@@ -1,50 +1,50 @@
-import zipfile
-import shutil
+import csv
 import os
+import codecs
+import zipfile

-folder = os.getcwd()
-
-# Open the zip file
-
-#get list of files
-zip_file_list = [f for f in os.listdir(folder) if f.endswith(".zip")]
-
-
-# Extract the csv file from the zip file
-
-for zip_file in os.listdir(folder)
-csv_name = zip_file.namelist()[0]
-csv_file = zip_file.open(csv_name)
-
-# Create a new tsv file
-tsv_name = csv_name[:-3] + 'tsv'
-tsv_file = open(tsv_name, 'w')
-
-# Copy the contents of the csv file to the tsv file
-shutil.copyfileobj(csv_file, tsv_file)
-
-# Close the files
-csv_file.close()
-tsv_file.close()
-zip_file.close()
-
-# Remove the zip file
-os.remove(zip_name)
-
-#############################################
-
-# From Chat GPT
-
-#############################################
-
-def extract_zip(zip_path, dest_dir):
-    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
-        for file in zip_ref.namelist():
-            # Extract the file to the destination directory
-            zip_ref.extract(file, dest_dir)
-            # Get the absolute file path
-            file_path = os.path.join(dest_dir, file)
-            # Split the file path into the directory and filename
-            dirname, filename = os.path.split(file_path)
-            # Write the file to the destination directory without preserving the hierarchy
-            shutil.move(file_path, os.path.join(dest_dir, filename))
+src = "/src/folder/with/CCRS/Box/Download"
+temp = "/temp/folder/for/intermediate/file/storage"
+dest = "/dest/folder/for/all/inflated/files"
+
+
+# Dialect for reading .csv files (registered here but not actually used by this script).
+class customDialect(csv.Dialect):
+    """Tab-delimited dialect for the raw CCRS files."""
+    delimiter = '\t'
+    quotechar = '"'
+    doublequote = True
+    skipinitialspace = False
+    lineterminator = '\n'
+    quoting = csv.QUOTE_MINIMAL
+
+
+csv.register_dialect("customDialect", customDialect)
+
+# Walk through all subdirectories of a CCRS Box download, read each extracted file as UTF-16-LE, and output a UTF-8 file.
+# temp is a temporary location for the extracted file before re-encoding.
+
+write_count = 0
+for root, dirs, files in os.walk(src):
+    for file in files:
+        if file.endswith(".zip"):
+            with zipfile.ZipFile(os.path.join(root, file), 'r') as zip_ref:
+                zip_ref.extractall(temp)
+                extracted_file = zip_ref.namelist()[0]
+                extracted_file_base = os.path.basename(extracted_file)
+                extracted_file_path = os.path.join(temp, extracted_file)
+                with codecs.open(extracted_file_path,
+                                 encoding='utf-16-le',
+                                 errors='replace') as source_file:
+                    dest_file_base = extracted_file_base.rsplit('\\', 1)[-1]
+                    dest_file_base = dest_file_base[:-4] + ".tsv"
+                    dest_file_path = os.path.join(dest, dest_file_base)
+                    with codecs.open(dest_file_path, 'w',
+                                     encoding='utf-8') as dest_file:
+                        for row in source_file:
+                            row = ''.join(
+                                [c if ord(c) < 128 else '' for c in row])
+                            dest_file.write(row)
+                write_count += 1
+                os.remove(extracted_file_path)
+print('files written: ' + str(write_count))
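inflate.py re-encodes each extracted file from UTF-16-LE to UTF-8 and drops any non-ASCII characters along the way. A minimal sketch of that per-row filter applied to an in-memory string (the sample text is invented):

```python
# Minimal sketch of the non-ASCII filter applied to each row in inflate.py.
row = 'Blue Dream\u2122\t3.5g flower\u00ae\n'  # invented sample containing non-ASCII marks
clean = ''.join([c if ord(c) < 128 else '' for c in row])
print(repr(clean))  # 'Blue Dream\t3.5g flower\n'
```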