TIMX 441 - support v2 transform command #314

Merged · 1 commit · Dec 20, 2024
24 changes: 21 additions & 3 deletions lambdas/commands.py
@@ -72,6 +72,7 @@ def generate_transform_commands(
input_data: dict,
run_date: str,
timdex_bucket: str,
run_id: str,
) -> dict[str, list[dict]]:
"""Generate task run command for TIMDEX transform."""
# NOTE: FEATURE FLAG: branching logic will be removed after v2 work is complete
@@ -82,7 +83,9 @@
extract_output_files, input_data, run_date, timdex_bucket
)
case 2:
return _etl_v2_generate_transform_commands_method()
return _etl_v2_generate_transform_commands_method(
extract_output_files, input_data, timdex_bucket, run_id
)


# NOTE: FEATURE FLAG: branching logic + method removed after v2 work is complete
@@ -114,8 +117,23 @@ def _etl_v1_generate_transform_commands_method(


# NOTE: FEATURE FLAG: branching logic + method removed after v2 work is complete
def _etl_v2_generate_transform_commands_method() -> dict[str, list[dict]]:
raise NotImplementedError
def _etl_v2_generate_transform_commands_method(
extract_output_files: list[str],
input_data: dict,
timdex_bucket: str,
run_id: str,
) -> dict[str, list[dict]]:
files_to_transform: list[dict] = []
source = input_data["source"]
for extract_output_file in extract_output_files:
transform_command = [
f"--input-file=s3://{timdex_bucket}/{extract_output_file}",
f"--output-location=s3://{timdex_bucket}/dataset",
Contributor Author:
This is subtle but important: this effectively sets /dataset as the TIMDEX parquet dataset.

We may well want to explore setting this via an SSM param / env var at some point, but I'd propose keeping it hardcoded for now. We did something similar with source, where each application knew that the source at hand was the folder to look for in S3. Now it's kind of a simplification: everyone just knows that /dataset is where the dataset is located.
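
A minimal sketch of the env var alternative mentioned above, assuming a hypothetical TIMDEX_DATASET_PREFIX variable that is not part of this PR; defaulting to "dataset" would keep the current hardcoded behavior unchanged:

import os

def dataset_output_location(timdex_bucket: str) -> str:
    # Hypothetical sketch only: "TIMDEX_DATASET_PREFIX" is an assumed env var
    # name; this PR simply hardcodes the "dataset" prefix.
    prefix = os.getenv("TIMDEX_DATASET_PREFIX", "dataset")
    return f"s3://{timdex_bucket}/{prefix}"

The transform command would then use f"--output-location={dataset_output_location(timdex_bucket)}", with no behavior change until the env var is actually set.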

Contributor:
Just to recap, this will result in the following file tree:

└── timdex-bucket/
    └── dataset/
        └── year=2024/
            └── month=12/
                └── day=01/
                    └── <uuid>-<optional-sequence>.parquet

where UUID uniquely identifies a run per source?
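
For illustration, a hedged sketch (not part of this PR) of how that layout could be read back with pyarrow, assuming hive-style year=/month=/day= partitions under the dataset prefix; the bucket name is an assumption, and whether the run UUID is also stored as a column rather than only in the filename is not confirmed by this PR:

import pyarrow.dataset as ds

# Assumed bucket/prefix names, for illustration only.
dataset = ds.dataset(
    "s3://timdex-bucket/dataset/",
    format="parquet",
    partitioning="hive",  # picks up the year=/month=/day= directories
)

# Partition columns become filterable fields; exact value types depend on how
# pyarrow infers them from the directory names.
table = dataset.to_table(filter=ds.field("year") == 2024)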

f"--source={source}",
f"--run-id={run_id}",
]
files_to_transform.append({"transform-command": transform_command})
return {"files-to-transform": files_to_transform}


def generate_load_commands(
1 change: 1 addition & 0 deletions lambdas/config.py
@@ -13,6 +13,7 @@
"TIMDEX_S3_EXTRACT_BUCKET_ID",
"WORKSPACE",
}
# NOTE: FEATURE FLAG: add "run-id" after v1 pathways are removed
REQUIRED_FIELDS = ("next-step", "run-date", "run-type", "source")
REQUIRED_OAI_HARVEST_FIELDS = ("oai-pmh-host", "oai-metadata-format")
VALID_DATE_FORMATS = ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%SZ")
4 changes: 3 additions & 1 deletion lambdas/format_input.py
@@ -1,6 +1,7 @@
import json
import logging
import os
import uuid

from lambdas import alma_prep, commands, config, errors, helpers

@@ -19,6 +20,7 @@ def lambda_handler(event: dict, _context: dict) -> dict:
run_type = event["run-type"]
source = event["source"]
next_step = event["next-step"]
run_id = event.get("run-id", str(uuid.uuid4()))
timdex_bucket = os.environ["TIMDEX_S3_EXTRACT_BUCKET_ID"]

result = {
@@ -67,7 +69,7 @@ def lambda_handler(event: dict, _context: dict) -> dict:
)
result["next-step"] = "load"
result["transform"] = commands.generate_transform_commands(
extract_output_files, event, run_date, timdex_bucket
extract_output_files, event, run_date, timdex_bucket, run_id
)

elif next_step == "load":
13 changes: 13 additions & 0 deletions tests/conftest.py
@@ -60,3 +60,16 @@ def mocked_s3():
def s3_client():
# ruff: noqa: PT022
yield boto3.client("s3")


@pytest.fixture
def run_id():
return "run-abc-123"


# NOTE: FEATURE FLAG: remove after v2 work is complete
@pytest.fixture
def etl_version_2(monkeypatch):
etl_version = 2
monkeypatch.setenv("ETL_VERSION", f"{etl_version}")
return etl_version
28 changes: 16 additions & 12 deletions tests/test_commands.py
@@ -77,7 +77,7 @@ def test_generate_extract_command_geoharvester():
}


def test_generate_transform_commands_required_input_fields():
def test_generate_transform_commands_required_input_fields(etl_version_2, run_id):
input_data = {
"next-step": "transform",
"run-date": "2022-01-02T12:13:14Z",
@@ -88,23 +88,27 @@ def test_generate_transform_commands_required_input_fields():
"testsource/testsource-2022-01-02-full-extracted-records-to-index.xml"
]
assert commands.generate_transform_commands(
extract_output_files, input_data, "2022-01-02", "test-timdex-bucket"
extract_output_files,
input_data,
"2022-01-02",
"test-timdex-bucket",
run_id,
) == {
"files-to-transform": [
{
"transform-command": [
"--input-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-full-extracted-records-to-index.xml",
"--output-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-full-transformed-records-to-index.json",
"--output-location=s3://test-timdex-bucket/dataset",
"--source=testsource",
f"--run-id={run_id}",
]
}
]
}


def test_generate_transform_commands_all_input_fields():
def test_generate_transform_commands_all_input_fields(etl_version_2, run_id):
input_data = {
"next-step": "transform",
"run-date": "2022-01-02T12:13:14Z",
@@ -117,34 +121,34 @@ def test_generate_transform_commands_all_input_fields():
"testsource/testsource-2022-01-02-daily-extracted-records-to-delete.xml",
]
assert commands.generate_transform_commands(
extract_output_files, input_data, "2022-01-02", "test-timdex-bucket"
extract_output_files, input_data, "2022-01-02", "test-timdex-bucket", run_id
) == {
"files-to-transform": [
{
"transform-command": [
"--input-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-extracted-records-to-index_01.xml",
"--output-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-transformed-records-to-index_01.json",
"--output-location=s3://test-timdex-bucket/dataset",
"--source=testsource",
f"--run-id={run_id}",
]
},
{
"transform-command": [
"--input-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-extracted-records-to-index_02.xml",
"--output-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-transformed-records-to-index_02.json",
"--output-location=s3://test-timdex-bucket/dataset",
"--source=testsource",
f"--run-id={run_id}",
]
},
{
"transform-command": [
"--input-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-extracted-records-to-delete.xml",
"--output-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-transformed-records-to-delete.txt",
"--output-location=s3://test-timdex-bucket/dataset",
"--source=testsource",
f"--run-id={run_id}",
]
},
]
79 changes: 79 additions & 0 deletions tests/test_commands_v1.py
Contributor Author:
In an effort to make feature flag removal easier later, "v1" tests will be moved here during each phase of work. Once v2 is fully implemented and v1 no longer supported, we can remove this file entirely.

Contributor:
Good approach!

@@ -0,0 +1,79 @@
# ruff: noqa: FBT003

from lambdas import commands

# NOTE: FEATURE FLAG: this file can be FULLY removed after v2 work is complete


def test_generate_transform_commands_required_input_fields(run_id):
input_data = {
"next-step": "transform",
"run-date": "2022-01-02T12:13:14Z",
"run-type": "full",
"source": "testsource",
}
extract_output_files = [
"testsource/testsource-2022-01-02-full-extracted-records-to-index.xml"
]
assert commands.generate_transform_commands(
extract_output_files, input_data, "2022-01-02", "test-timdex-bucket", run_id
) == {
"files-to-transform": [
{
"transform-command": [
"--input-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-full-extracted-records-to-index.xml",
"--output-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-full-transformed-records-to-index.json",
"--source=testsource",
]
}
]
}


def test_generate_transform_commands_all_input_fields(run_id):
input_data = {
"next-step": "transform",
"run-date": "2022-01-02T12:13:14Z",
"run-type": "daily",
"source": "testsource",
}
extract_output_files = [
"testsource/testsource-2022-01-02-daily-extracted-records-to-index_01.xml",
"testsource/testsource-2022-01-02-daily-extracted-records-to-index_02.xml",
"testsource/testsource-2022-01-02-daily-extracted-records-to-delete.xml",
]
assert commands.generate_transform_commands(
extract_output_files, input_data, "2022-01-02", "test-timdex-bucket", run_id
) == {
"files-to-transform": [
{
"transform-command": [
"--input-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-extracted-records-to-index_01.xml",
"--output-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-transformed-records-to-index_01.json",
"--source=testsource",
]
},
{
"transform-command": [
"--input-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-extracted-records-to-index_02.xml",
"--output-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-transformed-records-to-index_02.json",
"--source=testsource",
]
},
{
"transform-command": [
"--input-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-extracted-records-to-delete.xml",
"--output-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-transformed-records-to-delete.txt",
"--source=testsource",
]
},
]
}