This repository has been archived by the owner on Jan 10, 2025. It is now read-only.

Commit

inspect missing dates
floriankrb committed Feb 27, 2024
1 parent 7d7b315 commit 93cc268
Showing 2 changed files with 29 additions and 61 deletions.
41 changes: 15 additions & 26 deletions ecml_tools/commands/inspect/zarr.py
@@ -28,9 +28,7 @@ def compute_directory_size(path):
         return None, None
     size = 0
     n = 0
-    for dirpath, _, filenames in tqdm.tqdm(
-        os.walk(path), desc="Computing size", leave=False
-    ):
+    for dirpath, _, filenames in tqdm.tqdm(os.walk(path), desc="Computing size", leave=False):
         for filename in filenames:
             file_path = os.path.join(dirpath, filename)
             size += os.path.getsize(file_path)
@@ -154,6 +152,12 @@ def shape(self):
         if self.data and hasattr(self.data, "shape"):
             return self.data.shape
 
+    @property
+    def n_missing_dates(self):
+        if "missing_dates" in self.metadata:
+            return len(self.metadata["missing_dates"])
+        return None
+
     @property
     def uncompressed_data_size(self):
         if self.data and hasattr(self.data, "dtype") and hasattr(self.data, "size"):
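Note: the None fallback above keeps the inspect command working on datasets written before this commit, where the "missing_dates" key is absent from the metadata. A minimal sketch of that contract, with hypothetical metadata dicts:

    # Sketch only: the metadata contents are illustrative, not from a real dataset.
    def n_missing_dates(metadata):
        if "missing_dates" in metadata:
            return len(metadata["missing_dates"])
        return None

    old_metadata = {"start_date": "2020-01-01T00:00:00"}        # pre-change dataset
    new_metadata = {"missing_dates": ["2020-03-01T06:00:00"]}   # written by this commit

    assert n_missing_dates(old_metadata) is None  # info() then skips the 🚫 line
    assert n_missing_dates(new_metadata) == 1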
@@ -164,6 +168,8 @@ def info(self, detailed, size):
         print(f'📅 Start : {self.first_date.strftime("%Y-%m-%d %H:%M")}')
         print(f'📅 End : {self.last_date.strftime("%Y-%m-%d %H:%M")}')
         print(f"⏰ Frequency : {self.frequency}h")
+        if self.n_missing_dates is not None:
+            print(f"🚫 Missing : {self.n_missing_dates}")
         print(f"🌎 Resolution: {self.resolution}")
 
         print()
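Note: with this change the inspect header gains a 🚫 line whenever the count is available. A hypothetical example of the resulting output (sample values only, not from a real dataset):

    📅 Start : 2020-01-01 00:00
    📅 End : 2020-12-31 18:00
    ⏰ Frequency : 6h
    🚫 Missing : 2
    🌎 Resolution: o96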
@@ -303,21 +309,15 @@ def progress(self):
         assert build_flags.size == build_lengths.size
 
         latest_write_timestamp = self.zarr.attrs.get("latest_write_timestamp")
-        latest = (
-            datetime.datetime.fromisoformat(latest_write_timestamp)
-            if latest_write_timestamp
-            else None
-        )
+        latest = datetime.datetime.fromisoformat(latest_write_timestamp) if latest_write_timestamp else None
 
         if not all(build_flags):
             if latest:
                 print(f"🪫 Dataset not ready, last update {when(latest)}.")
             else:
                 print("🪫 Dataset not ready.")
             total = sum(build_lengths)
-            built = sum(
-                ln if flag else 0 for ln, flag in zip(build_lengths, build_flags)
-            )
+            built = sum(ln if flag else 0 for ln, flag in zip(build_lengths, build_flags))
             print(
                 "📈 Progress:",
                 progress(built, total, width=50),
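Note: the progress arithmetic reduces to summing per-group lengths, counting a group only when its build flag is set. A self-contained sketch with made-up flags and lengths (not the registry's API):

    build_flags = [True, True, False, True]   # one flag per group
    build_lengths = [31, 28, 31, 30]          # e.g. chunks per group

    total = sum(build_lengths)                # 120
    built = sum(ln if flag else 0 for ln, flag in zip(build_lengths, build_flags))
    print(f"{built}/{total} chunks built")    # 89/120 chunks built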
@@ -403,9 +403,7 @@ def last_date(self):
         assert isinstance(time, int), (time, type(time))
         if time > 100:
             time = time // 100
-        return datetime.datetime.fromisoformat(monthly["stop"]) + datetime.timedelta(
-            hours=time
-        )
+        return datetime.datetime.fromisoformat(monthly["stop"]) + datetime.timedelta(hours=time)
 
     @property
     def frequency(self):
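Note: to make the time normalization concrete, the metadata may encode the time as HHMM (e.g. 1800), which the `time // 100` step reduces to whole hours before the timedelta is applied. A worked example under that assumption, with hypothetical values:

    import datetime

    monthly = {"stop": "2020-12-31"}  # illustrative value
    time = 1800                       # HHMM-style; becomes 18 below
    if time > 100:
        time = time // 100
    last = datetime.datetime.fromisoformat(monthly["stop"]) + datetime.timedelta(hours=time)
    print(last)  # 2020-12-31 18:00:00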
@@ -464,12 +462,8 @@ def _info(self, verbose, history, statistics, **kwargs):
 
         # for backward compatibility
         if "climetlab" in z.attrs:
-            climetlab_version = (
-                z.attrs["climetlab"].get("versions", {}).get("climetlab", "unknown")
-            )
-            print(
-                f"climetlab version used to create this zarr: {climetlab_version}. Not supported."
-            )
+            climetlab_version = z.attrs["climetlab"].get("versions", {}).get("climetlab", "unknown")
+            print(f"climetlab version used to create this zarr: {climetlab_version}. Not supported.")
             return
 
         version = z.attrs.get("version")
@@ -495,12 +489,7 @@ def initialised(self):
             return datetime.datetime.fromisoformat(record["timestamp"])
 
         # Sometimes the first record is missing
-        timestamps = sorted(
-            [
-                datetime.datetime.fromisoformat(d["timestamp"])
-                for d in self.metadata.get("history", [])
-            ]
-        )
+        timestamps = sorted([datetime.datetime.fromisoformat(d["timestamp"]) for d in self.metadata.get("history", [])])
         if timestamps:
             return timestamps[0]
 
49 changes: 14 additions & 35 deletions ecml_tools/create/loaders.py
Original file line number Diff line number Diff line change
@@ -179,19 +179,16 @@ def initialise(self, check_name=True):
             f"Dates: Found {len(dates)} datetimes, in {len(self.groups)} groups: ",
             end="",
         )
+        print(f"Missing dates: {len(dates.missing)}")
         lengths = [len(g) for g in self.groups]
-        self.print(
-            f"Found {len(dates)} datetimes {'+'.join([str(_) for _ in lengths])}."
-        )
+        self.print(f"Found {len(dates)} datetimes {'+'.join([str(_) for _ in lengths])}.")
         print("-------------------------")
 
         variables = self.minimal_input.variables
         self.print(f"Found {len(variables)} variables : {','.join(variables)}.")
 
         ensembles = self.minimal_input.ensembles
-        self.print(
-            f"Found {len(ensembles)} ensembles : {','.join([str(_) for _ in ensembles])}."
-        )
+        self.print(f"Found {len(ensembles)} ensembles : {','.join([str(_) for _ in ensembles])}.")
 
         grid_points = self.minimal_input.grid_points
         print(f"gridpoints size: {[len(i) for i in grid_points]}")
@@ -212,9 +209,7 @@ def initialise(self, check_name=True):
         print(f"{chunks=}")
         dtype = self.output.dtype
 
-        self.print(
-            f"Creating Dataset '{self.path}', with {total_shape=}, {chunks=} and {dtype=}"
-        )
+        self.print(f"Creating Dataset '{self.path}', with {total_shape=}, {chunks=} and {dtype=}")
 
         metadata = {}
         metadata["uuid"] = str(uuid.uuid4())
@@ -242,6 +237,7 @@ def initialise(self, check_name=True):
         metadata["frequency"] = frequency
         metadata["start_date"] = dates[0].isoformat()
         metadata["end_date"] = dates[-1].isoformat()
+        metadata["missing_dates"] = [_.isoformat() for _ in dates.missing]
 
         if check_name:
             basename, ext = os.path.splitext(os.path.basename(self.path))
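Note: storing the missing dates as ISO-8601 strings keeps the zarr attributes JSON-serializable, and reading them back is a plain round trip. A sketch of that round trip, not the loader's code:

    import datetime

    missing = [datetime.datetime(2020, 3, 1, 6), datetime.datetime(2020, 3, 1, 12)]
    metadata = {"missing_dates": [d.isoformat() for d in missing]}  # as written above
    restored = [datetime.datetime.fromisoformat(s) for s in metadata["missing_dates"]]
    assert restored == missing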
Expand Down Expand Up @@ -284,9 +280,7 @@ def initialise(self, check_name=True):

self.registry.create(lengths=lengths)
self.statistics_registry.create(exist_ok=False)
self.registry.add_to_history(
"statistics_registry_initialised", version=self.statistics_registry.version
)
self.registry.add_to_history("statistics_registry_initialised", version=self.statistics_registry.version)

self.registry.add_to_history("init finished")

@@ -307,17 +301,13 @@ def load(self, parts):
         self.registry.add_to_history("loading_data_start", parts=parts)
 
         z = zarr.open(self.path, mode="r+")
-        data_writer = DataWriter(
-            parts, parent=self, full_array=z["data"], print=self.print
-        )
+        data_writer = DataWriter(parts, parent=self, full_array=z["data"], print=self.print)
 
         total = len(self.registry.get_flags())
         filter = CubesFilter(parts=parts, total=total)
         for igroup, group in enumerate(self.groups):
             if self.registry.get_flag(igroup):
-                LOG.info(
-                    f" -> Skipping {igroup} total={len(self.groups)} (already done)"
-                )
+                LOG.info(f" -> Skipping {igroup} total={len(self.groups)} (already done)")
                 continue
             if not filter(igroup):
                 continue
@@ -330,9 +320,7 @@
 
         self.registry.add_to_history("loading_data_end", parts=parts)
         self.registry.add_provenance(name="provenance_load")
-        self.statistics_registry.add_provenance(
-            name="provenance_load", config=self.main_config
-        )
+        self.statistics_registry.add_provenance(name="provenance_load", config=self.main_config)
 
         self.print_info()
 
@@ -395,13 +383,9 @@ def check_complete(self, force):
         if self._complete:
             return
         if not force:
-            raise Exception(
-                f"❗Zarr {self.path} is not fully built. Use 'force' option."
-            )
+            raise Exception(f"❗Zarr {self.path} is not fully built. Use 'force' option.")
         if self._write_to_dataset:
-            print(
-                f"❗Zarr {self.path} is not fully built, not writing statistics into dataset."
-            )
+            print(f"❗Zarr {self.path} is not fully built, not writing statistics into dataset.")
             self._write_to_dataset = False
 
     @property
@@ -424,9 +408,7 @@ def _complete(self):
 
     def read_dataset_dates_metadata(self):
         ds = open_dataset(self.path)
-        subset = ds.dates_interval_to_indices(
-            self.statistics_start, self.statistics_end
-        )
+        subset = ds.dates_interval_to_indices(self.statistics_start, self.statistics_end)
         self.i_start = subset[0]
         self.i_end = subset[-1]
         self.date_start = ds.dates[subset[0]]
@@ -460,8 +442,7 @@ def recompute_temporary_statistics(self):
         self.statistics_registry.create(exist_ok=True)
 
         self.print(
-            f"Building temporary statistics from data {self.path}. "
-            f"From {self.date_start} to {self.date_end}"
+            f"Building temporary statistics from data {self.path}. " f"From {self.date_start} to {self.date_end}"
         )
 
         shape = (self.i_end + 1 - self.i_start, len(self.variables_names))
@@ -487,9 +468,7 @@
 
             print(f"✅ Saving statistics for {key} shape={detailed_stats['count'].shape}")
             self.statistics_registry[key] = detailed_stats
-        self.statistics_registry.add_provenance(
-            name="provenance_recompute_statistics", config=self.main_config
-        )
+        self.statistics_registry.add_provenance(name="provenance_recompute_statistics", config=self.main_config)
 
     def get_detailed_stats(self):
         expected_shape = (self.dataset_shape[0], self.dataset_shape[1])
