Skip to content

Commit

Permalink
Update format Lambda to generate GeoHarvester extract command
Browse files Browse the repository at this point in the history
Why these changes are being introduced:
* The Lambda function needs to determine which harvester to use based on the
source name provided in a payload to the TIMDEX StepFunction.

How this addresses that need:
* Add a conditional to call GeoHarvester when source in ["gismit", "gisogm"]
* Set OAI Harvester as default
* Update helpers.generate_step_output_file to set file_type = "jsonl" when
dealing when source is geospatial layers (i.e., source name contains "gis")
* Add 'harvester-type' property indicating harvester used in extract step
to output payload of format Lambda
* Add test to verify successful creation of GeoHarvester extract command
* Update config.validate_input to only raise error for missing OAI harvest fields

Side effects of this change:
* None

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/GDT-115
  • Loading branch information
jonavellecuerdo committed Jan 8, 2024
1 parent 0485bed commit cf3266f
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 34 deletions.
1 change: 1 addition & 0 deletions lambdas/alma_prep.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def prepare_alma_export_files(run_date: str, run_type: str, timdex_bucket: str)
export_file
)
extract_output_file = helpers.generate_step_output_filename(
"alma",
load_type,
helpers.generate_step_output_prefix("alma", run_date, run_type, "extract"),
"extract",
Expand Down
49 changes: 32 additions & 17 deletions lambdas/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,47 @@ def generate_extract_command(
source, run_date, run_type, step
)
extract_output_file = helpers.generate_step_output_filename(
"index", extract_output_prefix, step
source, "index", extract_output_prefix, step
)

extract_command = [
f"--host={input_data['oai-pmh-host']}",
f"--output-file=s3://{timdex_bucket}/{extract_output_file}",
]
extract_command = []

if verbose:
extract_command.append("--verbose")

extract_command.append("harvest")
extract_command.append(f"--metadata-format={input_data['oai-metadata-format']}")

if source in ["aspace", "dspace"]:
extract_command.append("--method=get")
if source in ["gismit", "gisogm"]:
extract_command.append("harvest")
if run_type == "daily":
extract_command.append("--harvest-type=incremental")
extract_command.append(
f"--from-date={helpers.generate_harvest_from_date(run_date)}"
)
elif run_type == "full":
extract_command.append("--harvest-type=full")

if set_spec := input_data.get("oai-set-spec"):
extract_command.append(f"--set-spec={set_spec}")
extract_command.append(
f"--output-file=s3://{timdex_bucket}/{extract_output_file}"
)
extract_command.append(source.removeprefix("gis"))

if run_type == "daily":
else:
extract_command.append(f"--host={input_data['oai-pmh-host']}")
extract_command.append(
f"--from-date={helpers.generate_harvest_from_date(run_date)}",
f"--output-file=s3://{timdex_bucket}/{extract_output_file}"
)
elif run_type == "full":
extract_command.append("--exclude-deleted")
extract_command.append("harvest")
if source in ["aspace", "dspace"]:
extract_command.append("--method=get")
extract_command.append(f"--metadata-format={input_data['oai-metadata-format']}")
if run_type == "daily":
extract_command.append(
f"--from-date={helpers.generate_harvest_from_date(run_date)}",
)
elif run_type == "full":
extract_command.append("--exclude-deleted")

if set_spec := input_data.get("oai-set-spec"):
extract_command.append(f"--set-spec={set_spec}")

return {
"extract-command": extract_command,
Expand All @@ -71,7 +86,7 @@ def generate_transform_commands(
extract_output_file
)
transform_output_file = helpers.generate_step_output_filename(
load_type, transform_output_prefix, "transform", sequence
source, load_type, transform_output_prefix, "transform", sequence
)

transform_command = [
Expand Down
19 changes: 10 additions & 9 deletions lambdas/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"WORKSPACE",
}
REQUIRED_FIELDS = ("next-step", "run-date", "run-type", "source")
REQUIRED_HARVEST_FIELDS = ("oai-pmh-host", "oai-metadata-format")
REQUIRED_OAI_HARVEST_FIELDS = ("oai-pmh-host", "oai-metadata-format")
VALID_DATE_FORMATS = ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%SZ")
VALID_RUN_TYPES = ("full", "daily")
VALID_STEPS = ("extract", "transform", "load")
Expand Down Expand Up @@ -83,14 +83,15 @@ def validate_input(input_data: dict) -> None:
# If next step is extract step, required harvest fields are present
# ruff: noqa: SIM102
if input_data["next-step"] == "extract":
if missing_harvest_fields := [
field for field in REQUIRED_HARVEST_FIELDS if field not in input_data
]:
message = (
"Input must include all required harvest fields when starting with "
f"harvest step. Missing fields: {missing_harvest_fields}"
)
raise ValueError(message)
if input_data["source"] not in ["gismit", "gisogm"]:
if missing_harvest_fields := [
field for field in REQUIRED_OAI_HARVEST_FIELDS if field not in input_data
]:
message = (
"Input must include all required harvest fields when starting with "
f"harvest step. Missing fields: {missing_harvest_fields}"
)
raise ValueError(message)


def verify_env() -> None:
Expand Down
6 changes: 4 additions & 2 deletions lambdas/format_input.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
import logging
import os

Expand All @@ -12,7 +11,6 @@ def lambda_handler(event: dict, _context: dict) -> dict:
config.verify_env()
verbose = config.check_verbosity(event.get("verbose", False))
config.configure_logger(logging.getLogger(), verbose)
logger.debug(json.dumps(event))
config.validate_input(event)

run_date = helpers.format_run_date(event["run-date"])
Expand All @@ -29,6 +27,10 @@ def lambda_handler(event: dict, _context: dict) -> dict:
}

if next_step == "extract":
if source in ["gismit", "gisogm"]:
result["harvester-type"] = "geo"
else:
result["harvester-type"] = "oai"
result["next-step"] = "transform"
result["extract"] = commands.generate_extract_command(
event, run_date, timdex_bucket, verbose
Expand Down
3 changes: 2 additions & 1 deletion lambdas/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def generate_index_name(source: str) -> str:


def generate_step_output_filename(
source: str,
load_type: str,
prefix: str,
step: str,
Expand All @@ -54,7 +55,7 @@ def generate_step_output_filename(
"""
sequence_suffix = f"_{sequence}" if sequence else ""
if step == "extract":
file_type = "xml"
file_type = "jsonl" if "gis" in source else "xml"
elif load_type == "delete":
file_type = "txt"
else:
Expand Down
27 changes: 24 additions & 3 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,36 @@ def test_generate_extract_command_all_input_fields():
input_data, "2022-01-02", "test-timdex-bucket", True
) == {
"extract-command": [
"--verbose",
"--host=https://example.com/oai",
"--output-file=s3://test-timdex-bucket/aspace/"
"aspace-2022-01-02-full-extracted-records-to-index.xml",
"--verbose",
"harvest",
"--metadata-format=oai_dc",
"--method=get",
"--set-spec=Collection1",
"--metadata-format=oai_dc",
"--exclude-deleted",
"--set-spec=Collection1",
]
}


def test_generate_extract_command_geoharvester():
input_data = {
"run-date": "2022-01-02T12:13:14Z",
"run-type": "daily",
"next-step": "extract",
"source": "gismit",
}
assert commands.generate_extract_command(
input_data, "2022-01-02", "test-timdex-bucket", False
) == {
"extract-command": [
"harvest",
"--harvest-type=incremental",
"--from-date=2022-01-01",
"--output-file=s3://test-timdex-bucket/gismit/"
"gismit-2022-01-02-daily-extracted-records-to-index.jsonl",
"mit",
]
}

Expand Down
1 change: 1 addition & 0 deletions tests/test_format_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def test_lambda_handler_with_next_step_extract():
"run-type": "daily",
"source": "testsource",
"verbose": False,
"harvester-type": "oai",
"next-step": "transform",
"extract": {
"extract-command": [
Expand Down
6 changes: 4 additions & 2 deletions tests/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ def test_generate_index_name():

def test_generate_step_output_filename_with_sequence():
assert (
helpers.generate_step_output_filename("index", "prefix", "extract", "01")
helpers.generate_step_output_filename(
"source", "index", "prefix", "extract", "01"
)
== "prefix-to-index_01.xml"
)


def test_generate_step_output_filename_without_sequence():
assert (
helpers.generate_step_output_filename("delete", "prefix", "transform")
helpers.generate_step_output_filename("source", "delete", "prefix", "transform")
== "prefix-to-delete.txt"
)

Expand Down

0 comments on commit cf3266f

Please sign in to comment.