From 1d9aad71a10f71ba6421ec56416bf8e01f237d11 Mon Sep 17 00:00:00 2001
From: Padraic Shafer
Date: Sat, 23 Nov 2024 08:14:38 -0800
Subject: [PATCH 1/6] Bulk write Events from EventPage

---
 suitcase/mongo_normalized/__init__.py | 34 ++++++++++++++++++++++++++--------
 1 file changed, 26 insertions(+), 8 deletions(-)

diff --git a/suitcase/mongo_normalized/__init__.py b/suitcase/mongo_normalized/__init__.py
index 6a2b478..813ed84 100644
--- a/suitcase/mongo_normalized/__init__.py
+++ b/suitcase/mongo_normalized/__init__.py
@@ -171,6 +171,27 @@ def _insert(self, name, doc):
                         f"Existing document:\n{existing}\nNew document:\n{doc}"
                     ) from err
 
+    def _insert_many(self, name, doc):
+        try:
+            result = self._collections[name].insert_many(doc)
+        except pymongo.errors.DuplicateKeyError as err:
+            print(str(err))
+            raise err
+        except pymongo.errors.ClientBulkWriteException as err:
+            print(str(err))
+            raise err
+        except pymongo.errors.BulkWriteError as err:
+            print(str(err))
+            raise err
+        except pymongo.errors.WriteError as err:
+            print(str(err))
+            raise err
+        except pymongo.errors.PyMongoError as err:
+            print(str(err))
+            raise err
+
+        assert len(doc) == len(result.inserted_ids)
+
     def update(self, name, doc):
         """
         Update documents. Currently only 'start' documents are supported.
@@ -259,15 +280,12 @@ def event(self, doc):
         self._insert("event", doc)
 
     def event_page(self, doc):
-        # Unpack an EventPage into Events and do the actual insert inside
-        # the `event` method. (This is the oppose what DocumentRouter does by
-        # default.)
-
-        event_method = self.event  # Avoid attribute lookup in hot loop.
-        filled_events = []
+        """Unpack an EventPage into Events and do a bulk insert"""
 
-        for event_doc in event_model.unpack_event_page(doc):
-            filled_events.append(event_method(event_doc))
+        self._insert_many(
+            "event",
+            tuple(event_doc for event_doc in event_model.unpack_event_page(doc))
+        )
 
     def datum(self, doc):
         self._insert("datum", doc)

From b508304fd921a6e9528c70e6c146acefeb4fc8f0 Mon Sep 17 00:00:00 2001
From: Padraic Shafer
Date: Fri, 10 Jan 2025 16:50:40 -0800
Subject: [PATCH 2/6] Inspect BulkWriteError from insert_many()

---
 suitcase/mongo_normalized/__init__.py | 41 +++++++++++++++++++++++-------------
 1 file changed, 26 insertions(+), 15 deletions(-)

diff --git a/suitcase/mongo_normalized/__init__.py b/suitcase/mongo_normalized/__init__.py
index 813ed84..9382d54 100644
--- a/suitcase/mongo_normalized/__init__.py
+++ b/suitcase/mongo_normalized/__init__.py
@@ -174,23 +174,34 @@ def _insert(self, name, doc):
     def _insert_many(self, name, doc):
         try:
             result = self._collections[name].insert_many(doc)
-        except pymongo.errors.DuplicateKeyError as err:
-            print(str(err))
-            raise err
-        except pymongo.errors.ClientBulkWriteException as err:
-            print(str(err))
-            raise err
-        except pymongo.errors.BulkWriteError as err:
-            print(str(err))
-            raise err
-        except pymongo.errors.WriteError as err:
-            print(str(err))
-            raise err
-        except pymongo.errors.PyMongoError as err:
+            assert len(doc) == len(result.inserted_ids)
+            # Force a BulkWriteError due to duplicate key(s)
+            result = self._collections[name].insert_many(doc)
+        except pymongo.errors.BulkWriteError as err:
             print(str(err))
+            err_info = err.details
+            success_count = err_info["nInserted"]
+            reported_errors = err_info["writeErrors"]
+            DUPLICATE_KEY_ERROR = 11000
+            # OK to use a generator for now
+            reported_duplicate_docs = (
+                report["op"] for report in reported_errors
+                if report["code"] == DUPLICATE_KEY_ERROR
+            )
+            reported_duplicate_indexes = (
+                report["index"] for report in reported_errors
+                if report["code"] == DUPLICATE_KEY_ERROR
+            )
+            print(f"{success_count = }")
+            print(f"{sorted(reported_duplicate_indexes, reverse=True) = }")
+            if not self._ignore_duplicates:
+                error_message = "\n...\n".join((
+                    "A document with the same unique id as this one "
+                    f"already exists in the database. Document:\n{doc}"
+                    for doc in reported_duplicate_docs
+                ))
+                raise DuplicateUniqueID(error_message) from err
             raise err
-
-        assert len(doc) == len(result.inserted_ids)
 
     def update(self, name, doc):
         """

From c6484c2070d5f0b48fdd39195f212d623dc6ca26 Mon Sep 17 00:00:00 2001
From: Padraic Shafer
Date: Fri, 10 Jan 2025 19:47:03 -0800
Subject: [PATCH 3/6] Retry insert_many() when BulkWriteError

---
 suitcase/mongo_normalized/__init__.py | 72 +++++++++++++++++++++-------------
 1 file changed, 47 insertions(+), 25 deletions(-)

diff --git a/suitcase/mongo_normalized/__init__.py b/suitcase/mongo_normalized/__init__.py
index 9382d54..eff1b3a 100644
--- a/suitcase/mongo_normalized/__init__.py
+++ b/suitcase/mongo_normalized/__init__.py
@@ -171,37 +171,57 @@ def _insert(self, name, doc):
                         f"Existing document:\n{existing}\nNew document:\n{doc}"
                     ) from err
 
-    def _insert_many(self, name, doc):
+    def _insert_many(self, name, docs, ignore_duplicates=True):
+        requested_count = len(docs)
+        print(f"{requested_count = }")
+
         try:
-            result = self._collections[name].insert_many(doc)
-            assert len(doc) == len(result.inserted_ids)
-            # Force a BulkWriteError due to duplicate key(s)
-            result = self._collections[name].insert_many(doc)
+            result = self._collections[name].insert_many(docs)
+            inserted_count = len(result.inserted_ids)
+            assert inserted_count == requested_count
+
+            return {"inserted_count": inserted_count, "duplicate_ids": set()}
+
         except pymongo.errors.BulkWriteError as err:
-            print(str(err))
             err_info = err.details
-            success_count = err_info["nInserted"]
+            inserted_count = err_info["nInserted"]
             reported_errors = err_info["writeErrors"]
+            # TODO: Check for writeConcernErrors?
             DUPLICATE_KEY_ERROR = 11000
-            # OK to use a generator for now
             reported_duplicate_docs = (
                 report["op"] for report in reported_errors
                 if report["code"] == DUPLICATE_KEY_ERROR
             )
-            reported_duplicate_indexes = (
-                report["index"] for report in reported_errors
-                if report["code"] == DUPLICATE_KEY_ERROR
+            duplicate_ids = set(doc["_id"] for doc in reported_duplicate_docs)
+            remaining_docs = tuple(
+                doc for doc in docs
+                if doc["_id"] not in duplicate_ids
             )
-            print(f"{success_count = }")
-            print(f"{sorted(reported_duplicate_indexes, reverse=True) = }")
-            if not self._ignore_duplicates:
-                error_message = "\n...\n".join((
-                    "A document with the same unique id as this one "
-                    f"already exists in the database. Document:\n{doc}"
-                    for doc in reported_duplicate_docs
-                ))
-                raise DuplicateUniqueID(error_message) from err
-            raise err
+            if not remaining_docs:
+                return {
+                    "inserted_count": inserted_count,
+                    "duplicate_ids": duplicate_ids,
+                }
+            result = self._insert_many(name, remaining_docs)
+            inserted_count += result["inserted_count"]
+            duplicate_ids.update(result["duplicate_ids"])
+
+            assert inserted_count + len(duplicate_ids) == requested_count
+
+            if duplicate_ids and not ignore_duplicates:
+                error_message = "\n".join((
+                    "A document with the same unique id as this one "
+                    f"already exists in the database. Document:\n{doc}"
+                    for doc in docs
+                    if doc["_id"] in sorted(duplicate_ids)
+                ))
+                raise DuplicateUniqueID(error_message)
+            # TODO: Check for duplicated uid/datum_id with distinct contents
+
+            return {
+                "inserted_count": inserted_count,
+                "duplicate_ids": duplicate_ids,
+            }
 
     def update(self, name, doc):
         """
@@ -292,11 +312,13 @@ def event(self, doc):
 
     def event_page(self, doc):
         """Unpack an EventPage into Events and do a bulk insert"""
-
-        self._insert_many(
-            "event",
-            tuple(event_doc for event_doc in event_model.unpack_event_page(doc))
+        event_docs = tuple(
+            event_doc for event_doc in event_model.unpack_event_page(doc)
         )
+        self._ignore_duplicates = False
+        self._insert_many("event", event_docs, self._ignore_duplicates)
+        # Force a BulkWriteError due to duplicate key(s)
+        self._insert_many("event", event_docs, self._ignore_duplicates)
 
     def datum(self, doc):
         self._insert("datum", doc)

From ee26e458cd0bad8a5634670271ff8a9793f5a29b Mon Sep 17 00:00:00 2001
From: Padraic Shafer
Date: Fri, 10 Jan 2025 21:57:00 -0800
Subject: [PATCH 4/6] Simple insert_many() duplicate handling

---
 suitcase/mongo_normalized/__init__.py | 77 ++++++++++++-----------------------
 1 file changed, 26 insertions(+), 51 deletions(-)

diff --git a/suitcase/mongo_normalized/__init__.py b/suitcase/mongo_normalized/__init__.py
index eff1b3a..17725ed 100644
--- a/suitcase/mongo_normalized/__init__.py
+++ b/suitcase/mongo_normalized/__init__.py
@@ -171,57 +171,29 @@ def _insert(self, name, doc):
                         f"Existing document:\n{existing}\nNew document:\n{doc}"
                     ) from err
 
-    def _insert_many(self, name, docs, ignore_duplicates=True):
-        requested_count = len(docs)
-        print(f"{requested_count = }")
-
+    def _insert_many(self, name, docs):
+        """Bulk write the documents for fewer network requests."""
         try:
-            result = self._collections[name].insert_many(docs)
-            inserted_count = len(result.inserted_ids)
-            assert inserted_count == requested_count
-
-            return {"inserted_count": inserted_count, "duplicate_ids": set()}
-
+            self._collections[name].insert_many(docs)
         except pymongo.errors.BulkWriteError as err:
-            err_info = err.details
-            inserted_count = err_info["nInserted"]
-            reported_errors = err_info["writeErrors"]
-            # TODO: Check for writeConcernErrors?
             DUPLICATE_KEY_ERROR = 11000
-            reported_duplicate_docs = (
-                report["op"] for report in reported_errors
-                if report["code"] == DUPLICATE_KEY_ERROR
+            duplicate_docs = (
+                error["op"] for error in err.details["writeErrors"]
+                if error["code"] == DUPLICATE_KEY_ERROR
             )
-            duplicate_ids = set(doc["_id"] for doc in reported_duplicate_docs)
-            remaining_docs = tuple(
-                doc for doc in docs
-                if doc["_id"] not in duplicate_ids
+            duplicate_ids = set(
+                doc["_id"] for doc in duplicate_docs
            )
-            if not remaining_docs:
-                return {
-                    "inserted_count": inserted_count,
-                    "duplicate_ids": duplicate_ids,
-                }
-            result = self._insert_many(name, remaining_docs)
-            inserted_count += result["inserted_count"]
-            duplicate_ids.update(result["duplicate_ids"])
-
-            assert inserted_count + len(duplicate_ids) == requested_count
-
-            if duplicate_ids and not ignore_duplicates:
-                error_message = "\n".join((
-                    "A document with the same unique id as this one "
-                    f"already exists in the database. Document:\n{doc}"
-                    for doc in docs
-                    if doc["_id"] in sorted(duplicate_ids)
-                ))
-                raise DuplicateUniqueID(error_message)
-            # TODO: Check for duplicated uid/datum_id with distinct contents
-
-            return {
-                "inserted_count": inserted_count,
-                "duplicate_ids": duplicate_ids,
-            }
+            if duplicate_ids:
+                error_message = "\n".join((
+                    "A document with the same unique id as this one "
+                    f"already exists in the database. Document:\n{doc}"
+                    for doc in docs
+                    if doc["_id"] in sorted(duplicate_ids)
+                ))
+                raise DuplicateUniqueID(error_message) from err
+            else:
+                raise err
 
     def update(self, name, doc):
         """
@@ -311,14 +283,17 @@ def event(self, doc):
         self._insert("event", doc)
 
     def event_page(self, doc):
-        """Unpack an EventPage into Events and do a bulk insert"""
+        """Unpack an EventPage into Events and do a bulk insert."""
         event_docs = tuple(
             event_doc for event_doc in event_model.unpack_event_page(doc)
         )
-        self._ignore_duplicates = False
-        self._insert_many("event", event_docs, self._ignore_duplicates)
-        # Force a BulkWriteError due to duplicate key(s)
-        self._insert_many("event", event_docs, self._ignore_duplicates)
+        try:
+            self._insert_many("event", event_docs)
+        except DuplicateUniqueID:
+            # Bulk writing events failed; retry each event
+            event_method = self.event  # Avoid attribute lookup in hot loop.
+            for event_doc in event_docs:
+                event_method(event_doc)
 
     def datum(self, doc):
         self._insert("datum", doc)

From 953483d69221b283c6d1b8c03173b3a643c2c9d3 Mon Sep 17 00:00:00 2001
From: Padraic Shafer
Date: Sat, 11 Jan 2025 06:39:34 -0800
Subject: [PATCH 5/6] Use unique id for insert_many() collision checks

---
 suitcase/mongo_normalized/__init__.py | 26 +++++++++++++++++++-------
 1 file changed, 19 insertions(+), 7 deletions(-)

diff --git a/suitcase/mongo_normalized/__init__.py b/suitcase/mongo_normalized/__init__.py
index 17725ed..856b8b2 100644
--- a/suitcase/mongo_normalized/__init__.py
+++ b/suitcase/mongo_normalized/__init__.py
@@ -1,3 +1,5 @@
+import functools
+
 import event_model
 import pymongo
 from ._version import get_versions
@@ -156,10 +158,7 @@ def _insert(self, name, doc):
                 ) from err
             else:
                 doc.pop("_id")
-                if name == "datum":
-                    id_name = "datum_id"
-                else:
-                    id_name = "uid"
+                id_name = unique_id_name(name)
                 existing = self._collections[name].find_one(
                     {id_name: doc[id_name]}, {"_id": False}
                 )
@@ -181,18 +180,20 @@ def _insert_many(self, name, docs):
                 error["op"] for error in err.details["writeErrors"]
                 if error["code"] == DUPLICATE_KEY_ERROR
             )
+            id_name = unique_id_name(name)
             duplicate_ids = set(
-                doc["_id"] for doc in duplicate_docs
+                doc[id_name] for doc in duplicate_docs
             )
             if duplicate_ids:
                 error_message = "\n".join((
                     "A document with the same unique id as this one "
                     f"already exists in the database. Document:\n{doc}"
                     for doc in docs
-                    if doc["_id"] in sorted(duplicate_ids)
+                    if doc[id_name] in sorted(duplicate_ids)
                 ))
                 raise DuplicateUniqueID(error_message) from err
             else:
+                # For example, a "write concern" error
                 raise err
 
     def update(self, name, doc):
@@ -290,7 +291,7 @@ def event_page(self, doc):
         try:
             self._insert_many("event", event_docs)
         except DuplicateUniqueID:
-            # Bulk writing events failed; retry each event
+            # Bulk writing the events failed; retry each event
             event_method = self.event  # Avoid attribute lookup in hot loop.
             for event_doc in event_docs:
                 event_method(event_doc)
@@ -321,6 +322,17 @@ def __repr__(self):
         )
 
 
+@functools.lru_cache(maxsize=16)
+def unique_id_name(name: str) -> str:
+    """Return the name of the unique id field for a named document type."""
+    if name == "datum":
+        id_name = "datum_id"
+    else:
+        id_name = "uid"
+
+    return id_name
+
+
 def _get_database(uri, tls):
     if not pymongo.uri_parser.parse_uri(uri)["database"]:
         raise ValueError(

From fe61e2db661e49f87c37c4982d6fb51469b69035 Mon Sep 17 00:00:00 2001
From: Padraic Shafer
Date: Sat, 11 Jan 2025 07:13:37 -0800
Subject: [PATCH 6/6] DatumPage uses insert_many()

---
 suitcase/mongo_normalized/__init__.py | 24 +++++++++++++-----------
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/suitcase/mongo_normalized/__init__.py b/suitcase/mongo_normalized/__init__.py
index 856b8b2..f84f6d2 100644
--- a/suitcase/mongo_normalized/__init__.py
+++ b/suitcase/mongo_normalized/__init__.py
@@ -291,24 +291,26 @@ def event_page(self, doc):
         try:
             self._insert_many("event", event_docs)
         except DuplicateUniqueID:
-            # Bulk writing the events failed; retry each event
+            # Bulk writing the Events failed; retry inserting each Event
             event_method = self.event  # Avoid attribute lookup in hot loop.
             for event_doc in event_docs:
-                event_method(event_doc)
+                event_method(event_doc)
 
     def datum(self, doc):
         self._insert("datum", doc)
 
     def datum_page(self, doc):
-        # Unpack an DatumPage into Datum and do the actual insert inside
-        # the `datum` method. (This is the oppose what DocumentRouter does by
-        # default.)
-
-        datum_method = self.datum  # Avoid attribute lookup in hot loop.
-        filled_datums = []
-
-        for datum_doc in event_model.unpack_datum_page(doc):
-            filled_datums.append(datum_method(datum_doc))
+        """Unpack a DatumPage into 'Datum's and do a bulk insert."""
+        datum_docs = tuple(
+            datum_doc for datum_doc in event_model.unpack_datum_page(doc)
+        )
+        try:
+            self._insert_many("datum", datum_docs)
+        except DuplicateUniqueID:
+            # Bulk writing the 'Datum's failed; retry inserting each Datum
+            datum_method = self.datum  # Avoid attribute lookup in hot loop.
+            for datum_doc in datum_docs:
+                datum_method(datum_doc)
 
     def stop(self, doc):
         self._insert("stop", doc)
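
Reviewer note (illustrative, not part of the patch series): the sketch below shows the shape of the pymongo BulkWriteError details that _insert_many() inspects in PATCH 2 through PATCH 5. The MongoDB connection, the scratch database and collection names, and the unique index created here are stand-ins chosen for the example, not names taken from suitcase-mongo; the series itself presumes the collections already enforce uniqueness on their "uid"/"datum_id" fields.

    # Illustrative sketch only; assumes a local MongoDB server is running.
    import pymongo

    DUPLICATE_KEY_ERROR = 11000  # MongoDB server code for a duplicate key

    client = pymongo.MongoClient("localhost", 27017)
    collection = client["scratch"]["event"]
    collection.create_index("uid", unique=True)

    docs = [{"uid": "a", "seq_num": 1}, {"uid": "b", "seq_num": 2}]
    collection.insert_many([dict(d) for d in docs])  # copies: insert_many() adds "_id"

    try:
        # Re-inserting the same documents violates the unique index on "uid".
        collection.insert_many([dict(d) for d in docs])
    except pymongo.errors.BulkWriteError as err:
        # err.details carries "nInserted" plus a "writeErrors" entry for each
        # document that failed before the (ordered) bulk write stopped;
        # "op" holds the offending document itself.
        duplicates = [
            error["op"] for error in err.details["writeErrors"]
            if error["code"] == DUPLICATE_KEY_ERROR
        ]
        print(err.details["nInserted"], "inserted;", len(duplicates), "duplicates reported")

Because insert_many() performs an ordered bulk write by default, it stops at the first failed document and reports only that failure; this is consistent with PATCH 3 retrying the remaining documents and with PATCH 4 and PATCH 6 falling back to per-document inserts through the existing event() and datum() handlers.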