diff --git a/suitcase/mongo_normalized/__init__.py b/suitcase/mongo_normalized/__init__.py index 6a2b478..f84f6d2 100644 --- a/suitcase/mongo_normalized/__init__.py +++ b/suitcase/mongo_normalized/__init__.py @@ -1,3 +1,5 @@ +import functools + import event_model import pymongo from ._version import get_versions @@ -156,10 +158,7 @@ def _insert(self, name, doc): ) from err else: doc.pop("_id") - if name == "datum": - id_name = "datum_id" - else: - id_name = "uid" + id_name = unique_id_name(name) existing = self._collections[name].find_one( {id_name: doc[id_name]}, {"_id": False} ) @@ -171,6 +170,32 @@ def _insert(self, name, doc): f"Existing document:\n{existing}\nNew document:\n{doc}" ) from err + def _insert_many(self, name, docs): + """Bulk write the documents for fewer network requests.""" + try: + self._collections[name].insert_many(docs) + except pymongo.errors.BulkWriteError as err: + DUPLICATE_KEY_ERROR = 11000 + duplicate_docs = ( + error["op"] for error in err.details["writeErrors"] + if error["code"] == DUPLICATE_KEY_ERROR + ) + id_name = unique_id_name(name) + duplicate_ids = set( + doc[id_name] for doc in duplicate_docs + ) + if duplicate_ids: + error_message = "\n".join(( + "A document with the same unique id as this one " + f"already exists in the database. Document:\n{doc}" + for doc in docs + if doc[id_name] in sorted(duplicate_ids) + )) + raise DuplicateUniqueID(error_message) from err + else: + # For example, a "write concern" error + raise err + def update(self, name, doc): """ Update documents. Currently only 'start' documents are supported. @@ -259,29 +284,33 @@ def event(self, doc): self._insert("event", doc) def event_page(self, doc): - # Unpack an EventPage into Events and do the actual insert inside - # the `event` method. (This is the oppose what DocumentRouter does by - # default.) - - event_method = self.event # Avoid attribute lookup in hot loop. - filled_events = [] - - for event_doc in event_model.unpack_event_page(doc): - filled_events.append(event_method(event_doc)) + """Unpack an EventPage into Events and do a bulk insert.""" + event_docs = tuple( + event_doc for event_doc in event_model.unpack_event_page(doc) + ) + try: + self._insert_many("event", event_docs) + except DuplicateUniqueID: + # Bulk writing the Events failed; retry inserting each Event + event_method = self.event # Avoid attribute lookup in hot loop. + for event_doc in event_docs: + event_method(event_doc) def datum(self, doc): self._insert("datum", doc) def datum_page(self, doc): - # Unpack an DatumPage into Datum and do the actual insert inside - # the `datum` method. (This is the oppose what DocumentRouter does by - # default.) - - datum_method = self.datum # Avoid attribute lookup in hot loop. - filled_datums = [] - - for datum_doc in event_model.unpack_datum_page(doc): - filled_datums.append(datum_method(datum_doc)) + """Unpack a DatumPage into 'Datum's and do a bulk insert.""" + datum_docs = tuple( + datum_doc for datum_doc in event_model.unpack_datum_page(doc) + ) + try: + self._insert_many("datum", datum_docs) + except DuplicateUniqueID: + # Bulk writing the 'Datum's failed; retry inserting each Datum + datum_method = self.datum # Avoid attribute lookup in hot loop. + for datum_doc in datum_docs: + datum_method(datum_doc) def stop(self, doc): self._insert("stop", doc) @@ -295,6 +324,17 @@ def __repr__(self): ) +@functools.lru_cache(maxsize=16) +def unique_id_name(name: str) -> str: + """Return the name of the unique id field for a named document type.""" + if name == "datum": + id_name = "datum_id" + else: + id_name = "uid" + + return id_name + + def _get_database(uri, tls): if not pymongo.uri_parser.parse_uri(uri)["database"]: raise ValueError(