Create BaseWorkflow and ItemSubmission classes
Why these changes are being introduced:
* The BaseWorkflow and ItemSubmission classes are the foundation from which a significant portion of the application's functionality will be derived.

How this addresses that need:
* Add BaseWorkflow class with abstract methods outlining the expected functionality.
* Add ItemSubmission class with largely complete functionality and corresponding unit tests and fixtures.

Side effects of this change:
* None

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/IN-1101
ehanson8 committed Dec 13, 2024
1 parent 21414da commit 769d6da
Showing 4 changed files with 266 additions and 1 deletion.
96 changes: 96 additions & 0 deletions dsc/base.py
@@ -0,0 +1,96 @@
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, final

from dsc.item_submission import ItemSubmission
from dsc.utilities.aws.s3 import S3Client

if TYPE_CHECKING:
from collections.abc import Iterator

logger = logging.getLogger(__name__)


class BaseWorkflow(ABC):
    """A base workflow class from which other workflow classes are derived."""

    def __init__(
        self,
        email_recipients: list[str],
        metadata_mapping: dict,
        s3_bucket: str,
        s3_prefix: str | None,
    ) -> None:
        """Initialize base instance.

        Args:
            email_recipients: The email addresses to notify after runs of the workflow.
            metadata_mapping: A mapping file for generating DSpace metadata from the
                workflow's source metadata.
            s3_bucket: The S3 bucket containing bitstream and metadata files for the
                workflow.
            s3_prefix: The S3 prefix used for objects in this workflow. This prefix
                does NOT include the bucket name.
        """
        self.email_recipients: list[str] = email_recipients
        self.metadata_mapping: dict = metadata_mapping
        self.s3_bucket: str = s3_bucket
        self.s3_prefix: str | None = s3_prefix

    @final
    def generate_submission_batch(self) -> Iterator[tuple[str, list[str]]]:
        """Generate a batch of item submissions for the DSpace Submission Service.

        MUST NOT be overridden by workflow subclasses.
        """
        s3_client = S3Client()
        batch_metadata = self.get_batch_metadata()
        for item_metadata in batch_metadata:
            item_identifier = self.get_item_identifier(item_metadata)
            logger.info(f"Processing submission for '{item_identifier}'")
            metadata_keyname = f"{self.s3_prefix}/{item_identifier}_metadata.json"
            item_submission = ItemSubmission(
                source_metadata=item_metadata,
                metadata_mapping=self.metadata_mapping,
                s3_client=s3_client,
                bitstream_uris=self.get_bitstream_uris(item_identifier),
                metadata_keyname=metadata_keyname,
            )
            item_submission.generate_and_upload_dspace_metadata(self.s3_bucket)
            yield item_submission.metadata_uri, item_submission.bitstream_uris

    @abstractmethod
    def get_batch_metadata(self) -> list:
        """Get source metadata for the batch of item submissions.

        MUST be overridden by workflow subclasses.
        """

    @abstractmethod
    def get_item_identifier(self, item_metadata: Any) -> str:  # noqa: ANN401
        """Get identifier for an item submission according to the workflow subclass.

        MUST be overridden by workflow subclasses.

        Args:
            item_metadata: The item metadata from which the item identifier is extracted.
        """

    @abstractmethod
    def get_bitstream_uris(self, item_identifier: str) -> list[str]:
        """Get bitstream URIs for an item submission according to the workflow subclass.

        MUST be overridden by workflow subclasses.

        Args:
            item_identifier: The identifier used for locating the item's bitstreams.
        """

    @abstractmethod
    def process_deposit_results(self) -> list[str]:
        """Process results generated by the deposit according to the workflow subclass.

        MUST be overridden by workflow subclasses.
        """
68 changes: 68 additions & 0 deletions dsc/item_submission.py
@@ -0,0 +1,68 @@
import json
import logging
from dataclasses import dataclass
from typing import Any

from dsc.utilities.aws.s3 import S3Client

logger = logging.getLogger(__name__)


@dataclass
class ItemSubmission:
    """A class to store the required values for a DSpace submission."""

    source_metadata: dict[str, Any]
    metadata_mapping: dict[str, Any]
    s3_client: S3Client
    bitstream_uris: list[str]
    metadata_keyname: str
    metadata_uri: str = ""

    def generate_and_upload_dspace_metadata(self, bucket: str) -> None:
        """Generate DSpace metadata from the item's source metadata and upload it to S3.

        Args:
            bucket: The S3 bucket for uploading the item metadata file.
        """
        dspace_metadata = self.create_dspace_metadata()
        self.s3_client.put_file(
            json.dumps(dspace_metadata),
            bucket,
            self.metadata_keyname,
        )
        metadata_uri = f"s3://{bucket}/{self.metadata_keyname}"
        logger.info(f"Metadata uploaded to S3: {metadata_uri}")
        self.metadata_uri = metadata_uri

    def create_dspace_metadata(self) -> dict[str, Any]:
        """Create DSpace metadata from the item's source metadata."""
        metadata_entries = []
        for field_name, field_mapping in self.metadata_mapping.items():
            if field_name not in ["item_identifier", "source_system_identifier"]:
                field_value = self.source_metadata.get(
                    field_mapping["source_field_name"]
                )
                if field_value:
                    delimiter = field_mapping["delimiter"]
                    language = field_mapping["language"]
                    if delimiter:
                        metadata_entries.extend(
                            [
                                {
                                    "key": field_name,
                                    "value": value,
                                    "language": language,
                                }
                                for value in field_value.split(delimiter)
                            ]
                        )
                    else:
                        metadata_entries.append(
                            {
                                "key": field_name,
                                "value": field_value,
                                "language": language,
                            }
                        )

        return {"metadata": metadata_entries}
39 changes: 38 additions & 1 deletion tests/conftest.py
@@ -7,6 +7,7 @@
from moto import mock_aws

from dsc.config import Config
from dsc.item_submission import ItemSubmission
from dsc.utilities.aws.s3 import S3Client
from dsc.utilities.aws.ses import SESClient
from dsc.utilities.aws.sqs import SQSClient
@@ -22,10 +23,46 @@ def _test_env(monkeypatch):


@pytest.fixture
def config_instance() -> Config:
def config_instance():
    return Config()


@pytest.fixture
def item_submission_instance(metadata_mapping, s3_client):
    source_metadata = {"title": "Title", "contributor": "Author 1|Author 2"}
    return ItemSubmission(
        source_metadata=source_metadata,
        metadata_mapping=metadata_mapping,
        s3_client=s3_client,
        bitstream_uris=[
            "s3://dsc/workflow/folder/123_01.pdf",
            "s3://dsc/workflow/folder/123_02.pdf",
        ],
        metadata_keyname="workflow/folder/123_metadata.json",
    )


@pytest.fixture
def metadata_mapping():
    return {
        "item_identifier": {
            "source_field_name": "item_identifier",
            "language": None,
            "delimiter": "",
        },
        "dc.title": {
            "source_field_name": "title",
            "language": "en_US",
            "delimiter": "",
        },
        "dc.contributor": {
            "source_field_name": "contributor",
            "language": None,
            "delimiter": "|",
        },
    }


@pytest.fixture
def mocked_s3(config_instance):
    with mock_aws():
64 changes: 64 additions & 0 deletions tests/test_itemsubmission.py
@@ -0,0 +1,64 @@
from http import HTTPStatus


def test_itemsubmission_init_success(item_submission_instance):
    assert item_submission_instance.source_metadata == {
        "title": "Title",
        "contributor": "Author 1|Author 2",
    }
    assert item_submission_instance.metadata_mapping == {
        "dc.contributor": {
            "delimiter": "|",
            "language": None,
            "source_field_name": "contributor",
        },
        "dc.title": {"delimiter": "", "language": "en_US", "source_field_name": "title"},
        "item_identifier": {
            "delimiter": "",
            "language": None,
            "source_field_name": "item_identifier",
        },
    }
    assert item_submission_instance.bitstream_uris == [
        "s3://dsc/workflow/folder/123_01.pdf",
        "s3://dsc/workflow/folder/123_02.pdf",
    ]
    assert (
        item_submission_instance.metadata_keyname == "workflow/folder/123_metadata.json"
    )


def test_generate_and_upload_dspace_metadata(
    mocked_s3, item_submission_instance, s3_client
):
    item_submission_instance.generate_and_upload_dspace_metadata("dsc")
    assert (
        item_submission_instance.metadata_uri
        == "s3://dsc/workflow/folder/123_metadata.json"
    )
    response = s3_client.client.get_object(
        Bucket="dsc", Key="workflow/folder/123_metadata.json"
    )
    assert response["ResponseMetadata"]["HTTPStatusCode"] == HTTPStatus.OK


def test_create_dspace_metadata(item_submission_instance):
    assert item_submission_instance.create_dspace_metadata() == {
        "metadata": [
            {
                "key": "dc.title",
                "language": "en_US",
                "value": "Title",
            },
            {
                "key": "dc.contributor",
                "language": None,
                "value": "Author 1",
            },
            {
                "key": "dc.contributor",
                "language": None,
                "value": "Author 2",
            },
        ]
    }
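
Finally, a hedged end-to-end sketch of how a workflow like the hypothetical DemoWorkflow above would drive generate_submission_batch; all argument values are illustrative, and actually running it would require the target bucket to exist (or a moto mock).

# Hypothetical driver using the DemoWorkflow sketch shown after dsc/base.py above.
workflow = DemoWorkflow(
    email_recipients=["staff@example.edu"],
    metadata_mapping={
        "dc.title": {"source_field_name": "title", "language": "en_US", "delimiter": ""},
    },
    s3_bucket="dsc",
    s3_prefix="workflow/folder",
)
for metadata_uri, bitstream_uris in workflow.generate_submission_batch():
    # Each yielded pair identifies the uploaded metadata JSON and the item's bitstream
    # URIs; downstream code would package these into a DSpace Submission Service message.
    print(metadata_uri, bitstream_uris)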
