
Use bulk write operation to insert Event/Datum Page into database #55

Open · padraic-shafer wants to merge 6 commits into master

Conversation

padraic-shafer

Description

This PR uses pymongo's insert_many() to send all Events from an Event Page to the MongoDB database in a single operation, rather than the previous behavior of calling insert_one() for each Event inside a for-loop. The same change was applied to inserting a Datum Page.

This should reduce the time needed to store data when a Bluesky run has many thousands of events. The effect should be even more pronounced in deployments where network latency is high.
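For illustration, here is a minimal sketch of the difference. This is not the serializer's exact code: the connection URI, collection name, and the use of event_model.unpack_event_page() for unpacking are assumptions.

```python
# Illustrative sketch only; names and the unpacking helper are assumptions.
import event_model
from pymongo import MongoClient

events = MongoClient("mongodb://localhost:27017")["mds"]["event"]

def insert_event_page_loop(event_page):
    # Historical behavior: one network round trip per Event.
    for event in event_model.unpack_event_page(event_page):
        events.insert_one(event)

def insert_event_page_bulk(event_page):
    # This PR: unpack the whole page up front and send it in one bulk call
    # (pymongo may still split very large pages into multiple batches).
    events.insert_many(list(event_model.unpack_event_page(event_page)))
```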

Motivation and Context

Historically, an Event Page is unpacked into individual Events and each Event is inserted into the database one at a time. This is fine for small numbers of events, but the communication overhead scales with the number of events: each database INSERT operation adds its own network latency.

A continuous data acquisition "fly scan" may need only a few seconds or minutes to scan the motors, but can then consume several additional minutes to store the tens (or hundreds) of thousands of data events. Using a bulk write operation to insert many Events with a single network call (either one call or a small number of calls per Event Page, depending on the size of the communication buffer) should minimize the network-based contribution to the overall storage time.

The same argument applies to the Datum Page / Datum relationship.

How Has This Been Tested?

Performance: Timings were recorded while inserting data from fly scans (using mock hardware) that create Event Pages with varying numbers of Events (from 1 to 1e6). Details are provided in the comments below.

Unit Tests: All unit tests for the package pass when run in a local environment.

@padraic-shafer
Author

Some notes on the approach in this PR...

  • The handler for an Event Page or Datum Page first tries to insert all documents in the page using insert_many().
  • All documents in the page must be unpacked before calling insert_many() to send them to the database. This may use more working memory than the historical approach, which uses a generator to unpack each document just before it is sent.
  • If one or more duplicate (Event or Datum) documents are encountered during the Bulk Write operation, the operation is aborted and each document is retried individually.
  • The retry "fallback" step uses the same insert_one() procedure that suitcase-mongo has used historically, which is a safe fallback.

In principle, when the Bulk Write operation fails, one could track which INSERTs succeeded and which did not. In practice, however, the operation was observed to abort upon the first duplicate found, so many retries are needed when multiple duplicates are encountered. For an Event Page being processed over a message bus, either the entire Page has probably already been processed or none of it is yet in the database. Accordingly, when one Event is found to be a duplicate, all Events in the Event Page are likely to be duplicates. Retrying the Bulk Write operation in these circumstances would require the same number of calls as inserting one Event at a time; even worse, it would require additional logic and larger messages for each call. Therefore the fallback approach of using the original procedure for inserting one Event at a time was chosen.
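A minimal sketch of the insert-and-fallback pattern described above, using standard pymongo exceptions; the function and variable names are illustrative, not the actual suitcase-mongo code:

```python
from pymongo.errors import BulkWriteError, DuplicateKeyError

def bulk_insert_with_fallback(collection, documents):
    """Try one bulk insert; on duplicate keys, retry documents one at a time."""
    try:
        # Ordered bulk write: pymongo aborts at the first duplicate-key error.
        collection.insert_many(documents)
    except BulkWriteError:
        # Fall back to the historical one-at-a-time path, skipping documents
        # that are already in the database (e.g. a replayed Event Page).
        for doc in documents:
            try:
                collection.insert_one(doc)
            except DuplicateKeyError:
                pass
```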

@padraic-shafer
Author

[Plot: per-scan processing time (total vs. CPU) for bulk insert and loop insert, across scan sizes]

This plot shows the time spent processing fly scans with various numbers of Events in each scan. The blue bars show the total time per scan. The orange bars show the portion that consumed CPU time. The difference is presumably dominated by I/O operations.

"Processing" includes running the scans that generate the data and handling all run documents, both by the Run Bundler and by the mongo Serializer.

For each scan size, the data are presented in pairs: on the left are the results from the bulk insertions (this PR); on the right are the results from the historical approach (inserting each Event within a loop iteration).

@padraic-shafer
Author

The CPU time is essentially the same for bulk insert as for loop insert. This seems reasonable because the same Events are being processed; the same amount of work is being done.

The discrepancy in total time grows noticeably larger as the number of events increases. This discrepancy can be attributed almost entirely to the extra insertion calls made by the for-loop. For 1_000_000 Events per scan, the difference in median total time is ~100 seconds! The difference in worst-case total time is ~200 seconds.

[Plot: per-Event time savings of bulk insert relative to loop insert]

Although the savings from the bulk insert operation work out to ~100 microseconds per Event, it should be noted that these tests were run under near-ideal conditions for network lag. Both the RunEngine and the MongoDB database were running locally on the same host, so the additional delays from the insertion loop represent a "Best Case Scenario".

In a "Real World Scenario" a 5-ms network latency per database request would add 50 seconds for a 10k-Event scan -- raising the total time to 54 seconds...up from just 4 seconds observed in this test!!!
