diff --git a/README.md b/README.md
index 2a10c84..4532e2d 100644
--- a/README.md
+++ b/README.md
@@ -22,6 +22,7 @@ Description of the app
 SENTRY_DSN=### If set to a valid Sentry DSN, enables Sentry exception monitoring. This is not needed for local development.
 WORKSPACE=### Set to `dev` for local development, this will be set to `stage` and `prod` in those environments by Terraform.
 AWS_REGION_NAME=### Default AWS region.
+DSS_INPUT_QUEUE=### The DSS SQS input queue to which submission messages are sent.
 ```
 
 ### Optional
diff --git a/dsc/base.py b/dsc/base.py
deleted file mode 100644
index 68dcc8d..0000000
--- a/dsc/base.py
+++ /dev/null
@@ -1,96 +0,0 @@
-from __future__ import annotations
-
-import logging
-from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, Any, final
-
-from dsc.item_submission import ItemSubmission
-from dsc.utilities.aws.s3 import S3Client
-
-if TYPE_CHECKING:
-    from collections.abc import Iterator
-
-logger = logging.getLogger(__name__)
-
-
-class BaseWorkflow(ABC):
-    """A base workflow class from which other workflow classes are derived."""
-
-    def __init__(
-        self,
-        email_recipients: list[str],
-        metadata_mapping: dict,
-        s3_bucket: str,
-        s3_prefix: str | None,
-    ) -> None:
-        """Initialize base instance.
-
-        Args:
-            email_recipients: The email addresses to notify after runs of the workflow.
-            metadata_mapping: A mapping file for generating DSpace metadata from the
-                workflow's source metadata.
-            s3_bucket: The S3 bucket containing bitstream and metadata files for the
-                workflow.
-            s3_prefix: The S3 prefix used for objects in this workflow. This prefix does
-                NOT include the bucket name.
-        """
-        self.email_recipients: list[str] = email_recipients
-        self.metadata_mapping: dict = metadata_mapping
-        self.s3_bucket: str = s3_bucket
-        self.s3_prefix: str | None = s3_prefix
-
-    @final
-    def generate_submission_batch(self) -> Iterator[tuple[str, list[str]]]:
-        """Generate a batch of item submissions for the DSpace Submission Service.
-
-        MUST NOT be overridden by workflow subclasses.
-        """
-        s3_client = S3Client()
-        batch_metadata = self.get_batch_metadata()
-        for item_metadata in batch_metadata:
-            item_identifier = self.get_item_identifier(item_metadata)
-            logger.info(f"Processing submission for '{item_identifier}'")
-            metadata_keyname = f"{self.s3_prefix}/{item_identifier}_metadata.json"
-            item_submission = ItemSubmission(
-                source_metadata=item_metadata,
-                metadata_mapping=self.metadata_mapping,
-                s3_client=s3_client,
-                bitstream_uris=self.get_bitstream_uris(item_identifier),
-                metadata_keyname=metadata_keyname,
-            )
-            item_submission.generate_and_upload_dspace_metadata(self.s3_bucket)
-            yield item_submission.metadata_uri, item_submission.bitstream_uris
-
-    @abstractmethod
-    def get_batch_metadata(self) -> list:
-        """Get source metadata for the batch of items submissions.
-
-        MUST be overridden by workflow subclasses.
-        """
-
-    @abstractmethod
-    def get_item_identifier(self, item_metadata: Any) -> str:  # noqa: ANN401
-        """Get identifier for an item submission according to the workflow subclass.
-
-        MUST be overridden by workflow subclasses.
-
-        Args:
-            item_metadata: The item metadata from which the item identifier is extracted.
-        """
-
-    @abstractmethod
-    def get_bitstream_uris(self, item_identifier: str) -> list[str]:
-        """Get bitstreams for an item submission according to the workflow subclass.
-
-        MUST be overridden by workflow subclasses.
-
-        Args:
-            item_identifier: The identifier used for locating the item's bitstreams.
-        """
-
-    @abstractmethod
-    def process_deposit_results(self) -> list[str]:
-        """Process results generated by the deposit according to the workflow subclass.
-
-        MUST be overridden by workflow subclasses.
-        """
diff --git a/dsc/config.py b/dsc/config.py
index 7e39ee6..eb11bdd 100644
--- a/dsc/config.py
+++ b/dsc/config.py
@@ -11,6 +11,7 @@ class Config:
         "WORKSPACE",
         "SENTRY_DSN",
         "AWS_REGION_NAME",
+        "DSS_INPUT_QUEUE",
     ]
     OPTIONAL_ENV_VARS: Iterable[str] = ["LOG_LEVEL"]
 
diff --git a/dsc/exceptions.py b/dsc/exceptions.py
index 873432a..e6b8855 100644
--- a/dsc/exceptions.py
+++ b/dsc/exceptions.py
@@ -1,2 +1,6 @@
+class InvalidDSpaceMetadataError(Exception):
+    pass
+
+
 class InvalidSQSMessageError(Exception):
     pass
diff --git a/dsc/item_submission.py b/dsc/item_submission.py
index 88ec3bb..86cc449 100644
--- a/dsc/item_submission.py
+++ b/dsc/item_submission.py
@@ -12,57 +12,21 @@
 class ItemSubmission:
     """A class to store the required values for a DSpace submission."""
 
-    source_metadata: dict[str, Any]
-    metadata_mapping: dict[str, Any]
-    s3_client: S3Client
+    dspace_metadata: dict[str, Any]
     bitstream_uris: list[str]
     metadata_keyname: str
     metadata_uri: str = ""
 
-    def generate_and_upload_dspace_metadata(self, bucket: str) -> None:
-        """Generate DSpace metadata from the item's source metadata and upload it to S3.
+    def upload_dspace_metadata(self, bucket: str) -> None:
+        """Upload DSpace metadata to S3 using the specified bucket and keyname.
 
         Args:
             bucket: The S3 bucket for uploading the item metadata file.
         """
-        dspace_metadata = self.create_dspace_metadata()
-        self.s3_client.put_file(
-            json.dumps(dspace_metadata),
-            bucket,
-            self.metadata_keyname,
+        s3_client = S3Client()
+        s3_client.put_file(
+            json.dumps(self.dspace_metadata), bucket, self.metadata_keyname
         )
         metadata_uri = f"s3://{bucket}/{self.metadata_keyname}"
         logger.info(f"Metadata uploaded to S3: {metadata_uri}")
         self.metadata_uri = metadata_uri
-
-    def create_dspace_metadata(self) -> dict[str, Any]:
-        """Create DSpace metadata from the item's source metadata."""
-        metadata_entries = []
-        for field_name, field_mapping in self.metadata_mapping.items():
-            if field_name not in ["item_identifier", "source_system_identifier"]:
-
-                field_value = self.source_metadata.get(field_mapping["source_field_name"])
-                if field_value:
-                    delimiter = field_mapping["delimiter"]
-                    language = field_mapping["language"]
-                    if delimiter:
-                        metadata_entries.extend(
-                            [
-                                {
-                                    "key": field_name,
-                                    "value": value,
-                                    "language": language,
-                                }
-                                for value in field_value.split(delimiter)
-                            ]
-                        )
-                    else:
-                        metadata_entries.append(
-                            {
-                                "key": field_name,
-                                "value": field_value,
-                                "language": language,
-                            }
-                        )
-
-        return {"metadata": metadata_entries}
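With `create_dspace_metadata` moved out of `ItemSubmission`, the dataclass is now a plain container for already-generated DSpace metadata plus an upload helper. A minimal usage sketch, assuming a bucket named `dsc` and illustrative key/URI values (the upload itself needs real or mocked S3 access, e.g. moto's `mock_aws` as used in the tests):

```python
from dsc.item_submission import ItemSubmission

# Construct the submission from pre-built DSpace metadata (values are illustrative).
item_submission = ItemSubmission(
    dspace_metadata={
        "metadata": [{"key": "dc.title", "value": "Title", "language": "en_US"}]
    },
    bitstream_uris=["s3://dsc/workflow/folder/123_01.pdf"],
    metadata_keyname="workflow/folder/123_metadata.json",
)

# Serializes dspace_metadata to JSON, uploads it to the bucket under metadata_keyname,
# and records the resulting S3 URI on the instance.
item_submission.upload_dspace_metadata("dsc")
print(item_submission.metadata_uri)  # s3://dsc/workflow/folder/123_metadata.json
```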
diff --git a/dsc/workflows/__init__.py b/dsc/workflows/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/dsc/workflows/base/__init__.py b/dsc/workflows/base/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/dsc/workflows/base/base_workflow.py b/dsc/workflows/base/base_workflow.py
new file mode 100644
index 0000000..ddc04e1
--- /dev/null
+++ b/dsc/workflows/base/base_workflow.py
@@ -0,0 +1,199 @@
+from __future__ import annotations
+
+import logging
+from abc import ABC, abstractmethod
+from typing import TYPE_CHECKING, Any, final
+
+from dsc.exceptions import InvalidDSpaceMetadataError
+from dsc.item_submission import ItemSubmission
+
+if TYPE_CHECKING:
+    from collections.abc import Iterator
+
+logger = logging.getLogger(__name__)
+
+
+class BaseWorkflow(ABC):
+    """A base workflow class from which other workflow classes are derived."""
+
+    def __init__(
+        self,
+        workflow_name: str,
+        submission_system: str,
+        email_recipients: list[str],
+        metadata_mapping: dict,
+        s3_bucket: str,
+        s3_prefix: str | None,
+        collection_handle: str,
+        output_queue: str,
+    ) -> None:
+        """Initialize base instance.
+
+        Args:
+            workflow_name: The name of the workflow.
+            submission_system: The system to which item submissions will be sent (e.g.
+                DSpace@MIT).
+            email_recipients: The email addresses to notify after runs of the workflow.
+            metadata_mapping: A mapping file for generating DSpace metadata from the
+                workflow's source metadata.
+            s3_bucket: The S3 bucket containing bitstream and metadata files for the
+                workflow.
+            s3_prefix: The S3 prefix used for objects in this workflow. This prefix does
+                NOT include the bucket name.
+            collection_handle: The handle of the DSpace collection to which submissions
+                will be uploaded.
+            output_queue: The SQS output queue used for retrieving result messages from
+                the workflow's submissions.
+        """
+        self.workflow_name: str = workflow_name
+        self.submission_system: str = submission_system
+        self.email_recipients: list[str] = email_recipients
+        self.metadata_mapping: dict = metadata_mapping
+        self.s3_bucket: str = s3_bucket
+        self.s3_prefix: str | None = s3_prefix
+        self.collection_handle: str = collection_handle
+        self.output_queue: str = output_queue
+
+    @final  # noqa: B027
+    def run(self) -> None:
+        """Run workflow to submit items to the DSpace Submission Service.
+
+        PLANNED CODE:
+
+        sqs_client = SQSClient()
+        for item_submission in self.item_submissions_iter():
+            item_submission.upload_dspace_metadata(
+                self.s3_bucket, item_submission.metadata_keyname
+            )
+            sqs_client.send_submission_message(
+                item_submission.item_identifier,
+                self.workflow_name,
+                self.output_queue,
+                self.submission_system,
+                self.collection_handle,
+                item_submission.metadata_uri,
+                item_submission.bitstream_uris,
+            )
+        """
+
+    @final
+    def item_submissions_iter(self) -> Iterator[ItemSubmission]:
+        """Generate a batch of item submissions for the DSpace Submission Service.
+
+        MUST NOT be overridden by workflow subclasses.
+        """
+        for item_metadata in self.batch_metadata_iter():
+            item_identifier = self.get_item_identifier(item_metadata)
+            logger.info(f"Processing submission for '{item_identifier}'")
+            metadata_keyname = f"{self.s3_prefix}/{item_identifier}_metadata.json"
+            dspace_metadata = self.create_dspace_metadata(item_metadata)
+            if self.validate_dspace_metadata(dspace_metadata):
+                item_submission = ItemSubmission(
+                    dspace_metadata=dspace_metadata,
+                    bitstream_uris=self.get_bitstream_uris(item_identifier),
+                    metadata_keyname=metadata_keyname,
+                )
+                yield item_submission
+
+    @abstractmethod
+    def batch_metadata_iter(self) -> Iterator[dict[str, Any]]:
+        """Iterate through batch metadata to yield item metadata.
+
+        MUST be overridden by workflow subclasses.
+        """
+
+    @abstractmethod
+    def get_item_identifier(self, item_metadata: Any) -> str:  # noqa: ANN401
+        """Get identifier for an item submission according to the workflow subclass.
+
+        MUST be overridden by workflow subclasses.
+
+        Args:
+            item_metadata: The item metadata from which the item identifier is extracted.
+        """
+
+    @final
+    def create_dspace_metadata(self, item_metadata: dict[str, Any]) -> dict[str, Any]:
+        """Create DSpace metadata from the item's source metadata.
+
+        A metadata mapping is a dict with the format seen below:
+
+        {
+            "dc.contributor": {
+                "source_field_name": "contributor",
+                "language": None,
+                "delimiter": "|",
+            }
+        }
+        MUST NOT be overridden by workflow subclasses.
+
+        Args:
+            item_metadata: Item metadata from which the DSpace metadata will be derived.
+        """
+        metadata_entries = []
+        for field_name, field_mapping in self.metadata_mapping.items():
+            if field_name not in ["item_identifier", "source_system_identifier"]:
+
+                field_value = item_metadata.get(field_mapping["source_field_name"])
+                if field_value:
+                    delimiter = field_mapping["delimiter"]
+                    language = field_mapping["language"]
+                    if delimiter:
+                        metadata_entries.extend(
+                            [
+                                {
+                                    "key": field_name,
+                                    "value": value,
+                                    "language": language,
+                                }
+                                for value in field_value.split(delimiter)
+                            ]
+                        )
+                    else:
+                        metadata_entries.append(
+                            {
+                                "key": field_name,
+                                "value": field_value,
+                                "language": language,
+                            }
+                        )
+
+        return {"metadata": metadata_entries}
+
+    @final
+    def validate_dspace_metadata(self, dspace_metadata: dict[str, Any]) -> bool:
+        """Validate that DSpace metadata follows the expected format for DSpace 6.x.
+
+        MUST NOT be overridden by workflow subclasses.
+
+        Args:
+            dspace_metadata: DSpace metadata to be validated.
+        """
+        valid = False
+        if dspace_metadata.get("metadata") is not None:
+            for element in dspace_metadata["metadata"]:
+                if element.get("key") is not None and element.get("value") is not None:
+                    valid = True
+            logger.debug("Valid DSpace metadata created")
+        else:
+            raise InvalidDSpaceMetadataError(
+                f"Invalid DSpace metadata created: {dspace_metadata} ",
+            )
+        return valid
+
+    @abstractmethod
+    def get_bitstream_uris(self, item_identifier: str) -> list[str]:
+        """Get bitstreams for an item submission according to the workflow subclass.
+
+        MUST be overridden by workflow subclasses.
+
+        Args:
+            item_identifier: The identifier used for locating the item's bitstreams.
+        """
+
+    @abstractmethod
+    def process_deposit_results(self) -> list[str]:
+        """Process results generated by the deposit according to the workflow subclass.
+
+        MUST be overridden by workflow subclasses.
+        """
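To make the new interface concrete: a workflow only has to implement the four abstract hooks (`batch_metadata_iter`, `get_item_identifier`, `get_bitstream_uris`, `process_deposit_results`); the `@final` methods handle metadata generation and validation. The sketch below mirrors the `TestBaseWorkflow` fixture added to `tests/conftest.py` further down, but with hypothetical names and S3 locations; iterating `item_submissions_iter()` does not touch AWS, since uploading happens in a separate step.

```python
from collections.abc import Iterator
from typing import Any

from dsc.workflows.base.base_workflow import BaseWorkflow


class DemoWorkflow(BaseWorkflow):
    """Hypothetical workflow used only to illustrate the BaseWorkflow contract."""

    def batch_metadata_iter(self) -> Iterator[dict[str, Any]]:
        # A real workflow would read a manifest (e.g. a CSV on S3); hard-coded here.
        yield {
            "title": "Title",
            "contributor": "Author 1|Author 2",
            "item_identifier": "123",
        }

    def get_item_identifier(self, item_metadata: dict[str, Any]) -> str:
        return item_metadata["item_identifier"]

    def get_bitstream_uris(self, item_identifier: str) -> list[str]:
        return [f"s3://dsc/demo/{item_identifier}_01.pdf"]

    def process_deposit_results(self) -> list[str]:
        return []


workflow = DemoWorkflow(
    workflow_name="demo",
    submission_system="DSpace@MIT",
    email_recipients=["staff@example.edu"],
    metadata_mapping={
        "item_identifier": {
            "source_field_name": "item_identifier",
            "language": None,
            "delimiter": "",
        },
        "dc.title": {
            "source_field_name": "title",
            "language": "en_US",
            "delimiter": "",
        },
        "dc.contributor": {
            "source_field_name": "contributor",
            "language": None,
            "delimiter": "|",
        },
    },
    s3_bucket="dsc",
    s3_prefix="demo/folder",
    collection_handle="123.4/5678",
    output_queue="demo-output-queue",
)
```

Each `ItemSubmission` yielded by `workflow.item_submissions_iter()` then carries the generated DSpace metadata, the bitstream URIs, and the S3 key the metadata will be uploaded under.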
diff --git a/tests/conftest.py b/tests/conftest.py
index 5d282f7..e27c176 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -11,6 +11,7 @@
 from dsc.utilities.aws.s3 import S3Client
 from dsc.utilities.aws.ses import SESClient
 from dsc.utilities.aws.sqs import SQSClient
+from dsc.workflows.base.base_workflow import BaseWorkflow
 
 
 @pytest.fixture(autouse=True)
@@ -20,6 +21,40 @@ def _test_env(monkeypatch):
     monkeypatch.setenv("AWS_REGION_NAME", "us-east-1")
     monkeypatch.setenv("AWS_ACCESS_KEY_ID", "testing")
     monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing")
+    monkeypatch.setenv("DSS_INPUT_QUEUE", "mock-input-queue")
+
+
+@pytest.fixture
+def base_workflow_instance(item_metadata, metadata_mapping, mocked_s3):
+    class TestBaseWorkflow(BaseWorkflow):
+
+        def batch_metadata_iter(self):
+            yield from [item_metadata]
+
+        def get_item_identifier(self, item_metadata):
+            return item_metadata["item_identifier"]
+
+        def get_bitstream_uris(self, item_identifier):
+            bitstreams = [
+                "s3://dsc/workflow/folder/123_01.pdf",
+                "s3://dsc/workflow/folder/123_02.pdf",
+                "s3://dsc/workflow/folder/456_01.pdf",
+            ]
+            return [bitstream for bitstream in bitstreams if item_identifier in bitstream]
+
+        def process_deposit_results(self):
+            pass
+
+    return TestBaseWorkflow(
+        workflow_name="test",
+        submission_system="Test@MIT",
+        email_recipients=["test@test.test"],
+        metadata_mapping=metadata_mapping,
+        s3_bucket="dsc",
+        s3_prefix="workflow/folder",
+        collection_handle="123.4/5678",
+        output_queue="mock-output_queue",
+    )
 
 
 @pytest.fixture
@@ -28,12 +63,41 @@ def config_instance():
 
 
 @pytest.fixture
-def item_submission_instance(metadata_mapping, s3_client):
-    source_metadata = {"title": "Title", "contributor": "Author 1|Author 2"}
+def dspace_metadata():
+    return {
+        "metadata": [
+            {
+                "key": "dc.title",
+                "language": "en_US",
+                "value": "Title",
+            },
+            {
+                "key": "dc.contributor",
+                "language": None,
+                "value": "Author 1",
+            },
+            {
+                "key": "dc.contributor",
+                "language": None,
+                "value": "Author 2",
+            },
+        ]
+    }
+
+
+@pytest.fixture
+def item_metadata():
+    return {
+        "title": "Title",
+        "contributor": "Author 1|Author 2",
+        "item_identifier": "123",
+    }
+
+
+@pytest.fixture
+def item_submission_instance(dspace_metadata):
     return ItemSubmission(
-        source_metadata=source_metadata,
-        metadata_mapping=metadata_mapping,
-        s3_client=s3_client,
+        dspace_metadata=dspace_metadata,
         bitstream_uris=[
             "s3://dsc/workflow/folder/123_01.pdf",
             "s3://dsc/workflow/folder/123_02.pdf",
@@ -80,7 +144,7 @@ def mocked_ses(config_instance):
 
 
 @pytest.fixture
-def mocked_sqs_input(sqs_client, config_instance):
+def mocked_sqs_input(config_instance):
     with mock_aws():
         sqs = boto3.resource("sqs", region_name=config_instance.AWS_REGION_NAME)
         sqs.create_queue(QueueName="mock-input-queue")
diff --git a/tests/test_base_workflow.py b/tests/test_base_workflow.py
new file mode 100644
index 0000000..e9d9415
--- /dev/null
+++ b/tests/test_base_workflow.py
@@ -0,0 +1,49 @@
+import pytest
+
+from dsc.exceptions import InvalidDSpaceMetadataError
+from dsc.item_submission import ItemSubmission
+
+
+def test_base_workflow_item_submission_iter(base_workflow_instance):
+    assert next(base_workflow_instance.item_submissions_iter()) == ItemSubmission(
+        dspace_metadata={
+            "metadata": [
+                {"key": "dc.title", "value": "Title", "language": "en_US"},
+                {"key": "dc.contributor", "value": "Author 1", "language": None},
+                {"key": "dc.contributor", "value": "Author 2", "language": None},
+            ]
+        },
+        bitstream_uris=[
+            "s3://dsc/workflow/folder/123_01.pdf",
"s3://dsc/workflow/folder/123_02.pdf", + ], + metadata_keyname="workflow/folder/123_metadata.json", + metadata_uri="", + ) + + +def test_base_workflow_create_dspace_metadata( + base_workflow_instance, + item_metadata, +): + assert base_workflow_instance.create_dspace_metadata(item_metadata) == { + "metadata": [ + {"key": "dc.title", "language": "en_US", "value": "Title"}, + {"key": "dc.contributor", "language": None, "value": "Author 1"}, + {"key": "dc.contributor", "language": None, "value": "Author 2"}, + ] + } + + +def test_base_workflow_validate_dspace_metadata_success( + base_workflow_instance, + dspace_metadata, +): + assert base_workflow_instance.validate_dspace_metadata(dspace_metadata) + + +def test_base_workflow_validate_dspace_metadata_invalid_raises_exception( + base_workflow_instance, +): + with pytest.raises(InvalidDSpaceMetadataError): + base_workflow_instance.validate_dspace_metadata({}) diff --git a/tests/test_itemsubmission.py b/tests/test_itemsubmission.py index 3cd3d96..7da0cda 100644 --- a/tests/test_itemsubmission.py +++ b/tests/test_itemsubmission.py @@ -1,24 +1,8 @@ from http import HTTPStatus -def test_itemsubmission_init_success(item_submission_instance): - assert item_submission_instance.source_metadata == { - "title": "Title", - "contributor": "Author 1|Author 2", - } - assert item_submission_instance.metadata_mapping == { - "dc.contributor": { - "delimiter": "|", - "language": None, - "source_field_name": "contributor", - }, - "dc.title": {"delimiter": "", "language": "en_US", "source_field_name": "title"}, - "item_identifier": { - "delimiter": "", - "language": None, - "source_field_name": "item_identifier", - }, - } +def test_itemsubmission_init_success(item_submission_instance, dspace_metadata): + assert item_submission_instance.dspace_metadata == dspace_metadata assert item_submission_instance.bitstream_uris == [ "s3://dsc/workflow/folder/123_01.pdf", "s3://dsc/workflow/folder/123_02.pdf", @@ -28,10 +12,8 @@ def test_itemsubmission_init_success(item_submission_instance): ) -def test_generate_and_upload_dspace_metadata( - mocked_s3, item_submission_instance, s3_client -): - item_submission_instance.generate_and_upload_dspace_metadata("dsc") +def test_upload_dspace_metadata(mocked_s3, item_submission_instance, s3_client): + item_submission_instance.upload_dspace_metadata("dsc") assert ( item_submission_instance.metadata_uri == "s3://dsc/workflow/folder/123_metadata.json" @@ -40,25 +22,3 @@ def test_generate_and_upload_dspace_metadata( Bucket="dsc", Key="workflow/folder/123_metadata.json" ) assert response["ResponseMetadata"]["HTTPStatusCode"] == HTTPStatus.OK - - -def test_create_dspace_metadata(item_submission_instance): - assert item_submission_instance.create_dspace_metadata() == { - "metadata": [ - { - "key": "dc.title", - "language": "en_US", - "value": "Title", - }, - { - "key": "dc.contributor", - "language": None, - "value": "Author 1", - }, - { - "key": "dc.contributor", - "language": None, - "value": "Author 2", - }, - ] - }