Skip to content

Commit

Permalink
Updates based on discussion in PR #21
Browse files Browse the repository at this point in the history
* Restructure app to use workflows directory
* Add DSS_INPUT_QUEUE env var
* Add InvalidDSpaceMetadataError exception
* Refactor generate_and_upload_dspace_metadata method to upload_dspace_metadata
* Shift create_dspace_metadata and validate_dspace_metadata from ItemSubmission to BaseWorkflow
* Add init params to BaseWorkflow
* Add stub run method to BaseWorkflow class
* Add dspace_metadata fixture
* Add tests and fixtures for BaseWorkflow `@final` methods
  • Loading branch information
ehanson8 committed Dec 17, 2024
1 parent ead9f88 commit 430f3d2
Show file tree
Hide file tree
Showing 11 changed files with 334 additions and 188 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Description of the app
SENTRY_DSN=### If set to a valid Sentry DSN, enables Sentry exception monitoring. This is not needed for local development.
WORKSPACE=### Set to `dev` for local development, this will be set to `stage` and `prod` in those environments by Terraform.
AWS_REGION_NAME=### Default AWS region.
DSS_INPUT_QUEUE=### The DSS SQS input queue to which submission messages are sent.
```

### Optional
Expand Down
96 changes: 0 additions & 96 deletions dsc/base.py

This file was deleted.

1 change: 1 addition & 0 deletions dsc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class Config:
"WORKSPACE",
"SENTRY_DSN",
"AWS_REGION_NAME",
"DSS_INPUT_QUEUE",
]

OPTIONAL_ENV_VARS: Iterable[str] = ["LOG_LEVEL"]
Expand Down
4 changes: 4 additions & 0 deletions dsc/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
class InvalidDSpaceMetadataError(Exception):
    """Raised when generated DSpace metadata does not match the expected format."""


class InvalidSQSMessageError(Exception):
    """Raised when an SQS message does not match the expected format."""
48 changes: 6 additions & 42 deletions dsc/item_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,57 +12,21 @@
class ItemSubmission:
"""A class to store the required values for a DSpace submission."""

source_metadata: dict[str, Any]
metadata_mapping: dict[str, Any]
s3_client: S3Client
dspace_metadata: dict[str, Any]
bitstream_uris: list[str]
metadata_keyname: str
metadata_uri: str = ""

def generate_and_upload_dspace_metadata(self, bucket: str) -> None:
"""Generate DSpace metadata from the item's source metadata and upload it to S3.
def upload_dspace_metadata(self, bucket: str) -> None:
"""Upload DSpace metadata to S3 using the specified bucket and keyname.
Args:
bucket: The S3 bucket for uploading the item metadata file.
"""
dspace_metadata = self.create_dspace_metadata()
self.s3_client.put_file(
json.dumps(dspace_metadata),
bucket,
self.metadata_keyname,
s3_client = S3Client()
s3_client.put_file(
json.dumps(self.dspace_metadata), bucket, self.metadata_keyname
)
metadata_uri = f"s3://{bucket}/{self.metadata_keyname}"
logger.info(f"Metadata uploaded to S3: {metadata_uri}")
self.metadata_uri = metadata_uri

def create_dspace_metadata(self) -> dict[str, Any]:
"""Create DSpace metadata from the item's source metadata."""
metadata_entries = []
for field_name, field_mapping in self.metadata_mapping.items():
if field_name not in ["item_identifier", "source_system_identifier"]:

field_value = self.source_metadata.get(field_mapping["source_field_name"])
if field_value:
delimiter = field_mapping["delimiter"]
language = field_mapping["language"]
if delimiter:
metadata_entries.extend(
[
{
"key": field_name,
"value": value,
"language": language,
}
for value in field_value.split(delimiter)
]
)
else:
metadata_entries.append(
{
"key": field_name,
"value": field_value,
"language": language,
}
)

return {"metadata": metadata_entries}
Empty file added dsc/workflows/__init__.py
Empty file.
Empty file added dsc/workflows/base/__init__.py
Empty file.
199 changes: 199 additions & 0 deletions dsc/workflows/base/base_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, final

from dsc.exceptions import InvalidDSpaceMetadataError
from dsc.item_submission import ItemSubmission

if TYPE_CHECKING:
from collections.abc import Iterator

logger = logging.getLogger(__name__)


class BaseWorkflow(ABC):
    """A base workflow class from which other workflow classes are derived."""

    def __init__(
        self,
        workflow_name: str,
        submission_system: str,
        email_recipients: list[str],
        metadata_mapping: dict,
        s3_bucket: str,
        s3_prefix: str | None,
        collection_handle: str,
        output_queue: str,
    ) -> None:
        """Initialize base instance.

        Args:
            workflow_name: The name of the workflow.
            submission_system: The system to which item submissions will be sent (e.g.
                DSpace@MIT).
            email_recipients: The email addresses to notify after runs of the workflow.
            metadata_mapping: A mapping for generating DSpace metadata from the
                workflow's source metadata.
            s3_bucket: The S3 bucket containing bitstream and metadata files for the
                workflow.
            s3_prefix: The S3 prefix used for objects in this workflow. This prefix
                does NOT include the bucket name.
            collection_handle: The handle of the DSpace collection to which submissions
                will be uploaded.
            output_queue: The SQS output queue used for retrieving result messages from
                the workflow's submissions.
        """
        self.workflow_name: str = workflow_name
        self.submission_system: str = submission_system
        self.email_recipients: list[str] = email_recipients
        self.metadata_mapping: dict = metadata_mapping
        self.s3_bucket: str = s3_bucket
        self.s3_prefix: str | None = s3_prefix
        self.collection_handle: str = collection_handle
        self.output_queue: str = output_queue

    @final  # noqa: B027
    def run(self) -> None:
        """Run workflow to submit items to the DSpace Submission Service.

        PLANNED CODE:

            sqs_client = SQSClient()
            for item_submission in self.item_submissions_iter():
                # upload_dspace_metadata takes only the bucket; the keyname is
                # already stored on the ItemSubmission instance
                item_submission.upload_dspace_metadata(self.s3_bucket)
                sqs_client.send_submission_message(
                    item_submission.item_identifier,
                    self.workflow_name,
                    self.output_queue,
                    self.submission_system,
                    self.collection_handle,
                    item_submission.metadata_uri,
                    item_submission.bitstream_uris,
                )

        NOTE(review): the planned code reads item_submission.item_identifier, but
        item_submissions_iter does not currently pass item_identifier to
        ItemSubmission — confirm before implementing.
        """

    @final
    def item_submissions_iter(self) -> Iterator[ItemSubmission]:
        """Generate a batch of item submissions for the DSpace Submission Service.

        MUST NOT be overridden by workflow subclasses.

        Yields:
            ItemSubmission instances with validated DSpace metadata, bitstream
            URIs, and the S3 keyname for the metadata file.
        """
        for item_metadata in self.batch_metadata_iter():
            item_identifier = self.get_item_identifier(item_metadata)
            logger.info(f"Processing submission for '{item_identifier}'")
            metadata_keyname = f"{self.s3_prefix}/{item_identifier}_metadata.json"
            dspace_metadata = self.create_dspace_metadata(item_metadata)
            if self.validate_dspace_metadata(dspace_metadata):
                item_submission = ItemSubmission(
                    dspace_metadata=dspace_metadata,
                    bitstream_uris=self.get_bitstream_uris(item_identifier),
                    metadata_keyname=metadata_keyname,
                )
                yield item_submission

    @abstractmethod
    def batch_metadata_iter(self) -> Iterator[dict[str, Any]]:
        """Iterate through batch metadata to yield item metadata.

        MUST be overridden by workflow subclasses.
        """

    @abstractmethod
    def get_item_identifier(self, item_metadata: Any) -> str:  # noqa: ANN401
        """Get identifier for an item submission according to the workflow subclass.

        MUST be overridden by workflow subclasses.

        Args:
            item_metadata: The item metadata from which the item identifier is
                extracted.
        """

    @final
    def create_dspace_metadata(self, item_metadata: dict[str, Any]) -> dict[str, Any]:
        """Create DSpace metadata from the item's source metadata.

        A metadata mapping is a dict with the format seen below:

            {
                "dc.contributor": {
                    "source_field_name": "contributor",
                    "language": None,
                    "delimiter": "|",
                }
            }

        MUST NOT be overridden by workflow subclasses.

        Args:
            item_metadata: Item metadata from which the DSpace metadata will be
                derived.

        Returns:
            A dict of the form {"metadata": [...]} where each entry has "key",
            "value", and "language". Identifier fields and source fields with no
            value are skipped; multivalued fields are split on their delimiter.
        """
        metadata_entries = []
        for field_name, field_mapping in self.metadata_mapping.items():
            # Identifier fields are used for lookup, not submitted as metadata
            if field_name in ("item_identifier", "source_system_identifier"):
                continue
            field_value = item_metadata.get(field_mapping["source_field_name"])
            if not field_value:
                continue
            delimiter = field_mapping["delimiter"]
            language = field_mapping["language"]
            if delimiter:
                metadata_entries.extend(
                    [
                        {
                            "key": field_name,
                            "value": value,
                            "language": language,
                        }
                        for value in field_value.split(delimiter)
                    ]
                )
            else:
                metadata_entries.append(
                    {
                        "key": field_name,
                        "value": field_value,
                        "language": language,
                    }
                )

        return {"metadata": metadata_entries}

    @final
    def validate_dspace_metadata(self, dspace_metadata: dict[str, Any]) -> bool:
        """Validate that DSpace metadata follows the expected format for DSpace 6.x.

        MUST NOT be overridden by workflow subclasses.

        Args:
            dspace_metadata: DSpace metadata to be validated.

        Returns:
            True if the metadata contains at least one entry and every entry has
            both a "key" and a "value"; False if the "metadata" key is absent or
            its list is empty.

        Raises:
            InvalidDSpaceMetadataError: If any metadata entry lacks a "key" or a
                "value".
        """
        valid = False
        if dspace_metadata.get("metadata") is not None:
            metadata_entries = dspace_metadata["metadata"]
            for element in metadata_entries:
                if element.get("key") is None or element.get("value") is None:
                    raise InvalidDSpaceMetadataError(
                        f"Invalid DSpace metadata created: {dspace_metadata} ",
                    )
            if metadata_entries:
                valid = True
                # Log once per validation, not once per metadata entry
                logger.debug("Valid DSpace metadata created")
        return valid

    @abstractmethod
    def get_bitstream_uris(self, item_identifier: str) -> list[str]:
        """Get bitstreams for an item submission according to the workflow subclass.

        MUST be overridden by workflow subclasses.

        Args:
            item_identifier: The identifier used for locating the item's bitstreams.
        """

    @abstractmethod
    def process_deposit_results(self) -> list[str]:
        """Process results generated by the deposit according to the workflow subclass.

        MUST be overridden by workflow subclasses.
        """
Loading

0 comments on commit 430f3d2

Please sign in to comment.