From 0c860b00cfba398e7ea8888c25cb2dc84a3bf23a Mon Sep 17 00:00:00 2001
From: Eric Hanson
Date: Tue, 7 Jan 2025 13:19:37 -0500
Subject: [PATCH] Updates based on discussion in #56

* Update formatting of click options to use kebab-case names
* Shift reconcile functionality to BaseWorkflow.reconcile_bitstreams_and_metadata method and add corresponding unit tests
* Add unit tests for BaseWorkflow's load and get_workflow methods
* Refactor get_workflow method to use subclasses
* Remove WORKFLOWS constant from config.py
* Add InvalidWorkflowNameError exception
* Replace setdefault call with defaultdict in build_bitstream_dict function
* Remove file type filtering from build_bitstream_dict function and remove related unit and CLI tests
---
 dsc/cli.py                       | 60 +++++++-------------------
 dsc/config.py                    |  6 ----
 dsc/exceptions.py                |  4 +++
 dsc/utilities/__init__.py        | 16 +++------
 dsc/workflows/base/__init__.py   | 46 ++++++++++++++++++++----
 dsc/workflows/base/simple_csv.py |  2 ++
 tests/test_base_workflow.py      | 45 ++++++++++++++++++++++++
 tests/test_cli.py                | 56 +++++------------------
 tests/test_utilities.py          | 17 ++-------
 9 files changed, 119 insertions(+), 133 deletions(-)

diff --git a/dsc/cli.py b/dsc/cli.py
index 4b0be59..1126e7b 100644
--- a/dsc/cli.py
+++ b/dsc/cli.py
@@ -6,11 +6,6 @@
 import click
 
 from dsc.config import Config
-from dsc.utilities import (
-    build_bitstream_dict,
-    match_bitstreams_to_item_identifiers,
-    match_item_identifiers_to_bitstreams,
-)
 from dsc.workflows.base import BaseWorkflow
 
 logger = logging.getLogger(__name__)
@@ -21,20 +16,20 @@
 @click.pass_context
 @click.option(
     "-w",
-    "--workflow_name",
+    "--workflow-name",
     help="The workflow to use for the batch of DSpace submissions",
     required=True,
 )
 @click.option(
-    "-b",
-    "--batch_id",
-    help="The S3 prefix for the batch of DSpace submissions",
+    "-c",
+    "--collection-handle",
+    help="The handle of the DSpace collection to which the batch will be submitted",
     required=True,
 )
 @click.option(
-    "-c",
-    "--collection_handle",
-    help="The handle of the DSpace collection to which the batch will be submitted",
+    "-b",
+    "--batch-id",
+    help="The S3 prefix for the batch of DSpace submissions",
     required=True,
 )
 @click.option(
@@ -80,43 +75,14 @@ def post_main_group_subcommand(
 
 @main.command()
 @click.pass_context
-@click.option(
-    "-f",
-    "--file_type",
-    help="Optional parameter to filter bitstreams to specified file type",
-)
-def reconcile(ctx: click.Context, file_type: str) -> None:
-    """Match files in the S3 directory with entries in the batch metadata."""
+def reconcile(ctx: click.Context) -> None:
+    """Reconcile bitstreams with item identifiers from the metadata."""
     workflow = ctx.obj["workflow"]
+    no_bitstreams, no_item_identifiers = workflow.reconcile_bitstreams_and_metadata()
 
-    bitstream_dict = build_bitstream_dict(
-        workflow.s3_bucket, file_type, workflow.batch_path
-    )
-
-    # extract item identifiers from batch metadata
-    item_identifiers = [
-        workflow.get_item_identifier(item_metadata)
-        for item_metadata in workflow.item_metadata_iter()
-    ]
-
-    # reconcile item identifiers against S3 files
-    item_identifier_matches = match_item_identifiers_to_bitstreams(
-        bitstream_dict.keys(), item_identifiers
-    )
-    file_matches = match_bitstreams_to_item_identifiers(
-        bitstream_dict.keys(), item_identifiers
-    )
-    no_bitstreams = set(item_identifiers) - set(item_identifier_matches)
-    no_identifiers = set(bitstream_dict.keys()) - set(file_matches)
-
-    logger.info(
-        f"Item identifiers and bitstreams successfully matched: {item_identifier_matches}"
-    )
     if no_bitstreams:
+        logger.error(f"No bitstreams found for these item identifiers: {no_bitstreams}")
+    if no_item_identifiers:
         logger.error(
-            f"No bitstreams found for the following item identifiers: {no_bitstreams}"
-        )
-    if no_identifiers:
-        logger.error(
-            f"No item identifiers found for the following bitstreams: {no_identifiers}"
+            f"No item identifiers found for these bitstreams: {no_item_identifiers}"
         )
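Worth noting for the renamed options above: click derives each function parameter name from the long option by replacing dashes with underscores, so switching the flags to kebab-case requires no change to the decorated functions. A minimal sketch (the `demo` command is illustrative, not part of dsc):

```python
import click


@click.command()
@click.option("-w", "--workflow-name", required=True)
def demo(workflow_name: str) -> None:
    # click maps the kebab-case option "--workflow-name" onto the
    # snake_case parameter "workflow_name", so renaming the CLI flag
    # does not change the Python signature it binds to.
    click.echo(workflow_name)
```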
diff --git a/dsc/config.py b/dsc/config.py
index 3c1208a..eb11bdd 100644
--- a/dsc/config.py
+++ b/dsc/config.py
@@ -5,12 +5,6 @@
 
 import sentry_sdk
 
-WORKFLOWS = {
-    "test": {
-        "workflow-path": "tests.conftest.TestBaseWorkflow",
-    }
-}
-
 
 class Config:
     REQUIRED_ENV_VARS: Iterable[str] = [
diff --git a/dsc/exceptions.py b/dsc/exceptions.py
index caed71d..f5f8ee6 100644
--- a/dsc/exceptions.py
+++ b/dsc/exceptions.py
@@ -6,5 +6,9 @@ class InvalidSQSMessageError(Exception):
     pass
 
 
+class InvalidWorkflowNameError(Exception):
+    pass
+
+
 class ItemMetadatMissingRequiredFieldError(Exception):
     pass
diff --git a/dsc/utilities/__init__.py b/dsc/utilities/__init__.py
index 71d2163..dbf4d83 100644
--- a/dsc/utilities/__init__.py
+++ b/dsc/utilities/__init__.py
@@ -1,9 +1,10 @@
 from _collections_abc import dict_keys
+from collections import defaultdict
 
 from dsc.utilities.aws.s3 import S3Client
 
 
-def build_bitstream_dict(bucket: str, file_type: str | None, prefix: str) -> dict:
+def build_bitstream_dict(bucket: str, prefix: str) -> dict:
     """Build a dict of potential bitstreams with an item identifier for the key.
 
     An underscore (if present) serves as the delimiter between the item identifier
@@ -11,22 +12,15 @@ def build_bitstream_dict(bucket: str, file_type: str | None, prefix: str) -> dic
 
     Args:
         bucket: The S3 bucket containing the potential bitstreams.
-        file_type: Optional parameter to filter bitstreams to specified file type.
         prefix: The S3 prefix for the potential bitstreams.
     """
     s3_client = S3Client()
-    bitstreams = list(
-        s3_client.files_iter(
-            bucket,
-            file_type=file_type if file_type else "",
-            prefix=prefix,
-        )
-    )
-    bitstream_dict: dict = {}
+    bitstreams = list(s3_client.files_iter(bucket=bucket, prefix=prefix))
+    bitstream_dict: dict[str, list[str]] = defaultdict(list)
     for bitstream in bitstreams:
         file_name = bitstream.split("/")[-1]
         item_identifier = file_name.split("_")[0] if "_" in file_name else file_name
-        bitstream_dict.setdefault(item_identifier, []).append(bitstream)
+        bitstream_dict[item_identifier].append(bitstream)
     return bitstream_dict
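The defaultdict change above is behavior-preserving; both idioms group S3 keys by item identifier. A small self-contained sketch with made-up keys (not the library's code):

```python
from collections import defaultdict

keys = ["batch/123_01.pdf", "batch/123_02.jpg", "batch/456_01.pdf"]

with_setdefault: dict[str, list[str]] = {}
with_defaultdict: dict[str, list[str]] = defaultdict(list)
for key in keys:
    # item identifier = filename up to the first underscore, mirroring
    # the parsing in build_bitstream_dict
    identifier = key.split("/")[-1].split("_")[0]
    with_setdefault.setdefault(identifier, []).append(key)
    with_defaultdict[identifier].append(key)

# a defaultdict compares equal to a plain dict with the same contents
assert with_setdefault == with_defaultdict
assert with_setdefault == {
    "123": ["batch/123_01.pdf", "batch/123_02.jpg"],
    "456": ["batch/456_01.pdf"],
}
```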
+ """ + for workflow_class in BaseWorkflow.__subclasses__(): + if workflow_name == workflow_class.workflow_name: + return workflow_class + raise InvalidWorkflowNameError(f"Invalid workflow name: {workflow_name} ") + + def reconcile_bitstreams_and_metadata(self) -> tuple[set[str], set[str]]: + """Reconcile bitstreams against metadata. + + Generate a list of bitstreams without item identifiers and item identifiers + without bitstreams. Any discrepancies will be addressed by the engineer and + stakeholders as necessary. """ - module_name, class_name = WORKFLOWS[workflow_name]["workflow-path"].rsplit(".", 1) - source_module = import_module(module_name) - return getattr(source_module, class_name) + bitstream_dict = build_bitstream_dict(self.s3_bucket, self.batch_path) + + # extract item identifiers from batch metadata + item_identifiers = [ + self.get_item_identifier(item_metadata) + for item_metadata in self.item_metadata_iter() + ] + + # reconcile item identifiers against bitstreams + item_identifier_matches = match_item_identifiers_to_bitstreams( + bitstream_dict.keys(), item_identifiers + ) + file_matches = match_bitstreams_to_item_identifiers( + bitstream_dict.keys(), item_identifiers + ) + logger.info(f"Item identifiers and bitstreams matched: {item_identifier_matches}") + no_bitstreams = set(item_identifiers) - set(item_identifier_matches) + no_item_identifiers = set(bitstream_dict.keys()) - set(file_matches) + return no_bitstreams, no_item_identifiers @final def run(self) -> Iterator[SendMessageResultTypeDef]: diff --git a/dsc/workflows/base/simple_csv.py b/dsc/workflows/base/simple_csv.py index eb0dad2..4f9f0d8 100644 --- a/dsc/workflows/base/simple_csv.py +++ b/dsc/workflows/base/simple_csv.py @@ -18,6 +18,8 @@ class SimpleCSV(BaseWorkflow): deposit on S3. 
""" + workflow_name: str = "simple_csv" + def item_metadata_iter( self, metadata_file: str = "metadata.csv" ) -> Iterator[dict[str, Any]]: diff --git a/tests/test_base_workflow.py b/tests/test_base_workflow.py index f3eaf56..c1675e2 100644 --- a/tests/test_base_workflow.py +++ b/tests/test_base_workflow.py @@ -4,11 +4,56 @@ from dsc.exceptions import ( InvalidDSpaceMetadataError, + InvalidWorkflowNameError, ItemMetadatMissingRequiredFieldError, ) from dsc.item_submission import ItemSubmission +def test_base_workflow_load_success(base_workflow_instance): + workflow_instance = base_workflow_instance.load( + workflow_name="test", + collection_handle="123.4/5678", + batch_id="batch-aaa", + ) + assert workflow_instance.workflow_name == "test" + assert workflow_instance.submission_system == "Test@MIT" + assert workflow_instance.email_recipients == ("test@test.test",) + assert ( + workflow_instance.metadata_mapping_path + == "tests/fixtures/test_metadata_mapping.json" + ) + assert workflow_instance.s3_bucket == "dsc" + assert workflow_instance.output_queue == "mock-output_queue" + assert workflow_instance.collection_handle == "123.4/5678" + assert workflow_instance.batch_id == "batch-aaa" + + +def test_base_workflow_get_workflow_success(base_workflow_instance): + workflow_class = base_workflow_instance.get_workflow("test") + assert workflow_class.workflow_name == "test" + + +def test_base_workflow_get_workflow_invalid_workflow_name_raises_error( + base_workflow_instance, +): + with pytest.raises(InvalidWorkflowNameError): + base_workflow_instance.get_workflow("tast") + + +def test_base_workflow_reconcile_bitstreams_and_metadata_success( + caplog, base_workflow_instance, mocked_s3, s3_client +): + s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_01.pdf") + s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_02.jpg") + s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/456_01.pdf") + assert base_workflow_instance.reconcile_bitstreams_and_metadata() == ( + {"789"}, + {"456"}, + ) + assert "Item identifiers and bitstreams matched: ['123']" in caplog.text + + def test_base_workflow_run_success( caplog, base_workflow_instance, mocked_s3, mocked_sqs_input, mocked_sqs_output ): diff --git a/tests/test_cli.py b/tests/test_cli.py index cf11c08..4aab1cf 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,63 +1,25 @@ from dsc.cli import main -def test_reconcile_with_file_type_success( - caplog, runner, mocked_s3, base_workflow_instance, s3_client -): +def test_reconcile_success(caplog, runner, mocked_s3, base_workflow_instance, s3_client): s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_01.pdf") - s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_02.pdf") + s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_02.jpg") s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/456_01.pdf") - s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/789_01.jpg") result = runner.invoke( main, [ - "--workflow_name", + "--workflow-name", "test", - "--batch_id", - "batch-aaa", - "--collection_handle", + "--collection-handle", "123.4/5678", - "reconcile", - "--file_type", - "pdf", - ], - ) - assert result.exit_code == 0 - assert "Item identifiers and bitstreams successfully matched: ['123']" in caplog.text - assert ( - "No bitstreams found for the following item identifiers: {'789'}" in caplog.text - ) - assert ( - "No item identifiers found for the 
following bitstreams: {'456'}" in caplog.text - ) - assert "Total time elapsed" in caplog.text - - -def test_reconcile_without_file_type_success( - caplog, runner, mocked_s3, base_workflow_instance, s3_client -): - s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_01.pdf") - s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_02.pdf") - s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/456_01.pdf") - s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/789_01.jpg") - result = runner.invoke( - main, - [ - "--workflow_name", - "test", - "--batch_id", + "--batch-id", "batch-aaa", - "--collection_handle", - "123.4/5678", "reconcile", ], ) + assert result.output == "" assert result.exit_code == 0 - assert ( - "Item identifiers and bitstreams successfully matched: ['123', '789']" - in caplog.text - ) - assert ( - "No item identifiers found for the following bitstreams: {'456'}" in caplog.text - ) + assert "Item identifiers and bitstreams matched: ['123']" in caplog.text + assert "No bitstreams found for these item identifiers: {'789'}" in caplog.text + assert "No item identifiers found for these bitstreams: {'456'}" in caplog.text assert "Total time elapsed" in caplog.text diff --git a/tests/test_utilities.py b/tests/test_utilities.py index d9c90b9..29fad72 100644 --- a/tests/test_utilities.py +++ b/tests/test_utilities.py @@ -5,25 +5,12 @@ ) -def test_build_bitstream_dict_with_file_type_success(mocked_s3, s3_client): +def test_build_bitstream_dict_success(mocked_s3, s3_client): s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_01.pdf") s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_02.pdf") s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/456_01.pdf") s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/789_01.jpg") - assert build_bitstream_dict( - bucket="dsc", file_type="pdf", prefix="test/batch-aaa/" - ) == { - "123": ["test/batch-aaa/123_01.pdf", "test/batch-aaa/123_02.pdf"], - "456": ["test/batch-aaa/456_01.pdf"], - } - - -def test_build_bitstream_dict_without_file_type_success(mocked_s3, s3_client): - s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_01.pdf") - s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_02.pdf") - s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/456_01.pdf") - s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/789_01.jpg") - assert build_bitstream_dict(bucket="dsc", file_type="", prefix="test/batch-aaa/") == { + assert build_bitstream_dict(bucket="dsc", prefix="test/batch-aaa/") == { "123": ["test/batch-aaa/123_01.pdf", "test/batch-aaa/123_02.pdf"], "456": ["test/batch-aaa/456_01.pdf"], "789": ["test/batch-aaa/789_01.jpg"],