big refactor to improve graph spec handling and versioning
- implements the intent of Dan Korn's PR to lazy load source versions only when needed
- improves data source and subgraph version handling
- adds ability to change local graph specs directory
- improved example graph spec and docs
- general clean up and refactoring of build_manager
EvanDietzMorris committed Jan 14, 2025
1 parent 2f9ca16 commit 3a51dc8
Showing 12 changed files with 461 additions and 371 deletions.
453 changes: 227 additions & 226 deletions Common/build_manager.py

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions Common/kgx_file_merger.py
@@ -43,10 +43,12 @@ def merge(self,
         primary_sources = []
         secondary_sources = []
         for graph_source in chain(graph_spec.sources, graph_spec.subgraphs):
-            if graph_source.merge_strategy == 'default':
+            if not graph_source.merge_strategy:
                 primary_sources.append(graph_source)
             elif graph_source.merge_strategy == 'connected_edge_subset':
                 secondary_sources.append(graph_source)
+            else:
+                return {'merge_error': f'Unsupported merge strategy specified: {graph_source.merge_strategy}'}

         # TODO we should be able to process a single primary source more efficiently (ie copy and paste it)
         # if len(primary_sources) == 1:
@@ -73,8 +75,9 @@ def merge(self,
         all_source_ids = [graph_source.id for graph_source in chain(graph_spec.sources, graph_spec.subgraphs)]
         missing_data_sets = [source_id for source_id in all_source_ids if
                              source_id not in merge_metadata['sources'].keys()]
-        self.logger.error(f"Error merging graph {graph_spec.graph_id}! could not merge: {missing_data_sets}")
+        error_message = f"Error merging graph {graph_spec.graph_id}! could not merge: {missing_data_sets}"
+        self.logger.error(error_message)
+        merge_metadata["merge_error"] = error_message
         return merge_metadata

     def merge_primary_sources(self,
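With this change, merge failures surface in the returned metadata rather than only in the logs. A minimal sketch of how a caller might react to the new merge_error key (the helper function and sample dict below are illustrative, not part of the commit):
```
def check_merge_result(merge_metadata: dict) -> dict:
    # merge() now attaches a "merge_error" key to the metadata it returns,
    # both for unsupported merge strategies and for sources it could not merge
    if 'merge_error' in merge_metadata:
        raise RuntimeError(merge_metadata['merge_error'])
    return merge_metadata

# example with a hypothetical failed-merge result
try:
    check_merge_result({'merge_error': 'Unsupported merge strategy specified: foo'})
except RuntimeError as e:
    print(f'merge failed: {e}')
```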
71 changes: 34 additions & 37 deletions Common/kgxmodel.py
@@ -1,7 +1,6 @@
-from dataclasses import dataclass, InitVar
-from typing import Callable
+from dataclasses import dataclass
 from Common.biolink_constants import NAMED_THING
-from Common.metadata import GraphMetadata
+from Common.metadata import GraphMetadata, get_source_release_version
 from Common.normalization import NormalizationScheme

@@ -62,67 +61,65 @@ def get_metadata_representation(self):
 @dataclass
 class GraphSource:
     id: str
-    version: str = None
-    merge_strategy: str = 'default'
+    merge_strategy: str = None
     file_paths: list = None

+    # Version may be generated when requested and differs for subclasses of GraphSource.
+    def __getattribute__(self, name):
+        if name == "version":
+            return self.generate_version()
+        else:
+            return object.__getattribute__(self, name)


 @dataclass
 class SubGraphSource(GraphSource):
+    graph_version: str = None
     graph_metadata: GraphMetadata = None

     def get_metadata_representation(self):
         return {'graph_id': self.id,
                 'release_version': self.version,
+                'graph_version': self.graph_version,
                 'merge_strategy:': self.merge_strategy,
                 'graph_metadata': self.graph_metadata.metadata if self.graph_metadata else None}

+    def generate_version(self):
+        return self.graph_version


 @dataclass
 class DataSource(GraphSource):
     normalization_scheme: NormalizationScheme = None
-    source_version: InitVar[str] = None
+    source_version: str = None
     parsing_version: str = None
     supplementation_version: str = None
     release_info: dict = None

-    # This function serves as an optional way to provide a callable function which can determine the source version,
-    # instead of setting it during initialization. This is used like lazy initialization, because determining the
-    # source version of a data source can be expensive and error-prone, and we don't want to do it if we don't need to.
-    get_source_version: InitVar[Callable[[str], str]] = None
-    _source_version: str = None
-    _get_source_version: Callable[[str], str] = None

-    def __post_init__(self, source_version, get_source_version):
-        self._get_source_version = get_source_version
-        # if a source_version is provided in initialization, just store that and return it
-        if source_version:
-            self._source_version = source_version
-        # if neither the source version nor a function to determine it is provided, throw an error
-        if not source_version and not get_source_version:
-            raise Exception(f'Invalid DataSource initialization - '
-                            f'source_version or get_source_version must be provided.')

-    # when the source_version attribute is accessed either return _source_version if it is set
-    # or call the function supplied to retrieve it
-    def __getattribute__(self, name):
-        if name == "source_version":
-            if self._source_version is None:
-                self._source_version = self._get_source_version(self.id)
-            return self._source_version
-        else:
-            return object.__getattribute__(self, name)

     def get_metadata_representation(self):
         metadata = {'source_id': self.id,
-                    'source_version': self.source_version,  # this may produce an IDE warning but it's right
-                    'release_version': self.version,
+                    'source_version': self.source_version,
                     'parsing_version': self.parsing_version,
                     'supplementation_version': self.supplementation_version,
                     'normalization_scheme': self.normalization_scheme.get_metadata_representation(),
+                    'release_version': self.generate_version(),
                     'merge_strategy': self.merge_strategy}
         if self.release_info:
             metadata.update(self.release_info)
         return metadata

+    # We can use generate_version to see if a source_version was already set. If not, we don't try to generate an
+    # overall version because we can't. Typical usage would be a lazy instantiation approach, first setting
+    # source_version to None, then checking this and retrieving/setting the source_version if needed,
+    # after which the overall version can be generated.
+    #
+    # We use get_source_release_version to generate versions for data sources the same deterministic way that
+    # the data source pipeline uses, so a version generated by a graph spec will match the release version generated by
+    # previous runs of the pipeline.
+    def generate_version(self):
+        if self.source_version is None:
+            return None
+        return get_source_release_version(self.id,
+                                          self.source_version,
+                                          self.parsing_version,
+                                          self.normalization_scheme.get_composite_normalization_version(),
+                                          self.supplementation_version)
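The net effect of this refactor is that version is no longer a stored field but a property computed on demand via generate_version. A runnable distillation of the pattern (class name, placeholder version string, and the external assignment below are hypothetical stand-ins for the diff above):
```
from dataclasses import dataclass

@dataclass
class LazyVersionSource:
    # stand-in for GraphSource/DataSource: source_version starts unset and is
    # filled in externally (e.g. by build_manager) only when actually needed
    id: str
    source_version: str = None

    def __getattribute__(self, name):
        # intercept only "version"; all other attributes use normal lookup
        if name == "version":
            return self.generate_version()
        return object.__getattribute__(self, name)

    def generate_version(self):
        # no overall version can be generated until source_version is known
        if self.source_version is None:
            return None
        return f"release_{self.source_version}"  # placeholder for the real hash

source = LazyVersionSource(id="CTD")
print(source.version)             # None - source version not determined yet
source.source_version = "v1"      # build_manager would set this lazily
print(source.version)             # release_v1
```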
18 changes: 8 additions & 10 deletions Common/load_manager.py
@@ -504,20 +504,18 @@ def run_qc_and_metadata_stage(self,
                                   parsing_version: str,
                                   supplementation_version: str,
                                   normalization_scheme: NormalizationScheme):
-        # source data QC here
-        source_metadata = self.get_source_metadata(source_id, source_version)
-        normalization_version = normalization_scheme.get_composite_normalization_version()
+        # source data QC should go here

         self.logger.info(f'Generating release for {source_id}')
+        source_metadata = self.get_source_metadata(source_id, source_version)
         loader = SOURCE_DATA_LOADER_CLASSES[source_id](test_mode=self.test_mode)
         source_meta_information = loader.get_source_meta_information()
-        source_metadata.generate_release_metadata(parsing_version=parsing_version,
-                                                  supplementation_version=supplementation_version,
-                                                  normalization_version=normalization_version,
-                                                  source_meta_information=source_meta_information)
-        return source_metadata.get_release_version(parsing_version=parsing_version,
-                                                   supplementation_version=supplementation_version,
-                                                   normalization_version=normalization_version)
+        normalization_version = normalization_scheme.get_composite_normalization_version()
+        release_version = source_metadata.generate_release_metadata(parsing_version=parsing_version,
+                                                                    supplementation_version=supplementation_version,
+                                                                    normalization_version=normalization_version,
+                                                                    source_meta_information=source_meta_information)
+        return release_version

     def get_source_metadata(self, source_id: str, source_version):
         if source_id not in self.source_metadata or source_version not in self.source_metadata[source_id]:
58 changes: 18 additions & 40 deletions Common/metadata.py
@@ -297,31 +297,18 @@ def has_supplemental_data(self, parsing_version: str, normalization_version: str
         except KeyError:
             return False

-    def get_release_version(self,
-                            parsing_version: str,
-                            normalization_version: str,
-                            supplementation_version: str):
-        if "releases" in self.metadata:
-            for release_version, release in self.metadata["releases"].items():
-                if ((release["parsing_version"] == parsing_version) and
-                        (release["normalization_version"] == normalization_version) and
-                        (release["supplementation_version"] == supplementation_version)):
-                    return release_version
-        return None

     def generate_release_metadata(self,
                                   parsing_version: str,
                                   normalization_version: str,
                                   supplementation_version: str,
                                   source_meta_information: dict):
         if "releases" not in self.metadata:
             self.metadata["releases"] = {}
-        release_info = "".join([self.source_id,
-                                self.source_version,
-                                parsing_version,
-                                normalization_version,
-                                supplementation_version])
-        release_version = xxh64_hexdigest(release_info)
+        release_version = get_source_release_version(self.source_id,
+                                                     self.source_version,
+                                                     parsing_version,
+                                                     normalization_version,
+                                                     supplementation_version)
         if release_version not in self.metadata["releases"]:
             self.metadata["releases"][release_version] = {
                 "source_version": self.source_version,
@@ -331,31 +318,22 @@ def generate_release_metadata(self,
             }
             self.metadata["releases"][release_version].update(source_meta_information)
         self.save_metadata()
+        return release_version

     def get_release_info(self, release_version: str):
         if 'releases' in self.metadata and release_version in self.metadata['releases']:
             return self.metadata['releases'][release_version]
         return None

-    '''
-    these need to be updated for the new versioning format, but we may not need them
-    def get_final_node_count(self):
-        try:
-            node_count = 0
-            node_count += self.metadata['normalization_info']['final_normalized_nodes']
-            if self.has_supplemental_data():
-                node_count += self.metadata['supplementation_info']['normalization_info']['final_normalized_nodes']
-            return node_count
-        except KeyError as k:
-            raise RuntimeError(f'Required metadata was not available: {k}')
-    def get_final_edge_count(self):
-        try:
-            node_count = 0
-            node_count += self.metadata['normalization_info']['final_normalized_edges']
-            if self.has_supplemental_data():
-                node_count += self.metadata['supplementation_info']['normalization_info']['final_normalized_edges']
-            return node_count
-        except KeyError as k:
-            raise RuntimeError(f'Required metadata was not available: {k}')
-    '''

+def get_source_release_version(source_id,
+                               source_version,
+                               parsing_version,
+                               normalization_version,
+                               supplementation_version):
+    release_string = "_".join([source_id,
+                               source_version,
+                               parsing_version,
+                               normalization_version,
+                               supplementation_version])
+    return xxh64_hexdigest(release_string)
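Since release versions are now a pure hash of the identifying version strings, any component can recompute them independently. A small demonstration of the determinism this buys (the input values below are made up):
```
from xxhash import xxh64_hexdigest

def get_source_release_version(source_id, source_version, parsing_version,
                               normalization_version, supplementation_version):
    # mirrors the function added above: same inputs always yield the same hash
    release_string = "_".join([source_id, source_version, parsing_version,
                               normalization_version, supplementation_version])
    return xxh64_hexdigest(release_string)

# a graph spec and an earlier pipeline run hashing the same version strings
# arrive at the identical release version
a = get_source_release_version("CTD", "June_2024", "1.0", "norm_2.3", "1.0")
b = get_source_release_version("CTD", "June_2024", "1.0", "norm_2.3", "1.0")
assert a == b
print(a)
```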
88 changes: 64 additions & 24 deletions README.md
@@ -15,7 +15,7 @@ To build a graph use a Graph Spec yaml file to specify the sources you want.

 ORION will automatically run each data source specified through the necessary pipeline. Then it will merge the specified sources into a Knowledge Graph.

-### Using ORION
+### Installing and Configuring ORION

 Create a parent directory:
 ```
@@ -30,21 +30,21 @@ git clone https://github.com/RobokopU24/ORION.git

 Next create directories where data sources, graphs, and logs will be stored.

-ORION_STORAGE - for storing data sources
+**ORION_STORAGE** - for storing data sources

-ORION_GRAPHS - for storing knowledge graphs
+**ORION_GRAPHS** - for storing knowledge graphs

-ORION_LOGS - for storing logs
+**ORION_LOGS** - for storing logs

-You can do this manually, or use the script indicated below to set up a default configuration.
+You can do this manually, or use the script indicated below to set up a default workspace.

 Option 1: Use this script to create the directories and set the environment variables:
 ```
 cd ~/ORION_root/ORION/
 source ./set_up_test_env.sh
 ```

-Option 2: Create three directories and manually set environment variables specifying paths to the locations of those directories.
+Option 2: Create three directories and set environment variables specifying paths to the locations of those directories.
 ```
 mkdir ~/ORION_root/storage/
 export ORION_STORAGE=~/ORION_root/storage/
@@ -56,52 +56,92 @@ mkdir ~/ORION_root/logs/
 export ORION_LOGS=~/ORION_root/logs/
 ```

-Next create or select a Graph Spec yaml file where the content of knowledge graphs to be built will be specified.
+Next create or select a Graph Spec yaml file, where the content of knowledge graphs to be built is specified.

-Use either of the following options, but not both:

 Note that running the setup script set_up_test_env.sh will perform Option 1 for you.
+Set either of the following environment variables, but not both:

 Option 1: ORION_GRAPH_SPEC - the name of a Graph Spec file located in the graph_specs directory of ORION
 ```
-export ORION_GRAPH_SPEC=testing-graph-spec.yml
+export ORION_GRAPH_SPEC=example-graph-spec.yaml
 ```
-Option 2: ORION_GRAPH_SPEC_URL - a URL pointing to a Graph Spec yaml file
-```
-export ORION_GRAPH_SPEC_URL=https://stars.renci.org/var/data_services/graph_specs/default-graph-spec.yaml
-```
+Option 2: ORION_GRAPH_SPEC_URL - a URL pointing to a Graph Spec file
+```
+export ORION_GRAPH_SPEC_URL=https://example.com/example-graph-spec.yml
+```

+To build a custom graph, alter a Graph Spec file, which is composed of a list of graphs.
+
+For each graph, specify:
+
+**graph_id** - a unique identifier string for the graph, with no spaces
+
+**sources** - a list of source identifiers for data sources to include in the graph
+
+See the full list of data sources and their identifiers in the [data sources file](https://github.com/RobokopU24/ORION/blob/master/Common/data_sources.py).
+
+Here is a simple example.
+```
+graphs:
+  - graph_id: Example_Graph
+    graph_name: Example Graph
+    graph_description: A free text description of what is in the graph.
+    output_format: neo4j
+    sources:
+      - source_id: CTD
+      - source_id: HGNC
+```

-To build a custom graph, alter the Graph Spec file. See the graph_specs directory for examples.
+There are a variety of ways to further customize a knowledge graph. The following parameters can be set for a particular data source. Mostly, these parameters are used to indicate that you'd like to use a previously built version of a data source, or a specific normalization of a source. If you specify versions that are not the latest, and you haven't previously built a data source or graph with those versions, it probably won't work.
+
+**source_version** - the version of the data source, as determined by ORION
+
+**parsing_version** - the version of the parsing code in ORION for this source
+
+**merge_strategy** - used to specify alternative merge strategies
+
+The following are parameters you can set for the entire graph, or for an individual data source:

-TODO: explain options available in the graph spec (normalization version, source data version can be specified)
+**node_normalization_version** - the version of the node normalizer API (see: https://nodenormalization-sri.renci.org/openapi.json)
+
+**edge_normalization_version** - the version of the biolink model used to normalize predicates and validate the KG
+
+**strict_normalization** - True or False, specifying whether to discard nodes, node types, and edges connected to those nodes when they fail to normalize
+
+**conflation** - True or False flag specifying whether to conflate genes with proteins and chemicals with drugs
+
+For example, we could customize the previous example:
 ```
 graphs:
-  - graph_id: Example_Graph_ID
+  - graph_id: Example_Graph
     graph_name: Example Graph
-    graph_description: This is a description of what is in the graph.
+    graph_description: A free text description of what is in the graph.
     output_format: neo4j
     sources:
       - source_id: Biolink
       - source_id: CTD
       - source_id: HGNC
 ```
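As a further illustration (not part of the commit), here is a sketch of how the per-source and graph-level parameters described above might be combined in a Graph Spec; the graph_id and all version values are placeholders:
```
graphs:
  - graph_id: Customized_Example_Graph
    graph_name: Customized Example Graph
    graph_description: A free text description of what is in the graph.
    output_format: neo4j
    # graph-level settings, applied to every source below
    strict_normalization: True
    conflation: False
    sources:
      - source_id: CTD
        # pin a previously built version of this source (placeholder values)
        source_version: '2024-06-01'
        parsing_version: '1.0'
      - source_id: HGNC
        merge_strategy: connected_edge_subset
```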

+See the graph_specs directory for more examples.

+### Running ORION

 Install Docker to create and run the necessary containers.

-By default using docker-compose up will build every graph in your Graph Spec. It runs the command: python /ORION/Common/build_manager.py all.
+By default, using docker-compose up will build every graph in your Graph Spec. It runs the command: python /ORION/Common/build_manager.py all
 ```
 docker-compose up
 ```
-If you want to specify an individual graph you can override the default command with a graph id from your Spec.
+If you want to build an individual graph, you can override the default command with a graph_id from the Graph Spec:
 ```
-docker-compose run --rm orion python /ORION/Common/build_manager.py Example_Graph_ID
+docker-compose run --rm orion python /ORION/Common/build_manager.py Example_Graph
 ```
-To run the ORION pipeline for a single data source, you can use:
+To run the ORION pipeline for a single data source, you can use the load manager:
 ```
-docker-compose run --rm orion python /ORION/Common/load_manager.py Example_Source
+docker-compose run --rm orion python /ORION/Common/load_manager.py CTD
 ```
 To see available arguments and a list of supported data sources:
 ```
-python /ORION/Common/load_manager.py -h
+docker-compose run --rm orion python /ORION/Common/load_manager.py -h
 ```

 ### For Developers