
Commit

Add data product name to the logger and update to latest base image in reload/resync lambdas (#2211)

* Add data product name to the logger where missing

* Upgrade to latest base image

This picks up the new path format changes.
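As a rough sketch of the logging change (the add_extras call and the data_product event field are taken from the handler diffs below; the logger construction and the trimmed handler body are illustrative assumptions):

from data_platform_logging import DataPlatformLogger

logger = DataPlatformLogger()  # assumed default construction, for illustration only

def handler(event, context):
    # Attach the data product name from the triggering event so that every
    # subsequent log line carries it as structured context.
    data_product_name = event.get("data_product", "")
    logger.add_extras({"data_product_name": data_product_name})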
MatMoore authored Nov 7, 2023
1 parent 7ce2d6d commit 8008fef
Showing 11 changed files with 65 additions and 66 deletions.
7 changes: 7 additions & 0 deletions containers/daap-reload-data-product/CHANGELOG.md
@@ -9,6 +9,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [2.0.0]

### Changed

- Add data product name to logging
- Updated base image, to bring in changes to path formats

## [1.0.4]

### Changed
2 changes: 1 addition & 1 deletion containers/daap-reload-data-product/Dockerfile
@@ -1,4 +1,4 @@
FROM ghcr.io/ministryofjustice/data-platform-daap-python-base:2.0.0
FROM ghcr.io/ministryofjustice/data-platform-daap-python-base:5.3.1

ARG VERSION

2 changes: 1 addition & 1 deletion containers/daap-reload-data-product/config.json
@@ -1,6 +1,6 @@
{
"name": "daap-reload-data-product",
"version": "1.0.4",
"version": "2.0.0",
"registry": "ecr",
"ecr": {
"role": "arn:aws:iam::013433889002:role/modernisation-platform-oidc-cicd",
@@ -30,6 +30,8 @@ def handler(
athena_load_lambda=os.environ.get("ATHENA_LOAD_LAMBDA", ""),
):
data_product_name = event.get("data_product", "")
logger.add_extras({"data_product_name": data_product_name})

data_product = DataProductConfig(name=data_product_name)
raw_prefix = data_product.raw_data_prefix.key
raw_data_bucket = data_product.raw_data_bucket
@@ -3,6 +3,14 @@
from reload_data_product import get_data_product_pages, handler, s3_recursive_delete


@pytest.fixture
def empty_metadata_bucket(s3_client, monkeypatch):
bucket_name = "metadata"
s3_client.create_bucket(Bucket=bucket_name)
monkeypatch.setenv("METADATA_BUCKET", bucket_name)
return bucket_name


@pytest.fixture
def empty_curated_data_bucket(s3_client, monkeypatch):
bucket_name = "curated"
@@ -37,7 +45,9 @@ def empty_raw_data_bucket(s3_client, monkeypatch):


@pytest.fixture
def data_product(empty_raw_data_bucket, empty_curated_data_bucket):
def data_product(
empty_raw_data_bucket, empty_curated_data_bucket, empty_metadata_bucket
):
return DataProductConfig(
"foo", raw_data_bucket=raw_data_bucket, curated_data_bucket=curated_data_bucket
)
@@ -89,8 +99,8 @@ def test_get_data_product_pages(s3_client, raw_data_bucket, data_product):
)
assert len(pages) == 1
assert {i["Key"] for i in pages[0]["Contents"]} == {
"raw_data/foo/bar/abc",
"raw_data/foo/bar/baz",
"raw/foo/v1.0/bar/abc",
"raw/foo/v1.0/bar/baz",
}


@@ -153,13 +163,13 @@ def test_handler_invokes_lambda_for_each_raw_file(
do_nothing_lambda_client.invoke.assert_any_call(
FunctionName="athena_load_lambda",
InvocationType="Event",
Payload=f'{{"detail":{{"bucket":{{"name":"{raw_data_bucket}"}}, "object":{{"key":"raw_data/foo/bar/abc"}}}}}}',
Payload=f'{{"detail":{{"bucket":{{"name":"{raw_data_bucket}"}}, "object":{{"key":"raw/foo/v1.0/bar/abc"}}}}}}',
)

do_nothing_lambda_client.invoke.assert_any_call(
FunctionName="athena_load_lambda",
InvocationType="Event",
Payload=f'{{"detail":{{"bucket":{{"name":"{raw_data_bucket}"}}, "object":{{"key":"raw_data/foo/bar/baz"}}}}}}',
Payload=f'{{"detail":{{"bucket":{{"name":"{raw_data_bucket}"}}, "object":{{"key":"raw/foo/v1.0/bar/baz"}}}}}}',
)


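The expected Payload keys in the tests above illustrate the path format change picked up from the new base image: raw object keys move from the old flat layout to a versioned one. Roughly (templates inferred from the old and new keys appearing in this commit's tests and docstrings; the helpers that actually build them live in the base image's data_platform_paths module, not in this diff):

old:  raw_data/{data_product}/{table}/extraction_timestamp={timestamp}/{file}
new:  raw/{data_product}/v1.0/{table}/load_timestamp={timestamp}/{file}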
7 changes: 7 additions & 0 deletions containers/daap-resync-unprocessed-files/CHANGELOG.md
@@ -9,6 +9,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [2.0.0]

### Changed

- Add data product name to logging
- Updated base image, to bring in changes to path formats

## [1.0.3]

### Added
2 changes: 1 addition & 1 deletion containers/daap-resync-unprocessed-files/Dockerfile
@@ -1,4 +1,4 @@
FROM ghcr.io/ministryofjustice/data-platform-daap-python-base:2.0.1
FROM ghcr.io/ministryofjustice/data-platform-daap-python-base:5.3.1

ARG VERSION

2 changes: 1 addition & 1 deletion containers/daap-resync-unprocessed-files/config.json
@@ -1,6 +1,6 @@
{
"name": "daap-resync-unprocessed-files",
"version": "1.0.3",
"version": "2.0.0",
"registry": "ecr",
"ecr": {
"role": "arn:aws:iam::013433889002:role/modernisation-platform-oidc-cicd",
@@ -6,9 +6,6 @@
from data_platform_logging import DataPlatformLogger
from data_platform_paths import (
DataProductConfig,
extract_database_name_from_curated_path,
extract_table_name_from_curated_path,
extract_timestamp_from_curated_path,
get_curated_data_bucket,
get_raw_data_bucket,
)
@@ -28,15 +25,16 @@

def handler(event, context):
data_product_to_recreate = event.get("data_product", "")
logger.add_extras({"data_product_name": data_product_to_recreate})

data_product = DataProductConfig(
name=data_product_to_recreate,
raw_data_bucket=raw_data_bucket,
curated_data_bucket=curated_data_bucket,
)

raw_prefix = data_product.raw_data_prefix
curated_prefix = data_product.curated_data_prefix
raw_prefix = data_product.raw_data_prefix.key
curated_prefix = data_product.curated_data_prefix.key

logger.info(f"Raw prefix: {raw_prefix}")
logger.info(f"Curated prefix: {curated_prefix}")
@@ -52,8 +50,8 @@ def handler(event, context):
bucket=curated_data_bucket, data_product_prefix=curated_prefix
)

raw_table_timestamps = get_unique_extraction_timestamps(raw_pages)
curated_table_timestamps = get_curated_unique_extraction_timestamps(curated_pages)
raw_table_timestamps = get_unique_load_timestamps(raw_pages)
curated_table_timestamps = get_unique_load_timestamps(curated_pages)

# compare and filter the raw files to sync
raw_keys_to_resync = get_resync_keys(
@@ -95,45 +93,23 @@ def get_data_product_pages(bucket, data_product_prefix, s3_client=s3) -> PageIte
return pages


def get_unique_extraction_timestamps(pages: PageIterator) -> set:
def get_unique_load_timestamps(pages: PageIterator) -> set:
"""
return the unique slugs of data product, table and extraction timestamp
return the unique slugs of data product, table and load timestamp
designed for use with boto3's pageiterator and list_object_v2
example key: `key = "raw_data/data_product/table/extraction_timestamp=timestamp/file.csv`
example key: `key = "raw/data_product/v1.0/table/load_timestamp=timestamp/file.csv`
size > 0 because sometimes empty directories get listed in contents
"""
filtered_pages = pages.search("Contents[?Size > `0`][]")
result_set = set("/".join(item["Key"].split("/")[1:-1]) for item in filtered_pages)
return result_set


def get_curated_unique_extraction_timestamps(curated_pages: PageIterator) -> set:
"""
return the unique slugs of data product, table and extraction timestamp
designed for use with boto3's pageiterator and list_object_v2
example key: `key = "curated_data/database_name=data_product/table_name=table"
+ "/extraction_timestamp=timestamp/file.parquet`
size > 0 because sometimes empty directories get listed in contents
"""
curated_table_timestamps = set()
for item in curated_pages.search("Contents[?Size > `0`][]"):
data_product = extract_database_name_from_curated_path(item["Key"])
table = extract_table_name_from_curated_path(item["Key"])
extraction_timestamp = extract_timestamp_from_curated_path(item["Key"])

if data_product and table and extraction_timestamp:
# Both sets need the same formatting to compare them
curated_table_timestamps.add(
f"{data_product}/{table}/extraction_timestamp={extraction_timestamp}"
)
return curated_table_timestamps


def get_resync_keys(
raw_table_timestamps: set, curated_table_timestamps: set, raw_pages: PageIterator
) -> list:
"""
Find extraction timestamps in the raw area, and not in the curated area
Find load timestamps in the raw area, and not in the curated area
"""
timestamps_to_resync = [
item for item in raw_table_timestamps if item not in curated_table_timestamps
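A small worked example of how the renamed helper compares the two areas (the split/join logic is copied from get_unique_load_timestamps above; the literal keys are hypothetical but follow the raw and curated layouts used in this commit's tests, and the area prefix is an assumption since only the later segments matter):

# Hypothetical keys following the new versioned layout.
raw_key = "raw/my_product/v1.0/my_table/load_timestamp=20231107T000000Z/part-0.csv"
curated_key = "curated/my_product/v1.0/my_table/load_timestamp=20231107T000000Z/part-0.parquet"

# Dropping the first segment (the area prefix) and the last (the file name)
# yields the same slug for both areas:
assert "/".join(raw_key.split("/")[1:-1]) == "/".join(curated_key.split("/")[1:-1])
# -> "my_product/v1.0/my_table/load_timestamp=20231107T000000Z"

Because raw and curated keys now share one layout, a single helper produces comparable slugs for both sets, get_resync_keys only has to keep the raw timestamps missing from the curated set, and the curated-specific parser removed above is no longer needed.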
@@ -29,6 +29,13 @@ class FakeContext:
function_name: str


@pytest.fixture(autouse=True)
def empty_metadata_bucket(s3_client):
bucket_name = os.environ["BUCKET_NAME"]
s3_client.create_bucket(Bucket=bucket_name)
return bucket_name


@pytest.fixture
def fake_context():
"""
@@ -1,9 +1,8 @@
import pytest
from resync_unprocessed_files import (
get_curated_unique_extraction_timestamps,
get_data_product_pages,
get_resync_keys,
get_unique_extraction_timestamps,
get_unique_load_timestamps,
)


@@ -26,20 +25,17 @@ def raw_data_bucket(s3_client, empty_raw_data_bucket, data_element):
bucket_name = empty_raw_data_bucket
s3_client.put_object(
Bucket=bucket_name,
Key=data_element.raw_data_prefix.key
+ "extraction_timestamp=timestamp1/file1.csv",
Key=data_element.raw_data_prefix.key + "load_timestamp=timestamp1/file1.csv",
Body="Test data in file 1",
)
s3_client.put_object(
Bucket=bucket_name,
Key=data_element.raw_data_prefix.key
+ "extraction_timestamp=timestamp2/file2.csv",
Key=data_element.raw_data_prefix.key + "load_timestamp=timestamp2/file2.csv",
Body="Test data in file 2",
)
s3_client.put_object(
Bucket=bucket_name,
Key=data_element.raw_data_prefix.key
+ "extraction_timestamp=timestamp1/file3.csv",
Key=data_element.raw_data_prefix.key + "load_timestamp=timestamp1/file3.csv",
Body="Test data in same extraction time stamp but different file",
)
return bucket_name
@@ -50,15 +46,10 @@ def curated_data_bucket(s3_client, empty_curated_data_bucket, data_element):
bucket_name = empty_curated_data_bucket
s3_client.put_object(
Bucket=bucket_name,
Key=f"{data_element.curated_data_prefix.key}table_name/extraction_timestamp=timestamp1"
Key=f"{data_element.curated_data_prefix.key}load_timestamp=timestamp1"
+ "/file1.parquet",
Body="This is test File",
)
s3_client.put_object(
Bucket=bucket_name,
Key=f"{data_element.curated_data_prefix.key}abc",
Body="Another file",
)
s3_client.put_object(Bucket=bucket_name, Key="some-other", Body="One more file")
return bucket_name

@@ -71,10 +62,10 @@ def test_get_raw_data_unique_extraction_timestamps(
data_product_prefix=data_element.raw_data_prefix.key,
s3_client=s3_client,
)
raw_table_timestamp = sorted(get_unique_extraction_timestamps(pages))
raw_table_timestamp = sorted(get_unique_load_timestamps(pages))
assert {i for i in raw_table_timestamp} == {
"data_product/table_name/extraction_timestamp=timestamp1",
"data_product/table_name/extraction_timestamp=timestamp2",
"data_product/v1.0/table_name/load_timestamp=timestamp1",
"data_product/v1.0/table_name/load_timestamp=timestamp2",
}


@@ -87,9 +78,9 @@ def test_get_curated_unique_extraction_timestamps(
s3_client=s3_client,
)

curated_table_timestamp = get_curated_unique_extraction_timestamps(pages)
curated_table_timestamp = sorted(get_unique_load_timestamps(pages))
assert {i for i in curated_table_timestamp} == {
"data_product/table_name/extraction_timestamp=timestamp1"
"data_product/v1.0/table_name/load_timestamp=timestamp1"
}


@@ -99,21 +90,20 @@ def test_get_resync_keys(s3_client, data_element, raw_data_bucket, curated_data_
data_product_prefix=data_element.raw_data_prefix.key,
s3_client=s3_client,
)
raw_table_timestamps = sorted(get_unique_extraction_timestamps(raw_pages))
raw_table_timestamps = sorted(get_unique_load_timestamps(raw_pages))

curated_pages = get_data_product_pages(
bucket=curated_data_bucket,
data_product_prefix=data_element.curated_data_prefix.key,
s3_client=s3_client,
)

curated_table_timestamps = get_curated_unique_extraction_timestamps(curated_pages)
curated_table_timestamps = sorted(get_unique_load_timestamps(curated_pages))

raw_keys_to_resync = get_resync_keys(
raw_table_timestamps, curated_table_timestamps, raw_pages
)
print(raw_keys_to_resync)

assert {i for i in raw_keys_to_resync} == {
"raw_data/data_product/table_name/"
+ "extraction_timestamp=timestamp2/file2.csv"
"raw/data_product/v1.0/table_name/" + "load_timestamp=timestamp2/file2.csv"
}
