-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Elastic Bulk Document Creation #2772
Changes from 114 commits
f8a4d54
29fd04c
699245c
5310946
0269728
ac120a6
a7c1207
d460fe2
2bb6622
c90ce75
9dfd6f8
90aea2f
8f210cd
a093aa0
8ea36c6
85da10b
ac95c41
d75ba04
ea5aa30
b4d0549
8ce83ce
c75bb7e
0ec8f16
359dae0
8ed0b4a
c57a2d5
812a2dd
96634b2
0be491f
02ce7d0
aa8a534
19926ea
6b7c86e
94ef180
b6eee03
49dc398
b0fd38a
700a965
8aae444
9636854
5c2e819
763c09b
3e73dbc
e1c9c9a
dcc1dca
741b442
28c9636
091e716
3e19f2d
c2501f8
b04df52
206696a
ad17b56
efc15bd
86e3024
25eb85e
d2d443d
573a056
b1be7bb
cad9173
aa6a5ad
4bce93d
d29ed8e
bd29c09
657379d
eb59146
ae41efd
100d647
beda306
13cd218
1536749
bde9160
0316025
acfb697
522ca38
6dd34a0
1809129
640db6e
96e48d0
556221b
32a4671
7fd7b4d
6ffbee8
ccee755
86aae24
01bfd0b
1497d4a
2febdcd
97a0cf6
2b9b46f
c18383f
7fc2a09
4bf8957
761e4eb
1aeb315
a284856
1f9f32d
eacffe9
71615d2
bd6edd1
65b32ce
c49365f
7bb7522
af2d342
d88a329
247347c
8205b43
e32cb0d
26455b4
702c547
6c97446
ffb1993
8026eae
6ae551f
f1b8a8b
8d24f72
715ced8
3fdb9db
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
elasticsearch.hosts: ["http://elastic:9200"] | ||
server.host: kibana |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -75,22 +75,29 @@ def parse_datafile(datafile): | |
|
||
return errors | ||
|
||
|
||
def bulk_create_records(unsaved_records, line_number, header_count, batch_size=10000, flush=False): | ||
"""Bulk create passed in records.""" | ||
if (line_number % batch_size == 0 and header_count > 0) or flush: | ||
logger.debug("Bulk creating records.") | ||
try: | ||
num_created = 0 | ||
num_expected = 0 | ||
for model, records in unsaved_records.items(): | ||
num_expected += len(records) | ||
num_created += len(model.objects.bulk_create(records)) | ||
if num_created != num_expected: | ||
logger.error(f"Bulk create only created {num_created}/{num_expected}!") | ||
num_db_records_created = 0 | ||
num_expected_db_records = 0 | ||
num_elastic_records_created = 0 | ||
for document, records in unsaved_records.items(): | ||
num_expected_db_records += len(records) | ||
created_objs = document.Django.model.objects.bulk_create(records) | ||
num_elastic_records_created += document.update(created_objs)[0] | ||
num_db_records_created += len(created_objs) | ||
if num_db_records_created != num_expected_db_records: | ||
logger.error(f"Bulk Django record creation only created {num_db_records_created}/" + | ||
f"{num_expected_db_records}!") | ||
elif num_elastic_records_created != num_expected_db_records: | ||
logger.error(f"Bulk Elastic document creation only created {num_elastic_records_created}/" + | ||
f"{num_expected_db_records}!") | ||
else: | ||
logger.info(f"Created {num_created}/{num_expected} records.") | ||
return num_created == num_expected, {} | ||
logger.info(f"Created {num_db_records_created}/{num_expected_db_records} records.") | ||
return num_db_records_created == num_expected_db_records and \ | ||
num_elastic_records_created == num_expected_db_records, {} | ||
except DatabaseError as e: | ||
logger.error(f"Encountered error while creating datafile records: {e}") | ||
return False, unsaved_records | ||
|
@@ -127,7 +134,8 @@ def evaluate_trailer(datafile, trailer_count, multiple_trailer_errors, is_last_l | |
def rollback_records(unsaved_records, datafile): | ||
"""Delete created records in the event of a failure.""" | ||
logger.info("Rolling back created records.") | ||
for model in unsaved_records: | ||
for document in unsaved_records: | ||
model = document.Django.model | ||
num_deleted, models = model.objects.filter(datafile=datafile).delete() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. out of curiosity i ran the big Screen.Recording.2023-12-20.at.10.39.15.AM.mov, but the db and elastic eventually both end up in a consistent state. this may be a topic for another ticket, but implementing some sort of There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Agreed, our transaction-less rollback is a pain point for sure. It would be interesting to test out the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. have to be careful with |
||
logger.debug(f"Deleted {num_deleted} records of type: {model}.") | ||
|
||
|
@@ -218,7 +226,7 @@ def parse_datafile_lines(datafile, program_type, section, is_encrypted): | |
if record: | ||
s = schema_manager.schemas[i] | ||
record.datafile = datafile | ||
unsaved_records.setdefault(s.model, []).append(record) | ||
unsaved_records.setdefault(s.document, []).append(record) | ||
|
||
all_created, unsaved_records = bulk_create_records(unsaved_records, line_number, header_count,) | ||
unsaved_parser_errors, num_errors = bulk_create_errors(unsaved_parser_errors, num_errors) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,13 +12,13 @@ class RowSchema: | |
|
||
def __init__( | ||
self, | ||
model=dict, | ||
document, | ||
preparsing_validators=[], | ||
postparsing_validators=[], | ||
fields=[], | ||
quiet_preparser_errors=False | ||
quiet_preparser_errors=False, | ||
): | ||
self.model = model | ||
self.document = document | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. this change makes me nervous because of the potential side-effects, but you've covered all the cases i've thought of so far |
||
self.preparsing_validators = preparsing_validators | ||
self.postparsing_validators = postparsing_validators | ||
self.fields = fields | ||
|
@@ -90,7 +90,7 @@ def run_preparsing_validators(self, line, generate_error): | |
|
||
def parse_line(self, line): | ||
"""Create a model for the line based on the schema.""" | ||
record = self.model() | ||
record = self.document.Django.model() if self.document is not None else dict() | ||
|
||
for field in self.fields: | ||
value = field.parse_value(line) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,7 @@ | |
|
||
|
||
header = RowSchema( | ||
model=dict, | ||
document=None, | ||
preparsing_validators=[ | ||
validators.hasLength( | ||
23, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion is to add line_number/header count (if it is meaningful) for understanding the error and being able to debug from the log