diff --git a/Common/build_manager.py b/Common/build_manager.py index 74bd1776..453cddf4 100644 --- a/Common/build_manager.py +++ b/Common/build_manager.py @@ -3,7 +3,8 @@ import argparse import datetime import requests -import json + +from pathlib import Path from xxhash import xxh64_hexdigest from collections import defaultdict from Common.biolink_utils import BiolinkInformationResources, INFORES_STATUS_INVALID, INFORES_STATUS_DEPRECATED @@ -28,51 +29,54 @@ class GraphBuilder: - def __init__(self): + def __init__(self, + graph_specs_dir=None): self.logger = LoggingUtil.init_logging("ORION.Common.GraphBuilder", line_format='medium', log_file_path=os.environ['ORION_LOGS']) - # This dictionary holds the versions of graphs from the graph spec. - # This is more temperamental than it seems because the only way to get the current version for many sources - # is to retrieve them online. Graph versions are generated from underlying data source versions, so if versions - # are not explicitly specified in the graph spec, they may need to be retrieved. - self.graph_id_to_version = {} - - self.graphs_dir = self.init_graphs_dir() # path to the graphs output directory + self.graphs_dir = self.get_graphs_dir() # path to the graphs output directory self.source_data_manager = SourceDataManager() # access to the data sources and their metadata - self.graph_specs = self.load_graph_specs() # list of potential graphs to build (GraphSpec objects) + self.graph_specs = {} # graph_id -> GraphSpec all potential graphs that could be built, including sub-graphs + self.load_graph_specs(graph_specs_dir=graph_specs_dir) self.build_results = {} - def build_graph(self, graph_id: str): + def build_graph(self, graph_spec: GraphSpec): - self.logger.info(f'Building graph {graph_id}. Checking dependencies...') - graph_spec = self.get_graph_spec(graph_id) - graph_version = self.get_graph_version(graph_id) + graph_id = graph_spec.graph_id + self.logger.info(f'Building graph {graph_id}...') - if self.build_dependencies(graph_spec): - self.logger.info(f'Building graph {graph_id}. Dependencies are ready...') - else: - self.logger.warning(f'Aborting graph {graph_spec.graph_id}, building dependencies failed.') - return - - # check the status for previous builds of this version + graph_version = self.determine_graph_version(graph_spec) graph_metadata = self.get_graph_metadata(graph_id, graph_version) + graph_output_dir = self.get_graph_dir_path(graph_id, graph_version) + + # check for previous builds of this same graph build_status = graph_metadata.get_build_status() if build_status == Metadata.IN_PROGRESS: - self.logger.info(f'Graph {graph_id} version {graph_version} is already in progress. Skipping..') - return + self.logger.info(f'Graph {graph_id} version {graph_version} has status: in progress. ' + f'This means either the graph is already in the process of being built, ' + f'or an error occurred previously that could not be handled. ' + f'You may need to clean up and/or remove the failed build.') + return False if build_status == Metadata.BROKEN or build_status == Metadata.FAILED: self.logger.info(f'Graph {graph_id} version {graph_version} previously failed to build. Skipping..') - return - - graph_output_dir = self.get_graph_dir_path(graph_id, graph_version) - if build_status != Metadata.STABLE: + return False + if build_status == Metadata.STABLE: + self.logger.info(f'Graph {graph_id} version {graph_version} was already built.') + return True + else: # if we get here we need to build the graph - self.logger.info(f'Building graph {graph_id} version {graph_version}. Merging sources...') + self.logger.info(f'Building graph {graph_id} version {graph_version}, checking dependencies...') + if not self.build_dependencies(graph_spec): + self.logger.warning(f'Aborting graph {graph_spec.graph_id} version {graph_version}, building ' + f'dependencies failed.') + return False + + self.logger.info(f'Building graph {graph_id} version {graph_version}. ' + f'Dependencies ready, merging sources...') graph_metadata.set_build_status(Metadata.IN_PROGRESS) graph_metadata.set_graph_version(graph_version) graph_metadata.set_graph_name(graph_spec.graph_name) @@ -90,16 +94,14 @@ def build_graph(self, graph_id: str): if "merge_error" in merge_metadata: graph_metadata.set_build_error(merge_metadata["merge_error"], current_time) graph_metadata.set_build_status(Metadata.FAILED) - self.logger.error(f'Error building graph {graph_id}.') - return + self.logger.error(f'Merge error occured while building graph {graph_id}: ' + f'{merge_metadata["merge_error"]}') + return False graph_metadata.set_build_info(merge_metadata, current_time) graph_metadata.set_build_status(Metadata.STABLE) self.logger.info(f'Building graph {graph_id} complete!') self.build_results[graph_id] = {'version': graph_version, 'success': True} - else: - self.logger.info(f'Graph {graph_id} version {graph_version} was already built.') - self.build_results[graph_id] = {'version': graph_version, 'success': False} if not graph_metadata.has_qc(): self.logger.info(f'Running QC for graph {graph_id}...') @@ -108,8 +110,8 @@ def build_graph(self, graph_id: str): if qc_results['pass']: self.logger.info(f'QC passed for graph {graph_id}.') else: - self.logger.info(f'QC failed for graph {graph_id}, bailing..') - return + self.logger.warning(f'QC failed for graph {graph_id}.') + self.build_results[graph_id] = {'version': graph_version, 'success': False} needs_meta_kg = not self.has_meta_kg(graph_directory=graph_output_dir) needs_test_data = not self.has_test_data(graph_directory=graph_output_dir) @@ -132,7 +134,7 @@ def build_graph(self, graph_id: str): logger=self.logger) if dump_success: - graph_output_url = self.get_graph_output_URL(graph_id, graph_version) + graph_output_url = self.get_graph_output_url(graph_id, graph_version) graph_metadata.set_dump_url(f'{graph_output_url}graph_{graph_version}.db.dump') if 'redundant_jsonl' in output_formats: @@ -140,100 +142,117 @@ def build_graph(self, graph_id: str): redundant_filepath = edges_filepath.replace(EDGES_FILENAME, REDUNDANT_EDGES_FILENAME) generate_redundant_kg(edges_filepath, redundant_filepath) - def get_graph_version(self, graph_id: str) -> str: - if graph_id not in self.graph_id_to_version: - graph_spec = self.get_graph_spec(graph_id) - if graph_spec is not None: - if graph_spec.graph_version is None: - try: - graph_spec.graph_version = self.generate_graph_version(graph_spec) - except (GetDataPullError, DataVersionError) as e: - raise GraphSpecError(error_message=e.error_message) - self.graph_id_to_version[graph_id] = graph_spec.graph_version - else: - raise GraphSpecError(error_message=f'Tried to determine the version for a ' - f'graph that was not found in the Graph Spec: {graph_id}.') - return self.graph_id_to_version[graph_id] + return True + + # determine a graph version utilizing versions of data sources, or just return the graph version specified + def determine_graph_version(self, graph_spec: GraphSpec): + # if the version was set or previously determined just back out + if graph_spec.graph_version: + return graph_spec.graph_version + try: + # go out and find the latest version for any data source that doesn't have a version specified + for source in graph_spec.sources: + if not source.source_version: + source.source_version = self.source_data_manager.get_latest_source_version(source.id) + self.logger.info(f'Source version - {source.id}: {source.version}') + + # for sub-graphs, if a graph version isn't specified, + # use the graph spec for that subgraph to determine a graph version + for subgraph in graph_spec.subgraphs: + if not subgraph.graph_version: + subgraph_graph_spec = self.graph_specs.get(subgraph.id, None) + if subgraph_graph_spec: + subgraph.graph_version = self.determine_graph_version(subgraph_graph_spec) + self.logger.info(f'found subgraph version, {graph_spec.graph_id}: {subgraph.graph_version}') + else: + raise GraphSpecError(f'Subgraph {subgraph.id} requested for graph {graph_spec.graph_id} ' + f'but the version was not specified and could not be determined without ' + f'a graph spec for {subgraph.id}.') + except (GetDataPullError, DataVersionError) as e: + raise GraphSpecError(error_message=e.error_message) + + # make a string that is a composite of versions and their merge strategy for each source + composite_version_string = "" + if graph_spec.sources: + composite_version_string += '_'.join([graph_source.version + '_' + graph_source.merge_strategy + if graph_source.merge_strategy else graph_source.version + for graph_source in graph_spec.sources]) + if graph_spec.subgraphs: + if composite_version_string: + composite_version_string += '_' + composite_version_string += '_'.join([sub_graph_source.version + '_' + sub_graph_source.merge_strategy + if sub_graph_source.merge_strategy else sub_graph_source.version + for sub_graph_source in graph_spec.subgraphs]) + graph_version = xxh64_hexdigest(composite_version_string) + graph_spec.graph_version = graph_version + self.logger.info(f'Version determined for graph {graph_spec.graph_id}: {graph_version} ({composite_version_string})') + return graph_version def build_dependencies(self, graph_spec: GraphSpec): + graph_id = graph_spec.graph_id for subgraph_source in graph_spec.subgraphs: subgraph_id = subgraph_source.id subgraph_version = subgraph_source.version - # Get the subgraph version from the subgraph source spec, - # which will either be one specified in the graph spec or None. - if subgraph_version is None: - try: - # if one was not specified, retrieve or generate it like we would any graph version - subgraph_version = self.get_graph_version(subgraph_id) - except GraphSpecError: - self.logger.error(f'Could not determine version of subgraph {subgraph_id}. ' - f'Either specify an existing version of the graph, or the subgraph must ' - f'be defined in the same Graph Spec.') - return False - if self.check_for_existing_graph_dir(subgraph_id, subgraph_version): - # load previous metadata if the specified subgraph version was already built - graph_metadata = self.get_graph_metadata(subgraph_id, subgraph_version) - subgraph_source.graph_metadata = graph_metadata.metadata - else: + if not self.check_for_existing_graph_dir(subgraph_id, subgraph_version): # If the subgraph doesn't already exist, we need to make sure it matches the current version of the # subgraph as generated by the current graph spec, otherwise we won't be able to build it. - current_subgraph_version = self.get_graph_version(subgraph_id) - if subgraph_version == current_subgraph_version: - self.logger.warning(f'For graph {graph_spec.graph_id} subgraph dependency ' - f'{subgraph_id} is not ready. Building now...') - self.build_graph(subgraph_id) - else: - self.logger.error(f'Subgraph ({subgraph_id}) version ({subgraph_version}) was specified, but that ' + subgraph_graph_spec = self.graph_specs.get(subgraph_id, None) + if not subgraph_graph_spec: + self.logger.warning(f'Subgraph {subgraph_id} version {subgraph_version} was requested for graph ' + f'{graph_id} but it was not found and could not be built without a Graph Spec.') + return False + + if subgraph_version != subgraph_graph_spec.graph_version: + self.logger.error(f'Subgraph {subgraph_id} version {subgraph_version} was specified, but that ' f'version of the graph could not be found. It can not be built now because the ' - f'current version is {current_subgraph_version}. Either specify a version that ' - f'is already built, or leave the subgraph version blank to automatically ' - f'build the new one.') + f'current version is {subgraph_graph_spec.graph_version}. Either specify a ' + f'version that is already built, or remove the subgraph version specification to ' + f'automatically include the latest one.') + return False - graph_metadata = self.get_graph_metadata(subgraph_id, subgraph_version) - if graph_metadata.get_build_status() == Metadata.STABLE: - # we found the subgraph and it's stable - update the GraphSource in preparation for building the graph + # here the graph specs and versions all look right, but we still need to build the subgraph + self.logger.warning(f'Graph {graph_id}, subgraph dependency {subgraph_id} is not ready. Building now..') + subgraph_build_success = self.build_graph(subgraph_graph_spec) + if not subgraph_build_success: + return False + + # confirm the subgraph build worked and update the DataSource object in preparation for merging + subgraph_metadata = self.get_graph_metadata(subgraph_id, subgraph_version) + subgraph_source.graph_metadata = subgraph_metadata + if subgraph_metadata.get_build_status() == Metadata.STABLE: subgraph_dir = self.get_graph_dir_path(subgraph_id, subgraph_version) subgraph_nodes_path = self.get_graph_nodes_file_path(subgraph_dir) subgraph_edges_path = self.get_graph_edges_file_path(subgraph_dir) subgraph_source.file_paths = [subgraph_nodes_path, subgraph_edges_path] else: - self.logger.warning( - f'Attempting to build graph {graph_spec.graph_id} failed, dependency ' - f'subgraph {subgraph_id} version {subgraph_version} was not built successfully.') + self.logger.warning(f'Attempting to build graph {graph_id} failed, dependency subgraph {subgraph_id} ' + f'version {subgraph_version} was not built successfully.') return False for data_source in graph_spec.sources: source_id = data_source.id - source_version = data_source.source_version - if source_id not in get_available_data_sources(): - self.logger.warning( - f'Attempting to build graph {graph_spec.graph_id} failed: ' - f'{source_id} is not a valid data source id. ') - return False - source_metadata: SourceMetadata = self.source_data_manager.get_source_metadata(source_id, - source_version) - release_version = source_metadata.get_release_version(parsing_version=data_source.parsing_version, - normalization_version=data_source.normalization_scheme.get_composite_normalization_version(), - supplementation_version=data_source.supplementation_version) - if release_version is None: + data_source.source_version) + release_version = data_source.generate_version() + release_metadata = source_metadata.get_release_info(release_version) + if release_metadata is None: self.logger.info( - f'Attempting to build graph {graph_spec.graph_id}, ' + f'Attempting to build graph {graph_id}, ' f'dependency {source_id} is not ready. Building now...') - release_version = self.source_data_manager.run_pipeline(source_id, - source_version=source_version, + pipeline_sucess = self.source_data_manager.run_pipeline(source_id, + source_version=data_source.source_version, parsing_version=data_source.parsing_version, normalization_scheme=data_source.normalization_scheme, supplementation_version=data_source.supplementation_version) - if not release_version: - self.logger.info( - f'While attempting to build {graph_spec.graph_id}, dependency pipeline failed for {source_id}...') + if not pipeline_sucess: + self.logger.info(f'While attempting to build {graph_spec.graph_id}, ' + f'data source pipeline failed for dependency {source_id}...') return False + release_metadata = source_metadata.get_release_info(release_version) - data_source.version = release_version - data_source.release_info = source_metadata.get_release_info(release_version) + data_source.release_info = release_metadata data_source.file_paths = self.source_data_manager.get_final_file_paths(source_id, - source_version, + data_source.source_version, data_source.parsing_version, data_source.normalization_scheme.get_composite_normalization_version(), data_source.supplementation_version) @@ -333,66 +352,69 @@ def run_qc(self, qc_metadata['warnings']['invalid_knowledge_sources'] = invalid_infores_ids return qc_metadata - def load_graph_specs(self): - if 'ORION_GRAPH_SPEC' in os.environ and os.environ['ORION_GRAPH_SPEC']: - # this is a messy way to find the graph spec path, mainly for testing - URL is preferred - graph_spec_file = os.environ['ORION_GRAPH_SPEC'] - graph_spec_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'graph_specs', graph_spec_file) + def load_graph_specs(self, graph_specs_dir=None): + graph_spec_file = os.environ.get('ORION_GRAPH_SPEC', None) + graph_spec_url = os.environ.get('ORION_GRAPH_SPEC_URL', None) + + if graph_spec_file and graph_spec_url: + raise GraphSpecError(f'Configuration Error - the environment variables ORION_GRAPH_SPEC and ' + f'ORION_GRAPH_SPEC_URL were set. Please choose one or the other. See the README for ' + f'details.') + + if graph_spec_file: + if not graph_specs_dir: + graph_specs_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'graph_specs') + graph_spec_path = os.path.join(graph_specs_dir, graph_spec_file) if os.path.exists(graph_spec_path): self.logger.info(f'Loading graph spec: {graph_spec_file}') with open(graph_spec_path) as graph_spec_file: - graph_spec_yaml = yaml.full_load(graph_spec_file) - return self.parse_graph_spec(graph_spec_yaml) + graph_spec_yaml = yaml.safe_load(graph_spec_file) + self.parse_graph_spec(graph_spec_yaml) + return else: - raise Exception(f'Configuration Error - Graph Spec could not be found: {graph_spec_file}') - elif 'ORION_GRAPH_SPEC_URL' in os.environ: - graph_spec_url = os.environ['ORION_GRAPH_SPEC_URL'] + raise GraphSpecError(f'Configuration Error - Graph Spec could not be found: {graph_spec_file}') + + if graph_spec_url: graph_spec_request = requests.get(graph_spec_url) graph_spec_request.raise_for_status() - graph_spec_yaml = yaml.full_load(graph_spec_request.text) - return self.parse_graph_spec(graph_spec_yaml) - else: - raise Exception(f'Configuration Error - No Graph Spec was configured. Set the environment variable ' - f'ORION_GRAPH_SPEC_URL to a URL with a valid Graph Spec yaml file. ' - f'See the README for more info.') + graph_spec_yaml = yaml.safe_load(graph_spec_request.text) + self.parse_graph_spec(graph_spec_yaml) + return + + raise GraphSpecError(f'Configuration Error - No Graph Spec was configured. Set the environment variable ' + f'ORION_GRAPH_SPEC to the name of a graph spec included in this package, or ' + f'ORION_GRAPH_SPEC_URL to a URL of a valid Graph Spec yaml file. ' + f'See the README for more info.') def parse_graph_spec(self, graph_spec_yaml): - graph_specs = [] - graph_id = "" + graph_id = None try: for graph_yaml in graph_spec_yaml['graphs']: graph_id = graph_yaml['graph_id'] - graph_name = graph_yaml['graph_name'] if 'graph_name' in graph_yaml else "" - graph_description = graph_yaml['graph_description'] if 'graph_description' in graph_yaml else "" - graph_url = graph_yaml['graph_url'] if 'graph_url' in graph_yaml else "" + graph_name = graph_yaml.get('graph_name', '') + graph_description = graph_yaml.get('graph_description', '') + graph_url = graph_yaml.get('graph_url', '') # parse the list of data sources - data_sources = [self.parse_data_source_spec(data_source) for data_source in graph_yaml['sources']] \ - if 'sources' in graph_yaml else [] + data_sources = [self.parse_data_source_spec(data_source) + for data_source in graph_yaml.get('sources', [])] # parse the list of subgraphs - subgraph_sources = [self.parse_subgraph_spec(subgraph) for subgraph in graph_yaml['subgraphs']] \ - if 'subgraphs' in graph_yaml else [] + subgraph_sources = [self.parse_subgraph_spec(subgraph) + for subgraph in graph_yaml.get('subgraphs', [])] if not data_sources and not subgraph_sources: - self.logger.error(f'Error: No sources were provided for graph: {graph_id}.') - continue + raise GraphSpecError('Error: No sources were provided for graph: {graph_id}.') - # take any normalization scheme parameters specified at the graph level - graph_wide_node_norm_version = graph_yaml['node_normalization_version'] \ - if 'node_normalization_version' in graph_yaml else None + # see if there are any normalization scheme parameters specified at the graph level + graph_wide_node_norm_version = graph_yaml.get('node_normalization_version', None) + graph_wide_edge_norm_version = graph_yaml.get('edge_normalization_version', None) + graph_wide_conflation = graph_yaml.get('conflation', None) + graph_wide_strict_norm = graph_yaml.get('strict_normalization', None) if graph_wide_node_norm_version == 'latest': graph_wide_node_norm_version = self.source_data_manager.get_latest_node_normalization_version() - graph_wide_edge_norm_version = graph_yaml['edge_normalization_version'] \ - if 'edge_normalization_version' in graph_yaml else None if graph_wide_edge_norm_version == 'latest': graph_wide_edge_norm_version = self.source_data_manager.get_latest_edge_normalization_version() - graph_wide_conflation = graph_yaml['conflation'] \ - if 'conflation' in graph_yaml else None - graph_wide_strict_norm = graph_yaml['strict_normalization'] \ - if 'strict_normalization' in graph_yaml else None - graph_wide_normalization_code_version = graph_yaml['normalization_code_version'] \ - if 'normalization_code_version' in graph_yaml else None # apply them to all the data sources, this will overwrite anything defined at the source level for data_source in data_sources: @@ -404,100 +426,101 @@ def parse_graph_spec(self, graph_spec_yaml): data_source.normalization_scheme.conflation = graph_wide_conflation if graph_wide_strict_norm is not None: data_source.normalization_scheme.strict = graph_wide_strict_norm - if graph_wide_normalization_code_version is not None: - data_source.normalization_scheme.normalization_code_version = graph_wide_normalization_code_version - graph_output_format = graph_yaml['output_format'] if 'output_format' in graph_yaml else "" + graph_output_format = graph_yaml.get('output_format', '') graph_spec = GraphSpec(graph_id=graph_id, graph_name=graph_name, graph_description=graph_description, graph_url=graph_url, - graph_version=None, # this will get populated later + graph_version=None, # this will get populated when a build is triggered graph_output_format=graph_output_format, subgraphs=subgraph_sources, sources=data_sources) - graph_specs.append(graph_spec) - except Exception as e: - self.logger.error(f'Error parsing Graph Spec ({graph_id}), formatting error or missing information: {repr(e)}') - raise e - return graph_specs + self.graph_specs[graph_id] = graph_spec + except KeyError as e: + error_message = f'Graph Spec missing required field: {e}' + if graph_id is not None: + error_message += f"(in graph {graph_id})" + raise GraphSpecError(error_message) def parse_subgraph_spec(self, subgraph_yml): subgraph_id = subgraph_yml['graph_id'] - subgraph_version = subgraph_yml['graph_version'] if 'graph_version' in subgraph_yml else None - merge_strategy = subgraph_yml['merge_strategy'] if 'merge_strategy' in subgraph_yml else 'default' + subgraph_version = subgraph_yml.get('graph_version', None) + merge_strategy = subgraph_yml.get('merge_strategy', None) + if merge_strategy == 'default': + merge_strategy = None subgraph_source = SubGraphSource(id=subgraph_id, - version=subgraph_version, + graph_version=subgraph_version, merge_strategy=merge_strategy) return subgraph_source def parse_data_source_spec(self, source_yml): + # get the source id and make sure it's valid source_id = source_yml['source_id'] if source_id not in get_available_data_sources(): error_message = f'Data source {source_id} is not a valid data source id.' self.logger.error(error_message + " " + f'Valid sources are: {", ".join(get_available_data_sources())}') - raise Exception(error_message) - - parsing_version = source_yml['parsing_version'] if 'parsing_version' in source_yml \ - else self.source_data_manager.get_latest_parsing_version(source_id) - merge_strategy = source_yml['merge_strategy'] if 'merge_strategy' in source_yml else 'default' - node_normalization_version = source_yml['node_normalization_version'] \ - if 'node_normalization_version' in source_yml \ - else self.source_data_manager.get_latest_node_normalization_version() - edge_normalization_version = source_yml['edge_normalization_version'] \ - if 'edge_normalization_version' in source_yml \ - else self.source_data_manager.get_latest_edge_normalization_version() - strict_normalization = source_yml['strict_normalization'] \ - if 'strict_normalization' in source_yml else True - normalization_code_version = source_yml['normalization_code_version'] \ - if 'normalization_code_version' in source_yml else NORMALIZATION_CODE_VERSION - conflation = source_yml['conflation'] \ - if 'conflation' in source_yml else False + raise GraphSpecError(error_message) + + # read version and normalization specifications from the graph spec + source_version = source_yml.get('source_version', None) + parsing_version = source_yml.get('parsing_version', None) + merge_strategy = source_yml.get('merge_strategy', None) + node_normalization_version = source_yml.get('node_normalization_version', None) + edge_normalization_version = source_yml.get('edge_normalization_version', None) + strict_normalization = source_yml.get('strict_normalization', True) + conflation = source_yml.get('conflation', False) + + # supplementation and normalization code version cannot be specified, set them to the current version + supplementation_version = SequenceVariantSupplementation.SUPPLEMENTATION_VERSION + normalization_code_version = NORMALIZATION_CODE_VERSION + + # if normalization versions are not specified, set them to the current latest + # source_version is intentionally not handled here because we want to do it lazily and avoid if not needed + if not parsing_version or parsing_version == 'latest': + parsing_version = self.source_data_manager.get_latest_parsing_version(source_id) + if not node_normalization_version or node_normalization_version == 'latest': + node_normalization_version = self.source_data_manager.get_latest_node_normalization_version() + if not edge_normalization_version or edge_normalization_version == 'latest': + edge_normalization_version = self.source_data_manager.get_latest_edge_normalization_version() + + # do some validation + if type(strict_normalization) != bool: + raise GraphSpecError(f'Invalid type (strict_normalization: {strict_normalization}), must be true or false.') + if type(conflation) != bool: + raise GraphSpecError(f'Invalid type (conflation: {conflation}), must be true or false.') + if merge_strategy == 'default': + merge_strategy = None + normalization_scheme = NormalizationScheme(node_normalization_version=node_normalization_version, edge_normalization_version=edge_normalization_version, normalization_code_version=normalization_code_version, strict=strict_normalization, conflation=conflation) - supplementation_version = SequenceVariantSupplementation.SUPPLEMENTATION_VERSION - - # The DataSource() will get initialized with either a specific source version, if specified, - # or a callable function which can determine the latest source version. This is for a lazy initialization - # technique, so that we don't call get_latest_source_version until we need to, if at all. - if 'source_version' not in source_yml or source_yml['source_version'] == 'latest': - get_source_version = self.source_data_manager.get_latest_source_version - source_version = None - else: - source_version = str(source_yml['source_version']) - get_source_version = None data_source = DataSource(id=source_id, source_version=source_version, - get_source_version=get_source_version, merge_strategy=merge_strategy, normalization_scheme=normalization_scheme, parsing_version=parsing_version, supplementation_version=supplementation_version) return data_source - def get_graph_spec(self, graph_id: str): - for graph_spec in self.graph_specs: - if graph_spec.graph_id == graph_id: - return graph_spec - return None - def get_graph_dir_path(self, graph_id: str, graph_version: str): return os.path.join(self.graphs_dir, graph_id, graph_version) - def get_graph_output_URL(self, graph_id: str, graph_version: str): + def get_graph_output_url(self, graph_id: str, graph_version: str): graph_output_url = os.environ.get('ORION_OUTPUT_URL', "https://localhost/") if graph_output_url[-1] != '/': graph_output_url += '/' return f'{graph_output_url}{graph_id}/{graph_version}/' - def get_graph_nodes_file_path(self, graph_output_dir: str): + @staticmethod + def get_graph_nodes_file_path(graph_output_dir: str): return os.path.join(graph_output_dir, NODES_FILENAME) - def get_graph_edges_file_path(self, graph_output_dir: str): + @staticmethod + def get_graph_edges_file_path(graph_output_dir: str): return os.path.join(graph_output_dir, EDGES_FILENAME) def check_for_existing_graph_dir(self, graph_id: str, graph_version: str): @@ -514,54 +537,32 @@ def get_graph_metadata(self, graph_id: str, graph_version: str): return GraphMetadata(graph_id, graph_output_dir) @staticmethod - def generate_graph_version(graph_spec: GraphSpec): - sources_string = ''.join( - [json.dumps(graph_source.get_metadata_representation()) - for graph_source in graph_spec.sources]) - subgraphs_string = ''.join( - [''.join([subgraph.id, subgraph.version, subgraph.merge_strategy]) - for subgraph in graph_spec.subgraphs]) - graph_version = xxh64_hexdigest(sources_string + subgraphs_string) - return graph_version - - @staticmethod - def init_graphs_dir(): - # use the directory specified by the environment variable ORION_GRAPHS - if 'ORION_GRAPHS' in os.environ and os.path.isdir(os.environ['ORION_GRAPHS']): + def get_graphs_dir(): + # confirm the directory specified by the environment variable ORION_GRAPHS is valid + graphs_dir = os.environ.get('ORION_GRAPHS', None) + if graphs_dir and Path(graphs_dir).is_dir(): return os.environ['ORION_GRAPHS'] - else: - # if graph dir is invalid or not specified back out - raise IOError( - 'GraphBuilder graphs directory not found. ' - 'Specify a valid directory with environment variable ORION_GRAPHS.') + + # if invalid or not specified back out + raise IOError('ORION graphs directory not configured properly. ' + 'Specify a valid directory with environment variable ORION_GRAPHS.') if __name__ == '__main__': - parser = argparse.ArgumentParser(description="Merge data source files into complete graphs.") + parser = argparse.ArgumentParser(description="Merge data sources into complete graphs.") parser.add_argument('graph_id', help='ID of the graph to build. Must match an ID from the configured Graph Spec.') - parser.add_argument('-v', '--version', - action='store_true', - help='Only retrieve a generated version for graphs from the graph spec.') args = parser.parse_args() graph_id_arg = args.graph_id - retrieve_version = args.version graph_builder = GraphBuilder() if graph_id_arg == "all": - if retrieve_version: - graph_versions = [graph_spec.graph_version for graph_spec in graph_builder.graph_specs] - print('\n'.join(graph_versions)) - else: - for g_id in [graph_spec.graph_id for graph_spec in graph_builder.graph_specs]: - graph_builder.build_graph(g_id) + for graph_spec in graph_builder.graph_specs.values(): + graph_builder.build_graph(graph_spec) else: - graph_spec = graph_builder.get_graph_spec(graph_id_arg) + graph_spec = graph_builder.graph_specs.get(graph_id_arg, None) if graph_spec: - if retrieve_version: - print(graph_spec.graph_version) - else: - graph_builder.build_graph(graph_id_arg) + graph_builder.build_graph(graph_spec) else: print(f'Invalid graph spec requested: {graph_id_arg}') for results_graph_id, results in graph_builder.build_results.items(): diff --git a/Common/kgx_file_merger.py b/Common/kgx_file_merger.py index b6d54159..19abb14d 100644 --- a/Common/kgx_file_merger.py +++ b/Common/kgx_file_merger.py @@ -43,10 +43,12 @@ def merge(self, primary_sources = [] secondary_sources = [] for graph_source in chain(graph_spec.sources, graph_spec.subgraphs): - if graph_source.merge_strategy == 'default': + if not graph_source.merge_strategy: primary_sources.append(graph_source) elif graph_source.merge_strategy == 'connected_edge_subset': secondary_sources.append(graph_source) + else: + return {'merge_error': f'Unsupported merge strategy specified: {graph_source.merge_strategy}'} # TODO we should be able to process a single primary source more efficiently (ie copy and paste it) # if len(primary_sources) == 1: @@ -73,8 +75,9 @@ def merge(self, all_source_ids = [graph_source.id for graph_source in chain(graph_spec.sources, graph_spec.subgraphs)] missing_data_sets = [source_id for source_id in all_source_ids if source_id not in merge_metadata['sources'].keys()] - self.logger.error(f"Error merging graph {graph_spec.graph_id}! could not merge: {missing_data_sets}") - + error_message = f"Error merging graph {graph_spec.graph_id}! could not merge: {missing_data_sets}" + self.logger.error(error_message) + merge_metadata["merge_error"] = error_message return merge_metadata def merge_primary_sources(self, diff --git a/Common/kgxmodel.py b/Common/kgxmodel.py index 1ce3b41a..b161e6a0 100644 --- a/Common/kgxmodel.py +++ b/Common/kgxmodel.py @@ -1,7 +1,6 @@ -from dataclasses import dataclass, InitVar -from typing import Callable +from dataclasses import dataclass from Common.biolink_constants import NAMED_THING -from Common.metadata import GraphMetadata +from Common.metadata import GraphMetadata, get_source_release_version from Common.normalization import NormalizationScheme @@ -62,67 +61,65 @@ def get_metadata_representation(self): @dataclass class GraphSource: id: str - version: str = None - merge_strategy: str = 'default' + merge_strategy: str = None file_paths: list = None + # Version may be generated when requested and differs for subclasses of GraphSource. + def __getattribute__(self, name): + if name == "version": + return self.generate_version() + else: + return object.__getattribute__(self, name) + @dataclass class SubGraphSource(GraphSource): + graph_version: str = None graph_metadata: GraphMetadata = None def get_metadata_representation(self): return {'graph_id': self.id, - 'release_version': self.version, + 'graph_version': self.graph_version, 'merge_strategy:': self.merge_strategy, 'graph_metadata': self.graph_metadata.metadata if self.graph_metadata else None} + def generate_version(self): + return self.graph_version + @dataclass class DataSource(GraphSource): normalization_scheme: NormalizationScheme = None - source_version: InitVar[str] = None + source_version: str = None parsing_version: str = None supplementation_version: str = None release_info: dict = None - # This function serves as an optional way to provide a callable function which can determine the source version, - # instead of setting it during initialization. This is used like lazy initialization, because determining the - # source version of a data source can be expensive and error-prone, and we don't want to do it if we don't need to. - get_source_version: InitVar[Callable[[str], str]] = None - _source_version: str = None - _get_source_version: Callable[[str], str] = None - - def __post_init__(self, source_version, get_source_version): - self._get_source_version = get_source_version - # if a source_version is provided in initialization, just store that and return it - if source_version: - self._source_version = source_version - # if neither the source version nor a function to determine it is provided, throw an error - if not source_version and not get_source_version: - raise Exception(f'Invalid DataSource initialization - ' - f'source_version or get_source_version must be provided.') - - # when the source_version attribute is accessed either return _source_version if it is set - # or call the function supplied to retrieve it - def __getattribute__(self, name): - if name == "source_version": - if self._source_version is None: - self._source_version = self._get_source_version(self.id) - return self._source_version - else: - return object.__getattribute__(self, name) - def get_metadata_representation(self): metadata = {'source_id': self.id, - 'source_version': self.source_version, # this may produce an IDE warning but it's right - 'release_version': self.version, + 'source_version': self.source_version, 'parsing_version': self.parsing_version, 'supplementation_version': self.supplementation_version, 'normalization_scheme': self.normalization_scheme.get_metadata_representation(), + 'release_version': self.generate_version(), 'merge_strategy': self.merge_strategy} if self.release_info: metadata.update(self.release_info) return metadata - + # We can use generate_version to see if a source_version was already set. If not, we don't try to generate an + # overall version because we can't. Typical usage would be a lazy instantiation approach, first setting + # source_version to None, then checking this and retrieving/setting the source_version if needed, + # after which the overall version can be generated. + # + # We use get_source_release_version to generate versions for data sources the same deterministic way that + # the data source pipeline uses, so a version generated by a graph spec will match the release version generated by + # previous runs of the pipeline. + def generate_version(self): + if self.source_version is None: + return None + return get_source_release_version(self.id, + self.source_version, + self.parsing_version, + self.normalization_scheme.get_composite_normalization_version(), + self.supplementation_version) diff --git a/Common/load_manager.py b/Common/load_manager.py index 15029941..b4f4b168 100644 --- a/Common/load_manager.py +++ b/Common/load_manager.py @@ -504,20 +504,18 @@ def run_qc_and_metadata_stage(self, parsing_version: str, supplementation_version: str, normalization_scheme: NormalizationScheme): - # source data QC here - source_metadata = self.get_source_metadata(source_id, source_version) - normalization_version = normalization_scheme.get_composite_normalization_version() + # source data QC should go here self.logger.info(f'Generating release for {source_id}') + source_metadata = self.get_source_metadata(source_id, source_version) loader = SOURCE_DATA_LOADER_CLASSES[source_id](test_mode=self.test_mode) source_meta_information = loader.get_source_meta_information() - source_metadata.generate_release_metadata(parsing_version=parsing_version, - supplementation_version=supplementation_version, - normalization_version=normalization_version, - source_meta_information=source_meta_information) - return source_metadata.get_release_version(parsing_version=parsing_version, - supplementation_version=supplementation_version, - normalization_version=normalization_version) + normalization_version = normalization_scheme.get_composite_normalization_version() + release_version = source_metadata.generate_release_metadata(parsing_version=parsing_version, + supplementation_version=supplementation_version, + normalization_version=normalization_version, + source_meta_information=source_meta_information) + return release_version def get_source_metadata(self, source_id: str, source_version): if source_id not in self.source_metadata or source_version not in self.source_metadata[source_id]: diff --git a/Common/metadata.py b/Common/metadata.py index b2f9ca32..58f8f447 100644 --- a/Common/metadata.py +++ b/Common/metadata.py @@ -297,18 +297,6 @@ def has_supplemental_data(self, parsing_version: str, normalization_version: str except KeyError: return False - def get_release_version(self, - parsing_version: str, - normalization_version: str, - supplementation_version: str): - if "releases" in self.metadata: - for release_version, release in self.metadata["releases"].items(): - if ((release["parsing_version"] == parsing_version) and - (release["normalization_version"] == normalization_version) and - (release["supplementation_version"] == supplementation_version)): - return release_version - return None - def generate_release_metadata(self, parsing_version: str, normalization_version: str, @@ -316,12 +304,11 @@ def generate_release_metadata(self, source_meta_information: dict): if "releases" not in self.metadata: self.metadata["releases"] = {} - release_info = "".join([self.source_id, - self.source_version, - parsing_version, - normalization_version, - supplementation_version]) - release_version = xxh64_hexdigest(release_info) + release_version = get_source_release_version(self.source_id, + self.source_version, + parsing_version, + normalization_version, + supplementation_version) if release_version not in self.metadata["releases"]: self.metadata["releases"][release_version] = { "source_version": self.source_version, @@ -331,31 +318,22 @@ def generate_release_metadata(self, } self.metadata["releases"][release_version].update(source_meta_information) self.save_metadata() + return release_version def get_release_info(self, release_version: str): if 'releases' in self.metadata and release_version in self.metadata['releases']: return self.metadata['releases'][release_version] + return None - ''' - these need to be updated for the new versioning format, but we may not need them - def get_final_node_count(self): - try: - node_count = 0 - node_count += self.metadata['normalization_info']['final_normalized_nodes'] - if self.has_supplemental_data(): - node_count += self.metadata['supplementation_info']['normalization_info']['final_normalized_nodes'] - return node_count - except KeyError as k: - raise RuntimeError(f'Required metadata was not available: {k}') - - def get_final_edge_count(self): - try: - node_count = 0 - node_count += self.metadata['normalization_info']['final_normalized_edges'] - if self.has_supplemental_data(): - node_count += self.metadata['supplementation_info']['normalization_info']['final_normalized_edges'] - return node_count - except KeyError as k: - raise RuntimeError(f'Required metadata was not available: {k}') - ''' +def get_source_release_version(source_id, + source_version, + parsing_version, + normalization_version, + supplementation_version): + release_string = "_".join([source_id, + source_version, + parsing_version, + normalization_version, + supplementation_version]) + return xxh64_hexdigest(release_string) diff --git a/README.md b/README.md index 56dbbe1b..e1d9400c 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ To build a graph use a Graph Spec yaml file to specify the sources you want. ORION will automatically run each data source specified through the necessary pipeline. Then it will merge the specified sources into a Knowledge Graph. -### Using ORION +### Installing and Configuring ORION Create a parent directory: ``` @@ -30,13 +30,13 @@ git clone https://github.com/RobokopU24/ORION.git Next create directories where data sources, graphs, and logs will be stored. -ORION_STORAGE - for storing data sources +**ORION_STORAGE** - for storing data sources -ORION_GRAPHS - for storing knowledge graphs +**ORION_GRAPHS** - for storing knowledge graphs -ORION_LOGS - for storing logs +**ORION_LOGS** - for storing logs -You can do this manually, or use the script indicated below to set up a default configuration. +You can do this manually, or use the script indicated below to set up a default workspace. Option 1: Use this script to create the directories and set the environment variables: ``` @@ -44,7 +44,7 @@ cd ~/ORION_root/ORION/ source ./set_up_test_env.sh ``` -Option 2: Create three directories and manually set environment variables specifying paths to the locations of those directories. +Option 2: Create three directories and set environment variables specifying paths to the locations of those directories. ``` mkdir ~/ORION_root/storage/ export ORION_STORAGE=~/ORION_root/storage/ @@ -56,52 +56,92 @@ mkdir ~/ORION_root/logs/ export ORION_LOGS=~/ORION_root/logs/ ``` -Next create or select a Graph Spec yaml file where the content of knowledge graphs to be built will be specified. +Next create or select a Graph Spec yaml file, where the content of knowledge graphs to be built is specified. -Use either of the following options, but not both: - -Note that running the setup script set_up_test_env.sh will perform Option 1 for you. +Set either of the following environment variables, but not both: Option 1: ORION_GRAPH_SPEC - the name of a Graph Spec file located in the graph_specs directory of ORION ``` -export ORION_GRAPH_SPEC=testing-graph-spec.yml +export ORION_GRAPH_SPEC=example-graph-spec.yaml +``` +Option 2: ORION_GRAPH_SPEC_URL - a URL pointing to a Graph Spec yaml file +``` +export ORION_GRAPH_SPEC_URL=https://stars.renci.org/var/data_services/graph_specs/default-graph-spec.yaml ``` -Option 2: ORION_GRAPH_SPEC_URL - a URL pointing to a Graph Spec file + +To build a custom graph, alter a Graph Spec file, which is composed of a list of graphs. + +For each graph, specify: + +**graph_id** - a unique identifier string for the graph, with no spaces + +**sources** - a list of sources identifiers for data sources to include in the graph + +See the full list of data sources and their identifiers in the [data sources file](https://github.com/RobokopU24/ORION/blob/master/Common/data_sources.py). + +Here is a simple example. ``` -export ORION_GRAPH_SPEC_URL=https://example.com/example-graph-spec.yml +graphs: + - graph_id: Example_Graph + graph_name: Example Graph + graph_description: A free text description of what is in the graph. + output_format: neo4j + sources: + - source_id: CTD + - source_id: HGNC ``` -To build a custom graph, alter the Graph Spec file. See the graph_specs directory for examples. +There are variety of ways to further customize a knowledge graph. The following are parameters you can set for a particular data source. Mostly, these parameters are used to indicate that you'd like to use a previously built version of a data source or a specific normalization of a source. If you specify versions that are not the latest, and haven't previously built a data source or graph with those versions, it probably won't work. + +**source_version** - the version of the data source, as determined by ORION + +**parsing_version** - the version of the parsing code in ORION for this source + +**merge_strategy** - used to specify alternative merge strategies + +The following are parameters you can set for the entire graph, or for an individual data source: -TODO: explain options available in the graph spec (normalization version, source data version can be specified) +**node_normalization_version** - the version of the node normalizer API (see: https://nodenormalization-sri.renci.org/openapi.json) + +**edge_normalization_version** - the version of biolink model used to normalize predicates and validate the KG + +**strict_normalization** - True or False specifying whether to discard nodes, node types, and edges connected to those nodes when they fail to normalize + +**conflation** - True or False flag specifying whether to conflate genes with proteins and chemicals with drugs + +For example, we could customize the previous example: ``` graphs: - - graph_id: Example_Graph_ID + - graph_id: Example_Graph graph_name: Example Graph - graph_description: This is a description of what is in the graph. + graph_description: A free text description of what is in the graph. output_format: neo4j sources: - - source_id: Biolink + - source_id: CTD - source_id: HGNC ``` +See the graph_specs directory for more examples. + +### Running ORION + Install Docker to create and run the necessary containers. -By default using docker-compose up will build every graph in your Graph Spec. It runs the command: python /ORION/Common/build_manager.py all. +By default, using docker-compose up will build every graph in your Graph Spec. It runs the command: python /ORION/Common/build_manager.py all ``` docker-compose up ``` -If you want to specify an individual graph you can override the default command with a graph id from your Spec. +If you want to build an individual graph, you can override the default command with a graph_id from the Graph Spec: ``` -docker-compose run --rm orion python /ORION/Common/build_manager.py Example_Graph_ID +docker-compose run --rm orion python /ORION/Common/build_manager.py Example_Graph ``` -To run the ORION pipeline for a single data source, you can use: +To run the ORION pipeline for a single data source, you can use the load manager: ``` -docker-compose run --rm orion python /ORION/Common/load_manager.py Example_Source +docker-compose run --rm orion python /ORION/Common/load_manager.py CTD ``` To see available arguments and a list of supported data sources: ``` -python /ORION/Common/load_manager.py -h +docker-compose run --rm orion python /ORION/Common/load_manager.py -h ``` ### For Developers diff --git a/graph_specs/default-graph-spec.yml b/graph_specs/default-graph-spec.yml index aa47ce33..62900368 100644 --- a/graph_specs/default-graph-spec.yml +++ b/graph_specs/default-graph-spec.yml @@ -3,8 +3,8 @@ graphs: - graph_id: Baseline # optional parameters to be applied to the entire graph - see README for more info - # node_normalization_version: latest - # edge_normalization_version: latest + # node_normalization_version: 2.3.5 + # edge_normalization_version: 4.2.5 # graph_name: Robokop Baseline # graph_description: 'The baseline graph from which RobokopKG and other graphs are built.' # conflation: True # (whether to conflate node types like Genes and Proteins) diff --git a/graph_specs/example-graph-spec.yaml b/graph_specs/example-graph-spec.yaml new file mode 100644 index 00000000..35992210 --- /dev/null +++ b/graph_specs/example-graph-spec.yaml @@ -0,0 +1,18 @@ +# Example graph spec +graphs: + - graph_id: Example_Graph + graph_name: Example Graph + graph_description: 'This is a small graph that can be used as an example.' + graph_url: '' + output_format: neo4j + sources: + - source_id: CTD + # source_version: November_2024 + # parsing_version: 1.5 + # node_normalization_version: 2.3.18 + # edge_normalization_version: 4.2.1 + # conflation: False + # strict_normalization: True + # merge_strategy: + - source_id: HGNC + - source_id: GtoPdb \ No newline at end of file diff --git a/graph_specs/testing-graph-spec.yml b/graph_specs/testing-graph-spec.yml deleted file mode 100644 index a3fc82e5..00000000 --- a/graph_specs/testing-graph-spec.yml +++ /dev/null @@ -1,28 +0,0 @@ -# testing graph spec -graphs: - - graph_id: Testing_Baseline - graph_name: Testing Baseline - graph_description: 'A fake description for the testing baseline!' - graph_url: 'http://localhost/fake_graph_url_for_tesing_baseline' - output_format: none - sources: - - source_id: CTD - # source_version: - # parsing_version: - # node_normalization_version: latest - # edge_normalization_version: latest - # conflation: False - # strict_normalization: True (whether or not data should be discarded when it can not be normalized) - # merge_strategy: default (used to specify alternative merge strategies) - - source_id: HGNC - - source_id: PANTHER - - - graph_id: Testing_Graph_2 - graph_name: Testing Graph 2 - graph_description: 'Another fake description for testing graph 2!' - graph_url: 'http://localhost/fake_graph_url_for_tesing_graph_2' - output_format: neo4j - subgraphs: - - graph_id: Testing_Baseline - sources: - - source_id: GtoPdb \ No newline at end of file diff --git a/set_up_test_env.sh b/set_up_test_env.sh index 1ef6edd3..e5fc26a2 100644 --- a/set_up_test_env.sh +++ b/set_up_test_env.sh @@ -17,7 +17,7 @@ export ORION_LOGS="$PWD/../ORION_logs/" #Use EITHER of the following, ORION_GRAPH_SPEC or ORION_GRAPH_SPEC_URL #ORION_GRAPH_SPEC - the name of a Graph Spec file located in the graph_specs directory of ORION -export ORION_GRAPH_SPEC=testing-graph-spec.yml +export ORION_GRAPH_SPEC=example-graph-spec.yaml #ORION_GRAPH_SPEC_URL - a URL pointing to a Graph Spec file #export ORION_GRAPH_SPEC_URL=https://raw.githubusercontent.com/RENCI-AUTOMAT/ORION/helm_deploy/graph_specs/yeast-graph-spec.yml diff --git a/tests/graph_specs/testing-graph-spec.yaml b/tests/graph_specs/testing-graph-spec.yaml new file mode 100644 index 00000000..f619130f --- /dev/null +++ b/tests/graph_specs/testing-graph-spec.yaml @@ -0,0 +1,10 @@ +# Testing graph spec +graphs: + - graph_id: Testing_Graph + graph_name: Testing Graph + graph_description: 'This is a small graph spec that can be used for testing.' + graph_url: '' + output_format: + sources: + - source_id: CTD + - source_id: HGNC \ No newline at end of file diff --git a/tests/test_graph_spec.py b/tests/test_graph_spec.py new file mode 100644 index 00000000..21cb25f0 --- /dev/null +++ b/tests/test_graph_spec.py @@ -0,0 +1,73 @@ +import os +import pytest +import requests.exceptions + +from Common.build_manager import GraphBuilder, GraphSpecError + + +def clear_graph_spec_config(): + os.environ['ORION_GRAPH_SPEC'] = '' + os.environ['ORION_GRAPH_SPEC_URL'] = '' + + +def reset_graph_spec_config(): + os.environ['ORION_GRAPH_SPEC'] = 'testing-graph-spec.yaml' + os.environ['ORION_GRAPH_SPEC_URL'] = '' + + +def get_testing_graph_spec_dir(): + # this is ORION/tests/graph_specs not ORION/graph_specs + testing_specs_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'graph_specs') + print(testing_specs_dir) + return testing_specs_dir + + +def test_empty_graph_spec_config(): + clear_graph_spec_config() + with pytest.raises(GraphSpecError): + graph_builder = GraphBuilder(graph_specs_dir=get_testing_graph_spec_dir()) + + +def test_invalid_graph_spec_config(): + clear_graph_spec_config() + os.environ['ORION_GRAPH_SPEC'] = 'invalid-spec.yaml' + with pytest.raises(GraphSpecError): + graph_builder = GraphBuilder(graph_specs_dir=get_testing_graph_spec_dir()) + + +def test_invalid_graph_spec_url_config(): + clear_graph_spec_config() + os.environ['ORION_GRAPH_SPEC_URL'] = 'http://localhost/invalid_graph_spec_url' + with pytest.raises(requests.exceptions.ConnectionError): + graph_builder = GraphBuilder() + + +def test_valid_graph_spec_config(): + reset_graph_spec_config() + os.environ['ORION_GRAPH_SPEC'] = 'testing-graph-spec.yaml' + graph_builder = GraphBuilder(graph_specs_dir=get_testing_graph_spec_dir()) + assert len(graph_builder.graph_specs) + + testing_graph_spec = graph_builder.graph_specs.get('Testing_Graph', None) + assert testing_graph_spec is not None + + assert len(testing_graph_spec.sources) == 2 + + for source in testing_graph_spec.sources: + assert source.version is None + + +def test_graph_spec_lazy_versions(): + reset_graph_spec_config() + os.environ['ORION_GRAPH_SPEC'] = 'testing-graph-spec.yaml' + graph_builder = GraphBuilder(graph_specs_dir=get_testing_graph_spec_dir()) + testing_graph_spec = graph_builder.graph_specs.get('Testing_Graph', None) + for source in testing_graph_spec.sources: + assert source.version is None + for source in testing_graph_spec.sources: + source.source_version = source.id + "_1" + for source in testing_graph_spec.sources: + assert source.version is not None + + +