diff --git a/suitcase/mongo_normalized/__init__.py b/suitcase/mongo_normalized/__init__.py
index 018a66e..af68543 100644
--- a/suitcase/mongo_normalized/__init__.py
+++ b/suitcase/mongo_normalized/__init__.py
@@ -2,14 +2,19 @@
 import pymongo
 
 from ._version import get_versions
 
-__version__ = get_versions()['version']
+__version__ = get_versions()["version"]
 del get_versions
 
 
 class Serializer(event_model.DocumentRouter):
-    def __init__(self, metadatastore_db, asset_registry_db,
-                 ignore_duplicates=True, resource_uid_unique=False,
-                 tls=False):
+    def __init__(
+        self,
+        metadatastore_db,
+        asset_registry_db,
+        ignore_duplicates=True,
+        resource_uid_unique=False,
+        tls=False,
+    ):
         """
         Insert documents into MongoDB using layout v1.
@@ -48,22 +53,31 @@ def __init__(self, metadatastore_db, asset_registry_db,
             assets_db = _get_database(asset_registry_db, tls)
         else:
             assets_db = asset_registry_db
-        self._run_start_collection = mds_db.get_collection('run_start')
-        self._run_start_collection_revisions = mds_db.get_collection('run_start_revisions')
-        self._run_stop_collection = mds_db.get_collection('run_stop')
-        self._event_descriptor_collection = mds_db.get_collection(
-            'event_descriptor')
-        self._event_collection = mds_db.get_collection('event')
-
-        self._resource_collection = assets_db.get_collection('resource')
-        self._datum_collection = assets_db.get_collection('datum')
-
-        self._collections = {'start': self._run_start_collection,
-                             'stop': self._run_stop_collection,
-                             'resource': self._resource_collection,
-                             'descriptor': self._event_descriptor_collection,
-                             'event': self._event_collection,
-                             'datum': self._datum_collection}
+        self._run_start_collection = mds_db.get_collection("run_start")
+        self._run_start_collection_revisions = mds_db.get_collection(
+            "run_start_revisions"
+        )
+        self._run_stop_collection = mds_db.get_collection("run_stop")
+        self._run_stop_collection_revisions = mds_db.get_collection(
+            "run_stop_revisions"
+        )
+        self._event_descriptor_collection_revisions = mds_db.get_collection(
+            "event_descriptor_revisions"
+        )
+        self._event_descriptor_collection = mds_db.get_collection("event_descriptor")
+        self._event_collection = mds_db.get_collection("event")
+
+        self._resource_collection = assets_db.get_collection("resource")
+        self._datum_collection = assets_db.get_collection("datum")
+
+        self._collections = {
+            "start": self._run_start_collection,
+            "stop": self._run_stop_collection,
+            "resource": self._resource_collection,
+            "descriptor": self._event_descriptor_collection,
+            "event": self._event_collection,
+            "datum": self._datum_collection,
+        }
 
         self._metadatastore_db = mds_db
         self._asset_registry_db = assets_db
@@ -77,47 +91,54 @@ def _create_indexes(self):
 
         If the index already exists, this has no effect.
         """
-        self._resource_collection.create_index(
-            'uid', unique=self._resource_uid_unique)
-        self._resource_collection.create_index('resource_id')  # legacy
+        self._resource_collection.create_index("uid", unique=self._resource_uid_unique)
+        self._resource_collection.create_index("resource_id")  # legacy
         # TODO: Migrate all Resources to have a RunStart UID, and then make a
         # unique index on:
         # [('uid', pymongo.ASCENDING), ('run_start', pymongo.ASCENDING)]
-        self._datum_collection.create_index('datum_id', unique=True)
-        self._datum_collection.create_index('resource')
-        self._run_start_collection.create_index('uid', unique=True)
-        self._run_start_collection.create_index('scan_id', unique=False)
+        self._datum_collection.create_index("datum_id", unique=True)
+        self._datum_collection.create_index("resource")
+        self._run_start_collection.create_index("uid", unique=True)
+        self._run_start_collection.create_index("scan_id", unique=False)
         self._run_start_collection.create_index(
-            [('scan_id', pymongo.DESCENDING), ('_id', pymongo.DESCENDING)],
-            unique=True)
+            [("scan_id", pymongo.DESCENDING), ("_id", pymongo.DESCENDING)], unique=True
+        )
         self._run_start_collection.create_index(
-            [('time', pymongo.ASCENDING), ('_id', pymongo.DESCENDING)],
-            unique=True)
+            [("time", pymongo.ASCENDING), ("_id", pymongo.DESCENDING)], unique=True
+        )
         self._run_start_collection.create_index(
-            [('time', pymongo.DESCENDING), ('_id', pymongo.DESCENDING)],
-            unique=True)
+            [("time", pymongo.DESCENDING), ("_id", pymongo.DESCENDING)], unique=True
+        )
         self._run_start_collection.create_index(
-            [('time', pymongo.DESCENDING), ('scan_id', pymongo.DESCENDING)],
-            unique=False, background=True)
+            [("time", pymongo.DESCENDING), ("scan_id", pymongo.DESCENDING)],
+            unique=False,
+            background=True,
+        )
         self._run_start_collection.create_index([("$**", "text")])
-        self._run_start_collection.create_index('data_session', unique=False)
-        self._run_start_collection.create_index('data_groups', unique=False)
-        self._run_stop_collection.create_index('run_start', unique=True)
-        self._run_stop_collection.create_index('uid', unique=True)
+        self._run_start_collection.create_index("data_session", unique=False)
+        self._run_start_collection.create_index("data_groups", unique=False)
+        self._run_stop_collection.create_index("run_start", unique=True)
+        self._run_stop_collection.create_index("uid", unique=True)
         self._run_stop_collection.create_index(
-            [('time', pymongo.DESCENDING)], unique=False, background=True)
+            [("time", pymongo.DESCENDING)], unique=False, background=True
+        )
         self._run_stop_collection.create_index([("$**", "text")])
-        self._event_descriptor_collection.create_index('uid', unique=True)
+        self._event_descriptor_collection.create_index("uid", unique=True)
         self._event_descriptor_collection.create_index(
-            [('run_start', pymongo.DESCENDING), ('time', pymongo.DESCENDING)],
-            unique=False, background=True)
+            [("run_start", pymongo.DESCENDING), ("time", pymongo.DESCENDING)],
+            unique=False,
+            background=True,
+        )
         self._event_descriptor_collection.create_index(
-            [('time', pymongo.DESCENDING)], unique=False, background=True)
+            [("time", pymongo.DESCENDING)], unique=False, background=True
+        )
         self._event_descriptor_collection.create_index([("$**", "text")])
-        self._event_collection.create_index('uid', unique=True)
+        self._event_collection.create_index("uid", unique=True)
         self._event_collection.create_index(
-            [('descriptor', pymongo.DESCENDING), ('time', pymongo.ASCENDING)],
-            unique=False, background=True)
+            [("descriptor", pymongo.DESCENDING), ("time", pymongo.ASCENDING)],
("time", pymongo.ASCENDING)], + unique=False, + background=True, + ) def __call__(self, name, doc): # Before inserting into mongo, convert any numpy objects into built-in @@ -135,12 +156,14 @@ def _insert(self, name, doc): f"already exists in the database. Document:\n{doc}" ) from err else: - doc.pop('_id') + doc.pop("_id") if name == "datum": id_name = "datum_id" else: id_name = "uid" - existing = self._collections[name].find_one({id_name: doc[id_name]}, {'_id': False}) + existing = self._collections[name].find_one( + {id_name: doc[id_name]}, {"_id": False} + ) if existing != doc: raise DuplicateUniqueID( "A document with the same unique id as this one " @@ -159,39 +182,55 @@ def update(self, name, doc): Parameters ---------- - name: {'start'} - The type of document being updated. Currently, only 'start' is - supported and any other value here will raise NotImplementedError. + name: {'start', 'stop', 'descriptor'} + The type of document being updated. doc: dict The new version of the document. Its uid will be used to match it to the current version, the one to be updated. """ - if name == 'start': - event_model.schema_validators[event_model.DocumentNames.start].validate(doc) - current_col = self._run_start_collection - revisions_col = self._run_start_collection_revisions - old = current_col.find_one({'uid': doc['uid']}) - old.pop('_id') - target_uid_docs = revisions_col.find({'document.uid': doc['uid']}) - cur = target_uid_docs.sort([('revision', pymongo.DESCENDING)]).limit(1) - wrapped = dict() - try: - wrapped['revision'] = next(cur)['revision'] + 1 - except StopIteration: - wrapped['revision'] = 0 - wrapped['document'] = old - revisions_col.insert_one(wrapped) - current_col.find_one_and_replace({'uid': doc['uid']}, doc) + if name in {"start", "stop", "descriptor"}: + event_model.schema_validators[ + getattr(event_model.DocumentNames, name) + ].validate(doc) + # Keys and collection names differ slightly between start, stop and descriptor + key = "uid" + name = f"_event_{name}" if name == "descriptor" else f"_run_{name}" + current_col = getattr(self, f"{name}_collection") + revisions_col = getattr(self, f"{name}_collection_revisions") + old = current_col.find_one({key: doc[key]}) + if old is None and (name == "_run_stop" or name == "_event_descriptor"): + # New stop or descriptor document : insert it + current_col.insert_one(doc) + else: + old.pop("_id") + # Field Saftey Enforcement : Prevent restricted fields from changing + restricted_fields = ["run_start"] + for field in restricted_fields: + if field in old and field in doc: + if old[field] != doc[field]: + raise ValueError( + f"Field '{field}' is restricted and cannot be changed." + ) + target_uid_docs = revisions_col.find({"document.uid": doc["uid"]}) + cur = target_uid_docs.sort([("revision", pymongo.DESCENDING)]).limit(1) + wrapped = dict() + try: + wrapped["revision"] = next(cur)["revision"] + 1 + except StopIteration: + wrapped["revision"] = 0 + wrapped["document"] = old + revisions_col.insert_one(wrapped) + current_col.find_one_and_replace({"uid": doc["uid"]}, doc) else: raise NotImplementedError( f"Updating a {name} document is not currently supported. " - f"Only updates to 'start' documents are supported.") + ) def start(self, doc): - self._insert('start', doc) + self._insert("start", doc) def descriptor(self, doc): - self._insert('descriptor', doc) + self._insert("descriptor", doc) def resource(self, doc): # In old databases, we know there are duplicates Resources. 
@@ -202,9 +250,11 @@ def resource(self, doc):
         # is slow, but since there are never a large number of Resources per
         # Run, this is acceptable.
         if self._resource_uid_unique:
-            self._insert('resource', doc)
+            self._insert("resource", doc)
         else:
-            existing = self._collections["resource"].find_one({'uid': doc['uid']}, {'_id': False})
+            existing = self._collections["resource"].find_one(
+                {"uid": doc["uid"]}, {"_id": False}
+            )
             if existing is not None:
                 if existing != doc:
                     raise DuplicateUniqueID(
@@ -216,7 +266,7 @@ def resource(self, doc):
             self._collections["resource"].insert_one(doc)
 
     def event(self, doc):
-        self._insert('event', doc)
+        self._insert("event", doc)
 
     def event_page(self, doc):
         # Unpack an EventPage into Events and do the actual insert inside
@@ -230,7 +280,7 @@ def event_page(self, doc):
             filled_events.append(event_method(event_doc))
 
     def datum(self, doc):
-        self._insert('datum', doc)
+        self._insert("datum", doc)
 
     def datum_page(self, doc):
         # Unpack an DatumPage into Datum and do the actual insert inside
@@ -244,20 +294,22 @@ def datum_page(self, doc):
             filled_datums.append(datum_method(datum_doc))
 
     def stop(self, doc):
-        self._insert('stop', doc)
+        self._insert("stop", doc)
 
     def __repr__(self):
         # Display connection info in eval-able repr.
-        return (f'{type(self).__name__}('
-                f'metadatastore_db={self._metadatastore_db!r}, '
-                f'asset_registry_db={self._asset_registry_db!r})')
+        return (
+            f"{type(self).__name__}("
+            f"metadatastore_db={self._metadatastore_db!r}, "
+            f"asset_registry_db={self._asset_registry_db!r})"
+        )
 
 
 def _get_database(uri, tls):
-    if not pymongo.uri_parser.parse_uri(uri)['database']:
+    if not pymongo.uri_parser.parse_uri(uri)["database"]:
         raise ValueError(
-            f"Invalid URI: {uri} "
-            f"Did you forget to include a database?")
+            f"Invalid URI: {uri}. Did you forget to include a database?"
+        )
     else:
         client = pymongo.MongoClient(uri, tls=tls)
         return client.get_database()
diff --git a/suitcase/mongo_normalized/tests/tests.py b/suitcase/mongo_normalized/tests/tests.py
index d72612b..31a85eb 100644
--- a/suitcase/mongo_normalized/tests/tests.py
+++ b/suitcase/mongo_normalized/tests/tests.py
@@ -30,7 +30,7 @@ def test_duplicates(db_factory, example_data):
 
     # Modify a document, check that inserting a document with uid,
     # but different content raises.
-    documents[0][1]['new_key'] = 'new_value'
+    documents[0][1]["new_key"] = "new_value"
     with pytest.raises(DuplicateUniqueID):
         for item in documents:
             serializer(*item)
@@ -43,31 +43,95 @@ def test_update(db_factory, example_data):
     serializer = Serializer(metadatastore_db, asset_registry_db)
     for item in documents:
         serializer(*item)
-    original = documents[0][1]
-    start = copy.deepcopy(original)
-    start['user'] = 'first updated temp user'
-    serializer.update('start', start)
-    real = metadatastore_db.get_collection('run_start').find_one({'uid': start['uid']})
-    real.pop('_id')
-    assert sanitize_doc(real) == sanitize_doc(start)
-    revision = metadatastore_db.get_collection('run_start_revisions').find_one({'document.uid': start['uid']})
-    assert revision['revision'] == 0
-    revision.pop('revision')
-    revision.pop('_id')
-    assert sanitize_doc(revision['document']) == sanitize_doc(original)
-
-    revision1 = copy.deepcopy(start)
-    start['user'] = 'second updated temp user'
-    serializer.update('start', start)
-    real = metadatastore_db.get_collection('run_start').find_one({'uid': start['uid']})
-    real.pop('_id')
-    assert sanitize_doc(real) == sanitize_doc(start)
-    revision = metadatastore_db.get_collection('run_start_revisions').find_one({'document.uid': start['uid'],
-                                                                                'revision': 1})
-    assert revision['revision'] == 1
-    revision.pop('revision')
-    revision.pop('_id')
-    assert sanitize_doc(revision['document']) == sanitize_doc(revision1)
+    original_start = next(item[1] for item in documents if item[0] == "start")
+    original_stop = next(item[1] for item in documents if item[0] == "stop")
+    original_descriptor = next(item[1] for item in documents if item[0] == "descriptor")
+    # (1) Make mutable copies
+    start = copy.deepcopy(original_start)
+    stop = copy.deepcopy(original_stop)
+    descriptor = copy.deepcopy(original_descriptor)
+    # (2) Update a property of the copies
+    start["user"] = "first updated temp user"
+    serializer.update("start", start)
+    stop["reason"] = "Everything happens for a reason."
+    serializer.update("stop", stop)
+    descriptor["name"] = "secondary"
+    serializer.update("descriptor", descriptor)
+    # (3) Get the updated record from the database to confirm changes
+    real_start = metadatastore_db.get_collection("run_start").find_one(
+        {"uid": start["uid"]}
+    )
+    real_start.pop("_id")
+    real_stop = metadatastore_db.get_collection("run_stop").find_one(
+        {"uid": stop["uid"]}
+    )
+    real_stop.pop("_id")
+    real_descriptor = metadatastore_db.get_collection("event_descriptor").find_one(
+        {"uid": descriptor["uid"]}
+    )
+    real_descriptor.pop("_id")
+    # (4) Test the data
+    assert sanitize_doc(real_start) == sanitize_doc(start)
+    assert sanitize_doc(real_stop) == sanitize_doc(stop)
+    assert sanitize_doc(real_descriptor) == sanitize_doc(descriptor)
+    # (5) Test the revisions
+    revision_start = metadatastore_db.get_collection("run_start_revisions").find_one(
+        {"document.uid": start["uid"]}
+    )
+    assert revision_start["revision"] == 0
+    revision_start.pop("revision")
+    revision_start.pop("_id")
+    assert sanitize_doc(revision_start["document"]) == sanitize_doc(original_start)
+
+    revision_stop = metadatastore_db.get_collection("run_stop_revisions").find_one(
+        {"document.uid": stop["uid"]}
+    )
+    assert revision_stop["revision"] == 0
+    revision_stop.pop("revision")
+    revision_stop.pop("_id")
+    assert sanitize_doc(revision_stop["document"]) == sanitize_doc(original_stop)
+
+    revision_descriptor = metadatastore_db.get_collection(
+        "event_descriptor_revisions"
+    ).find_one({"document.uid": descriptor["uid"]})
+    assert revision_descriptor["revision"] == 0
+    revision_descriptor.pop("revision")
+    revision_descriptor.pop("_id")
+    assert sanitize_doc(revision_descriptor["document"]) == sanitize_doc(
+        original_descriptor
+    )
+
+    # (6) Test another revision
+    revision1_start = copy.deepcopy(start)
+    revision1_stop = copy.deepcopy(stop)
+    start["user"] = "second updated temp user"
+    serializer.update("start", start)
+    stop["reason"] = "Nothing happens for a reason."
+    serializer.update("stop", stop)
+    real_start = metadatastore_db.get_collection("run_start").find_one(
+        {"uid": start["uid"]}
+    )
+    real_start.pop("_id")
+    assert sanitize_doc(real_start) == sanitize_doc(start)
+    real_stop = metadatastore_db.get_collection("run_stop").find_one(
+        {"uid": stop["uid"]}
+    )
+    real_stop.pop("_id")
+    assert sanitize_doc(real_stop) == sanitize_doc(stop)
+    revision_start = metadatastore_db.get_collection("run_start_revisions").find_one(
+        {"document.uid": start["uid"], "revision": 1}
+    )
+    assert revision_start["revision"] == 1
+    revision_start.pop("revision")
+    revision_start.pop("_id")
+    revision_stop = metadatastore_db.get_collection("run_stop_revisions").find_one(
+        {"document.uid": stop["uid"], "revision": 1}
+    )
+    assert revision_stop["revision"] == 1
+    revision_stop.pop("revision")
+    revision_stop.pop("_id")
+    assert sanitize_doc(revision_start["document"]) == sanitize_doc(revision1_start)
+    assert sanitize_doc(revision_stop["document"]) == sanitize_doc(revision1_stop)
 
 
 def test_notimplemented_error(db_factory, example_data):
@@ -75,7 +139,7 @@ def test_notimplemented_error(db_factory, example_data):
     asset_registry_db = db_factory()
     serializer = Serializer(metadatastore_db, asset_registry_db)
     with pytest.raises(NotImplementedError):
-        assert serializer.update('not_start', {})
+        assert serializer.update("not_start", {})
 
 
 def test_validation_error(db_factory, example_data):
@@ -83,7 +147,7 @@ def test_validation_error(db_factory, example_data):
     asset_registry_db = db_factory()
     serializer = Serializer(metadatastore_db, asset_registry_db)
     with pytest.raises(ValidationError):
-        assert serializer.update('start', {})
+        assert serializer.update("start", {})
 
 
 def test_index_creation(db_factory):
@@ -95,40 +159,40 @@ def test_index_creation(db_factory):
 
     indexes = asset_registry_db.resource.index_information()
     assert len(indexes.keys()) == 3
-    assert not indexes['uid_1'].get('unique')
-    assert indexes['resource_id_1']
+    assert not indexes["uid_1"].get("unique")
+    assert indexes["resource_id_1"]
 
     indexes = asset_registry_db.datum.index_information()
     assert len(indexes.keys()) == 3
-    assert indexes['datum_id_1']['unique']
-    assert indexes['resource_1']
+    assert indexes["datum_id_1"]["unique"]
+    assert indexes["resource_1"]
 
     indexes = metadatastore_db.run_start.index_information()
     assert len(indexes.keys()) == 6
-    assert indexes['uid_1']['unique']
-    assert indexes['time_-1_scan_id_-1']
-    assert indexes['$**_text']
-    assert indexes['data_session_1']
-    assert indexes['data_groups_1']
+    assert indexes["uid_1"]["unique"]
+    assert indexes["time_-1_scan_id_-1"]
+    assert indexes["$**_text"]
+    assert indexes["data_session_1"]
+    assert indexes["data_groups_1"]
 
     indexes = metadatastore_db.run_stop.index_information()
     assert len(indexes.keys()) == 5
-    assert indexes['uid_1']['unique']
-    assert indexes['run_start_1']['unique']
-    assert indexes['time_-1']
-    assert indexes['$**_text']
+    assert indexes["uid_1"]["unique"]
+    assert indexes["run_start_1"]["unique"]
+    assert indexes["time_-1"]
+    assert indexes["$**_text"]
 
     indexes = metadatastore_db.event_descriptor.index_information()
     assert len(indexes.keys()) == 5
-    assert indexes['uid_1']['unique']
-    assert indexes['run_start_-1_time_-1']
-    assert indexes['time_-1']
-    assert indexes['$**_text']
+    assert indexes["uid_1"]["unique"]
+    assert indexes["run_start_-1_time_-1"]
+    assert indexes["time_-1"]
+    assert indexes["$**_text"]
 
     indexes = metadatastore_db.event.index_information()
     assert len(indexes.keys()) == 3
-    assert indexes['uid_1']['unique']
-    assert indexes['descriptor_-1_time_1']
+    assert indexes["uid_1"]["unique"]
+    assert indexes["descriptor_-1_time_1"]
 
 
 def test_resource_uid_unique(db_factory):
@@ -139,4 +203,4 @@ def test_resource_uid_unique(db_factory):
     Serializer(metadatastore_db, asset_registry_db, resource_uid_unique=True)
 
     indexes = asset_registry_db.resource.index_information()
-    assert indexes['uid_1'].get('unique')
+    assert indexes["uid_1"].get("unique")
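
For reference, a minimal sketch of how the expanded `update` API behaves end to end with this patch applied. The connection URIs and the stripped-down documents below are illustrative assumptions, not taken from the test suite; real bluesky documents carry more fields.

```python
import time
import uuid

from suitcase.mongo_normalized import Serializer

# Hypothetical URIs; any pymongo URI that names a database works here.
serializer = Serializer(
    "mongodb://localhost:27017/mds",
    "mongodb://localhost:27017/assets",
)

# Serialize a minimal run: a 'start' document, then its 'stop' document.
start_uid = str(uuid.uuid4())
serializer("start", {"uid": start_uid, "time": time.time(), "scan_id": 1})
stop = {
    "uid": str(uuid.uuid4()),
    "run_start": start_uid,
    "time": time.time(),
    "exit_status": "success",
}
serializer("stop", stop)

# 'stop' (and 'descriptor') documents can now be updated in place. The
# prior version is archived in the matching *_revisions collection with
# an incrementing 'revision' counter (0 for the first update, then 1, ...).
stop["reason"] = "Everything happens for a reason."
serializer.update("stop", stop)

# Restricted fields may not change: rewriting 'run_start' raises before
# anything is archived or replaced.
stop["run_start"] = str(uuid.uuid4())
try:
    serializer.update("stop", stop)
except ValueError as err:
    print(err)  # Field 'run_start' is restricted and cannot be changed.
```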