Add option to handle row metadata #38

Open · wants to merge 3 commits into master
.gitignore: 5 changes (4 additions, 1 deletion)
@@ -82,6 +82,9 @@ celerybeat-schedule
# dotenv
.env

# direnv
.envrc

# virtualenv
.venv
venv/
@@ -100,4 +103,4 @@ ENV/
# mypy
.mypy_cache/

.vscode/
.vscode/
config.sample.json: 3 changes (2 additions, 1 deletion)
@@ -2,5 +2,6 @@
"project_id": "bigquery-public-data",
"dataset_id": "samples",
"table_id": "github_timeline",
"validate_records": true
"validate_records": true,
"add_tap_metadata": false
}
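A user enabling the new behaviour would add the flag to their own config. A minimal sketch (hypothetical values, mirroring config.sample.json) of what main() would then read via config.get('add_tap_metadata'):

# Hypothetical config with the new flag turned on, written here as the Python
# dict that json.load() would produce from a config.json file.
config = {
    "project_id": "bigquery-public-data",
    "dataset_id": "samples",
    "table_id": "github_timeline",
    "validate_records": True,
    "add_tap_metadata": True,
}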
target_bigquery.py: 65 changes (56 additions, 9 deletions)
@@ -94,20 +94,54 @@ def define_schema(field, name):

return (schema_name, schema_type, schema_mode, schema_description, schema_fields)

def build_schema(schema):
def build_tap_metadata_schema():
metadata_fields = {
"properties": {
"_sdc_version": {
"inclusion": "available",
"minimum": -2147483648,
"maximum": -2147483647,
"type": [
"null",
"integer",
]
},
"_sdc_extracted_at": {
"inclusion": "available",
"format": "date-time",
"type": [
"null",
"string",
],
},
"_sdc_deleted_at": {
"inclusion": "available",
"format": "date-time",
"type": [
"null",
"string",
],
},
}
}
return build_schema(metadata_fields, add_metadata_fields=False)
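For orientation, the helper above should produce roughly the following columns, assuming define_schema() maps a nullable integer to an INTEGER column and a nullable date-time string to a TIMESTAMP column (a reviewer sketch, not verified output of this target):

from google.cloud.bigquery import SchemaField

# Approximate result of build_tap_metadata_schema(); the exact type and mode
# strings depend on define_schema(), so treat this as illustrative only.
expected_metadata_columns = [
    SchemaField('_sdc_version', 'INTEGER', 'NULLABLE'),
    SchemaField('_sdc_extracted_at', 'TIMESTAMP', 'NULLABLE'),
    SchemaField('_sdc_deleted_at', 'TIMESTAMP', 'NULLABLE'),
]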

def build_schema(schema, add_metadata_fields=False):
SCHEMA = []
for key in schema['properties'].keys():

if not (bool(schema['properties'][key])):
# skip the key if its schema is empty (we would end up with an empty record).
continue

schema_name, schema_type, schema_mode, schema_description, schema_fields = define_schema(schema['properties'][key], key)
SCHEMA.append(SchemaField(schema_name, schema_type, schema_mode, schema_description, schema_fields))

if add_metadata_fields:
SCHEMA += build_tap_metadata_schema()
return SCHEMA
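A quick usage sketch (hypothetical single-column stream, not from any real tap) showing how the new flag folds the metadata columns into the generated schema:

# Hypothetical SCHEMA message properties for a stream with one "id" field.
example_schema = {
    "properties": {
        "id": {"type": ["null", "integer"]},
    }
}

plain = build_schema(example_schema)  # just "id"
with_metadata = build_schema(example_schema, add_metadata_fields=True)  # "id" plus the three _sdc_* columns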

def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, validate_records=True):
def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, validate_records=True, add_tap_metadata=False):
state = None
schemas = {}
key_properties = {}
@@ -138,6 +172,10 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
if validate_records:
validate(msg.record, schema)

if add_tap_metadata:
msg.record['_sdc_version'] = msg.version
msg.record['_sdc_extracted_at'] = str(msg.time_extracted)
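# Reviewer note (illustrative values): for a RECORD message with version=2 and
# time_extracted parsed to 2021-01-01 00:00:00+00:00, a record {"id": 1} would be
# serialized as {"id": 1, "_sdc_version": 2, "_sdc_extracted_at": "2021-01-01 00:00:00+00:00"}.
# _sdc_deleted_at is declared in build_tap_metadata_schema() but never populated here,
# so it loads as NULL; time_extracted can also be None for some taps, in which case
# str() yields the literal string "None" and may be worth guarding.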

# NEWLINE_DELIMITED_JSON expects literal JSON formatted data, with a newline character splitting each row.
dat = bytes(json.dumps(msg.record) + '\n', 'UTF-8')

@@ -154,7 +192,7 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
table = msg.stream
schemas[table] = msg.schema
key_properties[table] = msg.key_properties
#tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table]))
#tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table], add_tap_metadata))
rows[table] = TemporaryFile(mode='w+b')
errors[table] = None
# try:
@@ -171,7 +209,7 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida

for table in rows.keys():
table_ref = bigquery_client.dataset(dataset_id).table(table)
SCHEMA = build_schema(schemas[table])
SCHEMA = build_schema(schemas[table], add_tap_metadata)
load_config = LoadJobConfig()
load_config.schema = SCHEMA
load_config.source_format = SourceFormat.NEWLINE_DELIMITED_JSON
@@ -195,7 +233,7 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida

return state

def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=True):
def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=True, add_tap_metadata=False):
state = None
schemas = {}
key_properties = {}
@@ -228,6 +266,10 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr
if validate_records:
validate(msg.record, schema)

if add_tap_metadata:
msg.record['_sdc_version'] = msg.version
msg.record['_sdc_extracted_at'] = str(msg.time_extracted)

errors[msg.stream] = bigquery_client.insert_rows_json(tables[msg.stream], [msg.record])
rows[msg.stream] += 1

@@ -241,7 +283,7 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr
table = msg.stream
schemas[table] = msg.schema
key_properties[table] = msg.key_properties
tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table]))
tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table], add_tap_metadata))
rows[table] = 0
errors[table] = None
try:
@@ -298,14 +340,19 @@ def main():
else:
truncate = False

if config.get('add_tap_metadata'):
add_tap_metadata = True
else:
add_tap_metadata = False
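# Reviewer note: a more compact equivalent with the same default would be
#   add_tap_metadata = bool(config.get('add_tap_metadata', False))
# kept as-is above to mirror the existing truncate handling.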

validate_records = config.get('validate_records', True)

input = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')

if config.get('stream_data', True):
state = persist_lines_stream(config['project_id'], config['dataset_id'], input, validate_records=validate_records)
state = persist_lines_stream(config['project_id'], config['dataset_id'], input, validate_records=validate_records, add_tap_metadata=add_tap_metadata)
else:
state = persist_lines_job(config['project_id'], config['dataset_id'], input, truncate=truncate, validate_records=validate_records)
state = persist_lines_job(config['project_id'], config['dataset_id'], input, truncate=truncate, validate_records=validate_records, add_tap_metadata=add_tap_metadata)

emit_state(state)
logger.debug("Exiting normally")