Skip to content

Commit

Permalink
fix: Reconcile gremlin description writes with the Databuilder (amund…
Browse files Browse the repository at this point in the history
…sen-io#290)

* Fix up Neptune description writes

Signed-off-by: Andrew Ciambrone <andrjc4@vt.edu>

* fix typing

Signed-off-by: Andrew Ciambrone <andrjc4@vt.edu>
  • Loading branch information
AndrewCiambrone authored Apr 12, 2021
1 parent c7b0e7a commit 18454fe
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 14 deletions.
1 change: 1 addition & 0 deletions metadata_service/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class NeptuneConfig(LocalGremlinConfig, LocalConfig):
LOG_LEVEL = 'INFO'

# PROXY_HOST FORMAT: wss://<NEPTUNE_URL>:<NEPTUNE_PORT>/gremlin
PROXY_HOST = os.environ.get('PROXY_HOST', 'localhost')
PROXY_PORT = None # type: ignore

PROXY_CLIENT = PROXY_CLIENTS['NEPTUNE']
Expand Down
18 changes: 11 additions & 7 deletions metadata_service/proxy/gremlin_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1230,7 +1230,7 @@ def get_table_description(self, *, table_uri: str) -> Union[str, None]:

g = _V(g=self.g, label=VertexTypes.Table, key=table_uri). \
outE(EdgeTypes.Description.value.label).inV(). \
has(VertexTypes.Description.value.label, 'source', 'user'). \
has('description_source', 'description'). \
values('description').fold()
descriptions = self.query_executor()(query=g, get=FromResultSet.getOnly)
return _safe_get(descriptions)
Expand All @@ -1250,9 +1250,10 @@ def _put_table_description(self, *, table_uri: str, description: str, executor:
description = unquote(description)

# default table description is user added
desc_key = make_description_uri(subject_uri=table_uri, source='user')
desc_key = make_description_uri(subject_uri=table_uri, source='description')

_upsert(executor=executor, g=self.g, label=VertexTypes.Description, key=desc_key,
key_property_name=self.key_property_name, description=description, source='user')
key_property_name=self.key_property_name, description=description, description_source='description')
_link(executor=executor, g=self.g, edge_label=EdgeTypes.Description, key_property_name=self.key_property_name,
vertex1_label=VertexTypes.Table, vertex1_key=table_uri,
vertex2_label=VertexTypes.Description, vertex2_key=desc_key)
Expand Down Expand Up @@ -1352,9 +1353,12 @@ def _put_column_description(

column_uri = make_column_uri(table_uri=table_uri, column_name=column_name)
# default column description is user added
desc_key = make_description_uri(subject_uri=column_uri, source='user')
vertex_id: Any = _upsert(executor=executor, g=self.g, label=VertexTypes.Description, key=desc_key,
key_property_name=self.key_property_name, description=description, source='user')
desc_key = make_description_uri(subject_uri=column_uri, source='description')
vertex_id: Any = _upsert(
executor=executor, g=self.g,
label=VertexTypes.Description, key=desc_key, key_property_name=self.key_property_name,
description=description, description_source='description'
)
_link(executor=executor, g=self.g, edge_label=EdgeTypes.Description, key_property_name=self.key_property_name,
vertex1_label=VertexTypes.Column, vertex1_key=column_uri, vertex2_id=vertex_id)

Expand All @@ -1371,7 +1375,7 @@ def get_column_description(self, *, table_uri: str, column_name: str) -> Union[s
column_uri = make_column_uri(table_uri=table_uri, column_name=column_name)
g = _V(g=self.g, label=VertexTypes.Column, key=column_uri)
g = g.outE(EdgeTypes.Description.value.label).inV()
g = g.has(VertexTypes.Description.value.label, 'source', 'user').values('description')
g = g.has(VertexTypes.Description.value.label, 'description_source', 'description').values('description')
return self.query_executor()(query=g, get=FromResultSet.getOptional)

@timer_with_counter
Expand Down
7 changes: 1 addition & 6 deletions metadata_service/proxy/neptune_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
from amundsen_gremlin.neptune_bulk_loader.api import (
NeptuneBulkLoaderApi, get_neptune_graph_traversal_source_factory)
from amundsen_gremlin.script_translator import ScriptTranslatorTargetNeptune
from amundsen_gremlin.test_and_development_shard import (
_reset_for_testing_only, get_shard, shard_set_explicitly)
from amundsen_gremlin.test_and_development_shard import get_shard
from for_requests.assume_role_aws4auth import AssumeRoleAWS4Auth
from for_requests.aws4auth_compatible import to_aws4_request_compatible_host
from for_requests.host_header_ssl import HostHeaderSSLAdapter
Expand Down Expand Up @@ -66,10 +65,6 @@ def __init__(self, *, host: str, port: Optional[int] = None, user: str = None,
if port is not None:
raise NotImplementedError(f'port is not allowed! port={port}')

if client_kwargs.get('ignore_neptune_shard', False):
_reset_for_testing_only()
shard_set_explicitly('')

# for IAM auth, we need the triplet or a Session which is more general
if isinstance(password, boto3.session.Session):
session = password
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ typing-extensions==3.7.4
# A common package that holds the model definitions and schemas that are used
# across different amundsen repositories.
amundsen-common>=0.9.0
amundsen-gremlin>=0.0.7
amundsen-gremlin>=0.0.9

boto3==1.17.23
flasgger==0.9.3
Expand Down

0 comments on commit 18454fe

Please sign in to comment.