Skip to content

Commit

Permalink
Updates based on discussion in #56
Browse files Browse the repository at this point in the history
* Update formatting of click options
* Shift reconcile functionality to BaseWorkflow.reconcile_bitstreams_and_metadata method and add corresponding unit tests
* Add unit tests for BaseWorkflow's load and get_workflow methods
* Refactor get_workflow method to use subclasses
* Remove WORKFLOWS constant from config.py
* Add InvalidWorkflowNameError exception
* Replace setdefault call with defaultdict in build_bitstream_dict function
* Remove file type filtering from build_bitstream_dict function and remove related unit and CLI tests
  • Loading branch information
ehanson8 committed Jan 7, 2025
1 parent e7822ef commit 0c860b0
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 133 deletions.
60 changes: 13 additions & 47 deletions dsc/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@
import click

from dsc.config import Config
from dsc.utilities import (
build_bitstream_dict,
match_bitstreams_to_item_identifiers,
match_item_identifiers_to_bitstreams,
)
from dsc.workflows.base import BaseWorkflow

logger = logging.getLogger(__name__)
Expand All @@ -21,20 +16,20 @@
@click.pass_context
@click.option(
"-w",
"--workflow_name",
"--workflow-name",
help="The workflow to use for the batch of DSpace submissions",
required=True,
)
@click.option(
"-b",
"--batch_id",
help="The S3 prefix for the batch of DSpace submissions",
"-c",
"--collection-handle",
help="The handle of the DSpace collection to which the batch will be submitted",
required=True,
)
@click.option(
"-c",
"--collection_handle",
help="The handle of the DSpace collection to which the batch will be submitted",
"-b",
"--batch-id",
help="The S3 prefix for the batch of DSpace submissions",
required=True,
)
@click.option(
Expand Down Expand Up @@ -80,43 +75,14 @@ def post_main_group_subcommand(

@main.command()
@click.pass_context
@click.option(
"-f",
"--file_type",
help="Optional parameter to filter bitstreams to specified file type",
)
def reconcile(ctx: click.Context, file_type: str) -> None:
"""Match files in the S3 directory with entries in the batch metadata."""
def reconcile(ctx: click.Context) -> None:
"""Reconcile bitstreams with item identifiers from the metadata."""
workflow = ctx.obj["workflow"]
no_bitstreams, no_item_identifiers = workflow.reconcile_bitstreams_and_metadata()

bitstream_dict = build_bitstream_dict(
workflow.s3_bucket, file_type, workflow.batch_path
)

# extract item identifiers from batch metadata
item_identifiers = [
workflow.get_item_identifier(item_metadata)
for item_metadata in workflow.item_metadata_iter()
]

# reconcile item identifiers against S3 files
item_identifier_matches = match_item_identifiers_to_bitstreams(
bitstream_dict.keys(), item_identifiers
)
file_matches = match_bitstreams_to_item_identifiers(
bitstream_dict.keys(), item_identifiers
)
no_bitstreams = set(item_identifiers) - set(item_identifier_matches)
no_identifiers = set(bitstream_dict.keys()) - set(file_matches)

logger.info(
f"Item identifiers and bitstreams successfully matched: {item_identifier_matches}"
)
if no_bitstreams:
logger.error(f"No bitstreams found for these item identifiers: {no_bitstreams}")
if no_item_identifiers:
logger.error(
f"No bitstreams found for the following item identifiers: {no_bitstreams}"
)
if no_identifiers:
logger.error(
f"No item identifiers found for the following bitstreams: {no_identifiers}"
f"No item identifiers found for these bitstreams: {no_item_identifiers}"
)
6 changes: 0 additions & 6 deletions dsc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@

import sentry_sdk

WORKFLOWS = {
"test": {
"workflow-path": "tests.conftest.TestBaseWorkflow",
}
}


class Config:
REQUIRED_ENV_VARS: Iterable[str] = [
Expand Down
4 changes: 4 additions & 0 deletions dsc/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,9 @@ class InvalidSQSMessageError(Exception):
pass


class InvalidWorkflowNameError(Exception):
    """Raised when a workflow name matches no BaseWorkflow subclass's workflow_name."""


class ItemMetadatMissingRequiredFieldError(Exception):
    """Presumably signals item metadata that lacks a required field.

    NOTE(review): the meaning is inferred from the class name only; confirm at
    the raise site.
    """

    # NOTE(review): "Metadat" looks like a typo for "Metadata". Other modules
    # import the class under this exact spelling, so a rename would have to be
    # coordinated across all importers.
16 changes: 5 additions & 11 deletions dsc/utilities/__init__.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,26 @@
from _collections_abc import dict_keys
from collections import defaultdict

from dsc.utilities.aws.s3 import S3Client


def build_bitstream_dict(bucket: str, file_type: str | None, prefix: str) -> dict:
def build_bitstream_dict(bucket: str, prefix: str) -> dict:
"""Build a dict of potential bitstreams with an item identifier for the key.
An underscore (if present) serves as the delimiter between the item identifier
and any additional suffixes in the case of multiple matching bitstreams.
Args:
bucket: The S3 bucket containing the potential bitstreams.
file_type: Optional parameter to filter bitstreams to specified file type.
prefix: The S3 prefix for the potential bitstreams.
"""
s3_client = S3Client()
bitstreams = list(
s3_client.files_iter(
bucket,
file_type=file_type if file_type else "",
prefix=prefix,
)
)
bitstream_dict: dict = {}
bitstreams = list(s3_client.files_iter(bucket=bucket, prefix=prefix))
bitstream_dict: dict[str, list[str]] = defaultdict(list)
for bitstream in bitstreams:
file_name = bitstream.split("/")[-1]
item_identifier = file_name.split("_")[0] if "_" in file_name else file_name
bitstream_dict.setdefault(item_identifier, []).append(bitstream)
bitstream_dict[item_identifier].append(bitstream)
return bitstream_dict


Expand Down
46 changes: 39 additions & 7 deletions dsc/workflows/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
import json
import logging
from abc import ABC, abstractmethod
from importlib import import_module
from typing import TYPE_CHECKING, Any, final

from dsc.config import WORKFLOWS
from dsc.exceptions import (
InvalidDSpaceMetadataError,
InvalidWorkflowNameError,
ItemMetadatMissingRequiredFieldError,
)
from dsc.item_submission import ItemSubmission
from dsc.utilities import (
build_bitstream_dict,
match_bitstreams_to_item_identifiers,
match_item_identifiers_to_bitstreams,
)

if TYPE_CHECKING:
from collections.abc import Iterator
Expand Down Expand Up @@ -84,12 +88,40 @@ def get_workflow(cls, workflow_name: str) -> type[BaseWorkflow]:
"""Return workflow class.
Args:
workflow_name: The label of the workflow. Must match a key from
config.WORKFLOWS.
workflow_name: The label of the workflow. Must match a workflow_name attribute
from BaseWorkflow subclass.
"""
for workflow_class in BaseWorkflow.__subclasses__():
if workflow_name == workflow_class.workflow_name:
return workflow_class
raise InvalidWorkflowNameError(f"Invalid workflow name: {workflow_name} ")

def reconcile_bitstreams_and_metadata(self) -> tuple[set[str], set[str]]:
"""Reconcile bitstreams against metadata.
Return a tuple of two sets: the item identifiers from the metadata that have
no matching bitstreams, followed by the bitstream identifiers that have no
matching item identifiers. Any discrepancies will be addressed by the engineer
and stakeholders as necessary.
"""
module_name, class_name = WORKFLOWS[workflow_name]["workflow-path"].rsplit(".", 1)
source_module = import_module(module_name)
return getattr(source_module, class_name)
bitstream_dict = build_bitstream_dict(self.s3_bucket, self.batch_path)

# extract item identifiers from batch metadata
item_identifiers = [
self.get_item_identifier(item_metadata)
for item_metadata in self.item_metadata_iter()
]

# reconcile item identifiers against bitstreams
item_identifier_matches = match_item_identifiers_to_bitstreams(
bitstream_dict.keys(), item_identifiers
)
file_matches = match_bitstreams_to_item_identifiers(
bitstream_dict.keys(), item_identifiers
)
logger.info(f"Item identifiers and bitstreams matched: {item_identifier_matches}")
no_bitstreams = set(item_identifiers) - set(item_identifier_matches)
no_item_identifiers = set(bitstream_dict.keys()) - set(file_matches)
return no_bitstreams, no_item_identifiers

@final
def run(self) -> Iterator[SendMessageResultTypeDef]:
Expand Down
2 changes: 2 additions & 0 deletions dsc/workflows/base/simple_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class SimpleCSV(BaseWorkflow):
deposit on S3.
"""

workflow_name: str = "simple_csv"

def item_metadata_iter(
self, metadata_file: str = "metadata.csv"
) -> Iterator[dict[str, Any]]:
Expand Down
45 changes: 45 additions & 0 deletions tests/test_base_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,56 @@

from dsc.exceptions import (
InvalidDSpaceMetadataError,
InvalidWorkflowNameError,
ItemMetadatMissingRequiredFieldError,
)
from dsc.item_submission import ItemSubmission


def test_base_workflow_load_success(base_workflow_instance):
    """Check that ``load`` returns a workflow exposing the expected attribute values."""
    loaded = base_workflow_instance.load(
        workflow_name="test",
        collection_handle="123.4/5678",
        batch_id="batch-aaa",
    )
    # Checked in insertion order; the attribute name is the assertion message so
    # a failure identifies which attribute was wrong.
    expected_attributes = {
        "workflow_name": "test",
        "submission_system": "Test@MIT",
        "email_recipients": ("test@test.test",),
        "metadata_mapping_path": "tests/fixtures/test_metadata_mapping.json",
        "s3_bucket": "dsc",
        "output_queue": "mock-output_queue",
        "collection_handle": "123.4/5678",
        "batch_id": "batch-aaa",
    }
    for attribute, expected in expected_attributes.items():
        assert getattr(loaded, attribute) == expected, attribute

def test_base_workflow_get_workflow_success(base_workflow_instance):
    """Check that looking up "test" returns a class whose workflow_name is "test"."""
    assert base_workflow_instance.get_workflow("test").workflow_name == "test"


def test_base_workflow_get_workflow_invalid_workflow_name_raises_error(
    base_workflow_instance,
):
    """Check that an unknown workflow name raises InvalidWorkflowNameError."""
    # "tast" is not a known workflow name; this module uses "test" for the
    # successful lookup.
    unknown_name = "tast"
    with pytest.raises(InvalidWorkflowNameError):
        base_workflow_instance.get_workflow(unknown_name)


def test_base_workflow_reconcile_bitstreams_and_metadata_success(
    caplog, base_workflow_instance, mocked_s3, s3_client
):
    """Check the reconcile result and log message for a partially matching batch.

    Three objects are uploaded under ``test/batch-aaa/``. The expected result
    reports "789" as an item identifier without bitstreams and "456" as a
    bitstream without an item identifier.
    NOTE(review): this presumes the fixture metadata lists items 123 and 789;
    confirm in conftest.
    """
    for file_name in ("123_01.pdf", "123_02.jpg", "456_01.pdf"):
        s3_client.put_file(
            file_content="", bucket="dsc", key=f"test/batch-aaa/{file_name}"
        )

    reconciled = base_workflow_instance.reconcile_bitstreams_and_metadata()

    assert reconciled == ({"789"}, {"456"})
    assert "Item identifiers and bitstreams matched: ['123']" in caplog.text


def test_base_workflow_run_success(
caplog, base_workflow_instance, mocked_s3, mocked_sqs_input, mocked_sqs_output
):
Expand Down
56 changes: 9 additions & 47 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,25 @@
from dsc.cli import main


def test_reconcile_with_file_type_success(
caplog, runner, mocked_s3, base_workflow_instance, s3_client
):
def test_reconcile_success(caplog, runner, mocked_s3, base_workflow_instance, s3_client):
s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_01.pdf")
s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_02.pdf")
s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_02.jpg")
s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/456_01.pdf")
s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/789_01.jpg")
result = runner.invoke(
main,
[
"--workflow_name",
"--workflow-name",
"test",
"--batch_id",
"batch-aaa",
"--collection_handle",
"--collection-handle",
"123.4/5678",
"reconcile",
"--file_type",
"pdf",
],
)
assert result.exit_code == 0
assert "Item identifiers and bitstreams successfully matched: ['123']" in caplog.text
assert (
"No bitstreams found for the following item identifiers: {'789'}" in caplog.text
)
assert (
"No item identifiers found for the following bitstreams: {'456'}" in caplog.text
)
assert "Total time elapsed" in caplog.text


def test_reconcile_without_file_type_success(
caplog, runner, mocked_s3, base_workflow_instance, s3_client
):
s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_01.pdf")
s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_02.pdf")
s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/456_01.pdf")
s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/789_01.jpg")
result = runner.invoke(
main,
[
"--workflow_name",
"test",
"--batch_id",
"--batch-id",
"batch-aaa",
"--collection_handle",
"123.4/5678",
"reconcile",
],
)
assert result.output == ""
assert result.exit_code == 0
assert (
"Item identifiers and bitstreams successfully matched: ['123', '789']"
in caplog.text
)
assert (
"No item identifiers found for the following bitstreams: {'456'}" in caplog.text
)
assert "Item identifiers and bitstreams matched: ['123']" in caplog.text
assert "No bitstreams found for these item identifiers: {'789'}" in caplog.text
assert "No item identifiers found for these bitstreams: {'456'}" in caplog.text
assert "Total time elapsed" in caplog.text
17 changes: 2 additions & 15 deletions tests/test_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,12 @@
)


def test_build_bitstream_dict_with_file_type_success(mocked_s3, s3_client):
def test_build_bitstream_dict_success(mocked_s3, s3_client):
s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_01.pdf")
s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_02.pdf")
s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/456_01.pdf")
s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/789_01.jpg")
assert build_bitstream_dict(
bucket="dsc", file_type="pdf", prefix="test/batch-aaa/"
) == {
"123": ["test/batch-aaa/123_01.pdf", "test/batch-aaa/123_02.pdf"],
"456": ["test/batch-aaa/456_01.pdf"],
}


def test_build_bitstream_dict_without_file_type_success(mocked_s3, s3_client):
s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_01.pdf")
s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_02.pdf")
s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/456_01.pdf")
s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/789_01.jpg")
assert build_bitstream_dict(bucket="dsc", file_type="", prefix="test/batch-aaa/") == {
assert build_bitstream_dict(bucket="dsc", prefix="test/batch-aaa/") == {
"123": ["test/batch-aaa/123_01.pdf", "test/batch-aaa/123_02.pdf"],
"456": ["test/batch-aaa/456_01.pdf"],
"789": ["test/batch-aaa/789_01.jpg"],
Expand Down

0 comments on commit 0c860b0

Please sign in to comment.