From 7597fded53a8d0acec943c3e91870338614fe79c Mon Sep 17 00:00:00 2001 From: garciam Date: Mon, 30 Dec 2024 10:50:25 +0100 Subject: [PATCH] encode variables in batch_to_netcdf --- cdsobs/ingestion/serialize.py | 44 +++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/cdsobs/ingestion/serialize.py b/cdsobs/ingestion/serialize.py index 6a4f7dd..7c05be1 100644 --- a/cdsobs/ingestion/serialize.py +++ b/cdsobs/ingestion/serialize.py @@ -131,24 +131,10 @@ def to_netcdf( # Encode variable names as integer if encode_variables: logger.info("Encoding observed variables using the CDM variable codes.") - code_table = cdm_dataset.dataset_params.cdm_code_tables[ - "observed_variable" - ].table - # strip to remove extra spaces - var2code = get_var2code(code_table) - encoded_data = ( - cdm_dataset.dataset["observed_variable"] - .str.encode("UTF-8") - .map(var2code) - .astype("uint8") - ) + cdm_code_tables = cdm_dataset.dataset_params.cdm_code_tables + data = cdm_dataset.dataset + encoded_data, var2code_subset = encode_observed_variables(cdm_code_tables, data) cdm_dataset.dataset["observed_variable"] = encoded_data - codes_in_data = encoded_data.unique() - var2code_subset = { - var.decode("ascii"): code - for var, code in var2code.items() - if code in codes_in_data - } encoding["observed_variable"]["dtype"] = encoded_data.dtype attrs["observed_variable"] = dict( labels=list(var2code_subset), codes=list(var2code_subset.values()) @@ -166,6 +152,22 @@ def to_netcdf( return output_path +def encode_observed_variables(cdm_code_tables, data): + code_table = cdm_code_tables["observed_variable"].table + # strip to remove extra spaces + var2code = get_var2code(code_table) + encoded_data = ( + data["observed_variable"].str.encode("UTF-8").map(var2code).astype("uint8") + ) + codes_in_data = encoded_data.unique() + var2code_subset = { + var.decode("ascii"): code + for var, code in var2code.items() + if code in codes_in_data + } + return encoded_data, var2code_subset + + def get_var2code(code_table): code_dict = pandas.Series( index=code_table["name"].str.strip().str.replace(" ", "_").str.encode("ascii"), @@ -280,15 +282,23 @@ def batch_to_netcdf( for field in homogenised_data: if homogenised_data[field].dtype == "string": homogenised_data[field] = homogenised_data[field].str.encode("UTF-8") + encoded_data, var2code_subset = encode_observed_variables( + dataset_params.cdm_code_tables, homogenised_data + ) + homogenised_data["observed_variable"] = encoded_data homogenised_data_xr = homogenised_data.to_xarray() if service_definition.global_attributes is not None: homogenised_data.attrs = { **homogenised_data.attrs, **service_definition.global_attributes, } + homogenised_data_xr["observed_variable"].attrs = dict( + labels=list(var2code_subset), codes=list(var2code_subset.values()) + ) encoding = get_encoding_with_compression_xarray( homogenised_data_xr, string_transform="str_to_char" ) + encoding["observed_variable"]["dtype"] = str(encoded_data.dtype) logger.info(f"Writing de-normalized and CDM mapped data to {output_path}") homogenised_data_xr.to_netcdf( output_path, encoding=encoding, engine="netcdf4", format="NETCDF4"