Add support for updating stop and descriptor documents #50

Merged
merged 20 commits into from
Feb 5, 2024
Changes from 4 commits
Commits
20 commits
19043c0
Support updating `stop` documents, inserting when needed 🫷
Kezzsim Feb 1, 2024
ea608e7
Reformat into single, more versatile condition 📦️
Kezzsim Feb 1, 2024
8737d3a
Add EXPERIMENTAL support for updating `BlueSkyEventStream` descriptors 🧪
Kezzsim Feb 1, 2024
a522a34
Linter formatting after adding hypothetical EventStream descriptor su…
Kezzsim Feb 1, 2024
b3b3aee
Fix dynamic access of schema validation variables 🔧
Kezzsim Feb 1, 2024
b869821
Instantiate `event_descriptors_revisions` collection ➕
Kezzsim Feb 1, 2024
413a44e
Validate before mutating name, pass tests ✅⏩️
Kezzsim Feb 2, 2024
78a256c
Replace an incredibly bad hardcoded test for stop ⏹️
Kezzsim Feb 2, 2024
bb44a88
Version is not incrementing in 'stop_revisions' collection ❌
Kezzsim Feb 2, 2024
0a507b5
Complete tests for `stop` docs, increment `stop` revisions 🔂
Kezzsim Feb 2, 2024
d297ddf
Always use UID, test for descriptor completed 🔬
Kezzsim Feb 2, 2024
90ff8e3
Black and flake8 fight with each other 💥
Kezzsim Feb 2, 2024
08d454b
Black vs Flake8, round 2 🥊
Kezzsim Feb 2, 2024
2bcf7b0
round 3 🤼
Kezzsim Feb 2, 2024
82298f5
Prevent alteration of foreign key fields 👮‍♀️
Kezzsim Feb 2, 2024
defb806
Better living through consistent documentation and python specific op…
Kezzsim Feb 5, 2024
b5101ed
Permit removing the `start_id` but not in validator (yet)
Kezzsim Feb 5, 2024
e3c9728
Rollback fixtures and clean up 🧼
Kezzsim Feb 5, 2024
b566dfa
Lint 🧼
Kezzsim Feb 5, 2024
34f19df
Actually, let's not edit `run_start`; tis a silly prop 🏰
Kezzsim Feb 5, 2024
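
Taken together, these commits extend Serializer.update beyond 'start' documents: 'stop' and 'descriptor' documents can now be updated as well, with the document inserted when no current version exists and the prior version archived in a *_revisions collection otherwise. Below is a minimal usage sketch, not part of the PR itself; it assumes a locally running MongoDB and illustrative database names (mds, assets), and it uses event_model only to compose example documents.

import event_model
from suitcase.mongo_normalized import Serializer

# Hypothetical connection URIs; any two MongoDB databases will do.
serializer = Serializer(
    "mongodb://localhost:27017/mds",
    "mongodb://localhost:27017/assets",
)

# Compose and insert a small run in the usual DocumentRouter fashion.
run = event_model.compose_run()
serializer("start", run.start_doc)
stop_doc = run.compose_stop()
serializer("stop", stop_doc)

# New in this PR: 'stop' (and 'descriptor') documents may be updated.
# The previous version is archived in the run_stop_revisions collection
# with an incrementing 'revision' counter before the replacement lands.
amended_stop = dict(stop_doc, reason="amended after the fact")
serializer.update("stop", amended_stop)

Updating any other document type still raises NotImplementedError, as before.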
193 changes: 112 additions & 81 deletions suitcase/mongo_normalized/__init__.py
@@ -2,14 +2,19 @@
import pymongo
from ._version import get_versions

__version__ = get_versions()['version']
__version__ = get_versions()["version"]
del get_versions


class Serializer(event_model.DocumentRouter):
def __init__(self, metadatastore_db, asset_registry_db,
ignore_duplicates=True, resource_uid_unique=False,
tls=False):
def __init__(
self,
metadatastore_db,
asset_registry_db,
ignore_duplicates=True,
resource_uid_unique=False,
tls=False,
):
"""
Insert documents into MongoDB using layout v1.

@@ -48,22 +53,28 @@ def __init__(self, metadatastore_db, asset_registry_db,
assets_db = _get_database(asset_registry_db, tls)
else:
assets_db = asset_registry_db
self._run_start_collection = mds_db.get_collection('run_start')
self._run_start_collection_revisions = mds_db.get_collection('run_start_revisions')
self._run_stop_collection = mds_db.get_collection('run_stop')
self._event_descriptor_collection = mds_db.get_collection(
'event_descriptor')
self._event_collection = mds_db.get_collection('event')

self._resource_collection = assets_db.get_collection('resource')
self._datum_collection = assets_db.get_collection('datum')

self._collections = {'start': self._run_start_collection,
'stop': self._run_stop_collection,
'resource': self._resource_collection,
'descriptor': self._event_descriptor_collection,
'event': self._event_collection,
'datum': self._datum_collection}
self._run_start_collection = mds_db.get_collection("run_start")
self._run_start_collection_revisions = mds_db.get_collection(
"run_start_revisions"
)
self._run_stop_collection = mds_db.get_collection("run_stop")
self._run_stop_collection_revisions = mds_db.get_collection(
"run_stop_revisions"
)
self._event_descriptor_collection = mds_db.get_collection("event_descriptor")
self._event_collection = mds_db.get_collection("event")

self._resource_collection = assets_db.get_collection("resource")
self._datum_collection = assets_db.get_collection("datum")

self._collections = {
"start": self._run_start_collection,
"stop": self._run_stop_collection,
"resource": self._resource_collection,
"descriptor": self._event_descriptor_collection,
"event": self._event_collection,
"datum": self._datum_collection,
}

self._metadatastore_db = mds_db
self._asset_registry_db = assets_db
@@ -77,47 +88,54 @@ def _create_indexes(self):

If the index already exists, this has no effect.
"""
self._resource_collection.create_index(
'uid', unique=self._resource_uid_unique)
self._resource_collection.create_index('resource_id') # legacy
self._resource_collection.create_index("uid", unique=self._resource_uid_unique)
self._resource_collection.create_index("resource_id") # legacy
# TODO: Migrate all Resources to have a RunStart UID, and then make a
# unique index on:
# [('uid', pymongo.ASCENDING), ('run_start', pymongo.ASCENDING)]
self._datum_collection.create_index('datum_id', unique=True)
self._datum_collection.create_index('resource')
self._run_start_collection.create_index('uid', unique=True)
self._run_start_collection.create_index('scan_id', unique=False)
self._datum_collection.create_index("datum_id", unique=True)
self._datum_collection.create_index("resource")
self._run_start_collection.create_index("uid", unique=True)
self._run_start_collection.create_index("scan_id", unique=False)
self._run_start_collection.create_index(
[('scan_id', pymongo.DESCENDING), ('_id', pymongo.DESCENDING)],
unique=True)
[("scan_id", pymongo.DESCENDING), ("_id", pymongo.DESCENDING)], unique=True
)
self._run_start_collection.create_index(
[('time', pymongo.ASCENDING), ('_id', pymongo.DESCENDING)],
unique=True)
[("time", pymongo.ASCENDING), ("_id", pymongo.DESCENDING)], unique=True
)
self._run_start_collection.create_index(
[('time', pymongo.DESCENDING), ('_id', pymongo.DESCENDING)],
unique=True)
[("time", pymongo.DESCENDING), ("_id", pymongo.DESCENDING)], unique=True
)
self._run_start_collection.create_index(
[('time', pymongo.DESCENDING), ('scan_id', pymongo.DESCENDING)],
unique=False, background=True)
[("time", pymongo.DESCENDING), ("scan_id", pymongo.DESCENDING)],
unique=False,
background=True,
)
self._run_start_collection.create_index([("$**", "text")])
self._run_start_collection.create_index('data_session', unique=False)
self._run_start_collection.create_index('data_groups', unique=False)
self._run_stop_collection.create_index('run_start', unique=True)
self._run_stop_collection.create_index('uid', unique=True)
self._run_start_collection.create_index("data_session", unique=False)
self._run_start_collection.create_index("data_groups", unique=False)
self._run_stop_collection.create_index("run_start", unique=True)
self._run_stop_collection.create_index("uid", unique=True)
self._run_stop_collection.create_index(
[('time', pymongo.DESCENDING)], unique=False, background=True)
[("time", pymongo.DESCENDING)], unique=False, background=True
)
self._run_stop_collection.create_index([("$**", "text")])
self._event_descriptor_collection.create_index('uid', unique=True)
self._event_descriptor_collection.create_index("uid", unique=True)
self._event_descriptor_collection.create_index(
[('run_start', pymongo.DESCENDING), ('time', pymongo.DESCENDING)],
unique=False, background=True)
[("run_start", pymongo.DESCENDING), ("time", pymongo.DESCENDING)],
unique=False,
background=True,
)
self._event_descriptor_collection.create_index(
[('time', pymongo.DESCENDING)], unique=False, background=True)
[("time", pymongo.DESCENDING)], unique=False, background=True
)
self._event_descriptor_collection.create_index([("$**", "text")])
self._event_collection.create_index('uid', unique=True)
self._event_collection.create_index("uid", unique=True)
self._event_collection.create_index(
[('descriptor', pymongo.DESCENDING), ('time', pymongo.ASCENDING)],
unique=False, background=True)
[("descriptor", pymongo.DESCENDING), ("time", pymongo.ASCENDING)],
unique=False,
background=True,
)

def __call__(self, name, doc):
# Before inserting into mongo, convert any numpy objects into built-in
@@ -135,12 +153,14 @@ def _insert(self, name, doc):
f"already exists in the database. Document:\n{doc}"
) from err
else:
doc.pop('_id')
doc.pop("_id")
if name == "datum":
id_name = "datum_id"
else:
id_name = "uid"
existing = self._collections[name].find_one({id_name: doc[id_name]}, {'_id': False})
existing = self._collections[name].find_one(
{id_name: doc[id_name]}, {"_id": False}
)
if existing != doc:
raise DuplicateUniqueID(
"A document with the same unique id as this one "
@@ -159,39 +179,46 @@ def update(self, name, doc):
Parameters
----------

name: {'start'}
The type of document being updated. Currently, only 'start' is
supported and any other value here will raise NotImplementedError.
name: {'start'} OR {'stop'} OR {'descriptor'}
The type of document being updated.
doc: dict
The new version of the document. Its uid will be used to match it
to the current version, the one to be updated.
"""
if name == 'start':
if name in ["start", "stop", "descriptor"]:
# Keys and collection names differ slightly between start, stop and descriptor
key = "uid" if name == "start" else "run_start"
name = f"_event_{name}" if name == "descriptor" else f"_run_{name}"
event_model.schema_validators[event_model.DocumentNames.start].validate(doc)
current_col = self._run_start_collection
revisions_col = self._run_start_collection_revisions
old = current_col.find_one({'uid': doc['uid']})
old.pop('_id')
target_uid_docs = revisions_col.find({'document.uid': doc['uid']})
cur = target_uid_docs.sort([('revision', pymongo.DESCENDING)]).limit(1)
wrapped = dict()
try:
wrapped['revision'] = next(cur)['revision'] + 1
except StopIteration:
wrapped['revision'] = 0
wrapped['document'] = old
revisions_col.insert_one(wrapped)
current_col.find_one_and_replace({'uid': doc['uid']}, doc)
current_col = getattr(self, f"{name}_collection")
revisions_col = getattr(self, f"{name}_collection_revisions")
old = current_col.find_one({key: doc[key]})
if old is None and (name == "_run_stop" or name == "_event_descriptor"):
# New stop or descriptor document : insert it
current_col.insert_one(doc)
else:
old.pop("_id")
target_uid_docs = revisions_col.find({"document.uid": doc[key]})
cur = target_uid_docs.sort([("revision", pymongo.DESCENDING)]).limit(1)
wrapped = dict()
try:
wrapped["revision"] = next(cur)["revision"] + 1
except StopIteration:
wrapped["revision"] = 0
wrapped["document"] = old
revisions_col.insert_one(wrapped)
current_col.find_one_and_replace({key: doc[key]}, doc)
else:
raise NotImplementedError(
f"Updating a {name} document is not currently supported. "
f"Only updates to 'start' documents are supported.")
f"Only updates to 'start' documents are supported."
)

def start(self, doc):
self._insert('start', doc)
self._insert("start", doc)

def descriptor(self, doc):
self._insert('descriptor', doc)
self._insert("descriptor", doc)

def resource(self, doc):
# In old databases, we know there are duplicates Resources. Until we
@@ -202,9 +229,11 @@ def resource(self, doc):
# is slow, but since there are never a large number of Resources per
# Run, this is acceptable.
if self._resource_uid_unique:
self._insert('resource', doc)
self._insert("resource", doc)
else:
existing = self._collections["resource"].find_one({'uid': doc['uid']}, {'_id': False})
existing = self._collections["resource"].find_one(
{"uid": doc["uid"]}, {"_id": False}
)
if existing is not None:
if existing != doc:
raise DuplicateUniqueID(
@@ -216,7 +245,7 @@ def resource(self, doc):
self._collections["resource"].insert_one(doc)

def event(self, doc):
self._insert('event', doc)
self._insert("event", doc)

def event_page(self, doc):
# Unpack an EventPage into Events and do the actual insert inside
@@ -230,7 +259,7 @@ def event_page(self, doc):
filled_events.append(event_method(event_doc))

def datum(self, doc):
self._insert('datum', doc)
self._insert("datum", doc)

def datum_page(self, doc):
# Unpack an DatumPage into Datum and do the actual insert inside
@@ -244,20 +273,22 @@ def datum_page(self, doc):
filled_datums.append(datum_method(datum_doc))

def stop(self, doc):
self._insert('stop', doc)
self._insert("stop", doc)

def __repr__(self):
# Display connection info in eval-able repr.
return (f'{type(self).__name__}('
f'metadatastore_db={self._metadatastore_db!r}, '
f'asset_registry_db={self._asset_registry_db!r})')
return (
f"{type(self).__name__}("
f"metadatastore_db={self._metadatastore_db!r}, "
f"asset_registry_db={self._asset_registry_db!r})"
)


def _get_database(uri, tls):
if not pymongo.uri_parser.parse_uri(uri)['database']:
if not pymongo.uri_parser.parse_uri(uri)["database"]:
raise ValueError(
f"Invalid URI: {uri} "
f"Did you forget to include a database?")
f"Invalid URI: {uri} " f"Did you forget to include a database?"
)
else:
client = pymongo.MongoClient(uri, tls=tls)
return client.get_database()
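
Because the update method wraps each archived version as {"revision": n, "document": {...}}, the history of an amended document can be inspected directly with pymongo. The following is a sketch under the same assumptions as the usage example above (local MongoDB, hypothetical mds database); the uid placeholder is illustrative.

import pymongo

client = pymongo.MongoClient("mongodb://localhost:27017")
revisions = client["mds"]["run_stop_revisions"]

stop_uid = "..."  # uid of the RunStop document whose history we want

# Archived versions are wrapped as {"revision": n, "document": {...}};
# revisions count up from 0, so the highest number is the most recent.
for rev in revisions.find({"document.uid": stop_uid}).sort(
    "revision", pymongo.DESCENDING
):
    print(rev["revision"], rev["document"].get("reason"))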