[SageMaker] [GraphBolt] Add support for launching GraphBolt jobs on SageMaker (#1083)

*Issue #, if available:*


*Description of changes:*

* Add a new SageMaker job to convert DistPart data to GraphBolt. This is
currently our only option, as there's no way to directly use S3 as a
writable, shared file system in SageMaker; see
#1081 for details.
* The `sagemaker/launch_graphbolt_convert.py` script launches a SageMaker
job that downloads the entire partitioned graph to one instance, then
runs the GB conversion one partition at a time. Because DGL writes the
new fused CSC graph representation in the same directory as the input
data, we can't use one of SageMaker's FastFile modes to stream the data,
as those create read-only filesystems.
* [Optional] We also include an example of how one could use a SageMaker
Pipeline to run the GSPartition and GBConvert jobs in sequence, but this
can be removed (because SageMaker Pipelines are persistent once
created).
* Added a unit test mechanism for the SageMaker scripts, starting with
tests for our parsing logic; a sketch of the pattern follows this list.
To make the scripts available to the runner's Python runtime we add the
`graphstorm/sagemaker/launch` directory to the runner's `PYTHONPATH`.
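
As an illustration of the pattern (not the exact tests in this PR), a
parsing test against `parse_gbconv_args` from `convert_to_graphbolt.py`
(which this PR also modifies) could look like the following sketch:

```python
# Sketch only: patch sys.argv and exercise the argument parser directly.
from unittest import mock

from graphstorm.gpartition.convert_to_graphbolt import parse_gbconv_args


def test_parse_gbconv_args_defaults():
    test_argv = ["convert_to_graphbolt.py",
                 "--metadata-filepath", "/tmp/metadata.json"]
    with mock.patch("sys.argv", test_argv):
        args = parse_gbconv_args()
    assert args.metadata_filepath == "/tmp/metadata.json"
    assert args.logging_level == "info"
    assert args.njobs == 1
```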

EDIT: One note about the PR: the changes to the partition launch that
use a SageMaker Pipeline are for demonstration purposes; I think I'll
remove them altogether and just have separate partition/gbconvert jobs.
But we might want an example of how to programmatically build an
SM pipeline, e.g. from GSProcessing to training (as SM jobs); a hedged
sketch of what that could look like follows below.
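
For reference, a sketch of such a programmatic pipeline with the SageMaker
Python SDK; the image URI, role, instance types, and hyperparameters below
are placeholders, not the settings this PR ships:

```python
# Sketch: chain a partition job and a GraphBolt-conversion job as
# SageMaker Pipeline steps, under the assumptions noted above.
from sagemaker.estimator import Estimator
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import TrainingStep

IMAGE_URI = "<account>.dkr.ecr.<region>.amazonaws.com/graphstorm:sm"  # placeholder
ROLE = "arn:aws:iam::<account>:role/<sagemaker-role>"  # placeholder

partition_estimator = Estimator(
    image_uri=IMAGE_URI, role=ROLE,
    instance_count=2, instance_type="ml.r5.4xlarge",
    hyperparameters={"graph-data-s3": "s3://my-bucket/processed"},
)
gb_convert_estimator = Estimator(
    image_uri=IMAGE_URI, role=ROLE,
    instance_count=1, instance_type="ml.r5.4xlarge",
    hyperparameters={"graph-data-s3": "s3://my-bucket/partitioned/dist_graph"},
)

partition_step = TrainingStep(name="GSPartition", estimator=partition_estimator)
convert_step = TrainingStep(
    name="GBConvert",
    estimator=gb_convert_estimator,
    depends_on=[partition_step],  # run only after partitioning succeeds
)

# Note: pipelines are persistent once created, as discussed above.
pipeline = Pipeline(name="gs-partition-gbconvert",
                    steps=[partition_step, convert_step])
pipeline.upsert(role_arn=ROLE)
pipeline.start()
```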


By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.

---------

Co-authored-by: xiang song(charlie.song) <classicxsong@gmail.com>
thvasilo and classicsong authored Nov 11, 2024
1 parent de31f7b commit 19fac3b
Showing 18 changed files with 621 additions and 81 deletions.
17 changes: 14 additions & 3 deletions .github/workflow_scripts/pytest_check.sh
@@ -1,9 +1,20 @@
# Move to parent directory
#!/bin/env bash
# Move to repository root
set -ex

cd ../../

set -ex

FORCE_CUDA=1 python3 -m pip install -e '.[test]' --no-build-isolation
GS_HOME=$(pwd)
# Add SageMaker launch scripts to make the scripts testable
export PYTHONPATH="${PYTHONPATH}:${GS_HOME}/sagemaker/launch/"

python3 -m pip install pytest
FORCE_CUDA=1 python3 -m pip install -e '.[test]' --no-build-isolation

# Run SageMaker tests
python3 -m pytest -x ./tests/sagemaker-tests -s

# Run main library unit tests (Requires multi-gpu instance)
sh ./tests/unit-tests/prepare_test_data.sh
export NCCL_IB_DISABLE=1; export NCCL_SHM_DISABLE=1; NCCL_NET=Socket NCCL_DEBUG=INFO python3 -m pytest -x ./tests/unit-tests -s
8 changes: 8 additions & 0 deletions .gitignore
@@ -11,6 +11,9 @@ __pycache__/
*.py[cod]
*$py.class
*.egg-info/
.pytest_cache
.mypy_cache
.ipynb_checkpoints

# used by the container build
/code
@@ -20,3 +23,8 @@ __pycache__/
docs/build/
docs/source/_build
docs/source/generated

# IDEs and Python venv
.idea
.venv
.vscode
2 changes: 1 addition & 1 deletion docker/push_gsf_container.sh
@@ -5,7 +5,7 @@ set -euox pipefail
if [ -b "${1-}" ] && [ "$1" == "--help" ] || [ -b "${1-}" ] && [ "$1" == "-h" ]; then
echo "Usage: docker/push_gsf_container.sh <image-name> <tag> <region> <account>"
echo "Optionally provide the image name, tag, region and account number for the ecr repository"
echo "For example: docker/push_gsf_container.sh graphstorm sm us-west-2 1234567890"
echo "For example: docker/push_gsf_container.sh graphstorm sm-gpu us-west-2 1234567890"
exit 1
fi

19 changes: 10 additions & 9 deletions docker/sagemaker/Dockerfile.sm
@@ -20,15 +20,16 @@ FROM branch-${DEVICE} AS final

LABEL maintainer="Amazon AI Graph ML team"

# Install related Python packages
# Install required Python packages,
# we set the versions to match those of the base conda environment when possible
RUN pip3 install \
boto3 \
numba==0.58.1 \
numpy==1.26.1 \
boto3==1.34.112 \
numba==0.59.1 \
numpy==1.26.4 \
ogb==1.3.6 \
pyarrow \
scikit-learn \
scipy \
pyarrow==16.1.0 \
scikit-learn==1.5.0 \
scipy==1.13.1 \
transformers==4.28.1 \
&& rm -rf /root/.cache

@@ -49,12 +50,12 @@ ENV PYTHONPATH="/opt/ml/code/graphstorm/python/:${PYTHONPATH}"
RUN cp /opt/ml/code/graphstorm/sagemaker/run/* /opt/ml/code/

# Download DGL source code
RUN cd /root; git clone --branch v${DGL_VERSION} https://github.com/dmlc/dgl.git
RUN cd /root; git clone --branch v${DGL_VERSION} --single-branch https://github.com/dmlc/dgl.git
# Un-comment if we prefer a local DGL distribution
# COPY dgl /root/dgl
ENV PYTHONPATH="/root/dgl/tools/:${PYTHONPATH}"

WORKDIR /opt/ml/code

ENTRYPOINT ["bash", "-m", "start_with_right_hostname.sh"]
ENTRYPOINT ["bash", "-m", "/usr/local/bin/start_with_right_hostname.sh"]
CMD ["/bin/bash"]
5 changes: 3 additions & 2 deletions docker/sagemaker/build_artifacts/start_with_right_hostname.sh
@@ -5,7 +5,8 @@ if [[ "$1" = "train" ]]; then
sed -ie "s/PLACEHOLDER_HOSTNAME/$CURRENT_HOST/g" changehostname.c
gcc -o changehostname.o -c -fPIC -Wall changehostname.c
gcc -o libchangehostname.so -shared -export-dynamic changehostname.o -ldl
LD_PRELOAD=/libchangehostname.so train
CWD=$(pwd)
LD_PRELOAD=$CWD/libchangehostname.so train
else
eval "$@"
"$@"
fi
100 changes: 100 additions & 0 deletions docs/source/advanced/using-graphbolt.rst
@@ -166,3 +166,103 @@ data using our GraphBolt conversion entry point:
    # We'll see the GraphBolt representation has been re-created
    ls /tmp/acm_graphbolt/part0
    edge_feat.dgl fused_csc_sampling_graph.pt graph.dgl node_feat.dgl

Using GraphBolt on SageMaker
----------------------------

Before training on SageMaker,
we need to ensure our data on S3 have been
converted to the GraphBolt format.
When using GConstruct to process our data,
we can include the GraphBolt data conversion in the GConstruct
step, as we'll show below.

When using distributed graph construction with GSProcessing and GSPartition,
preparing data for GraphBolt on SageMaker requires
launching the GraphBolt data conversion step
as a separate SageMaker job, after
the partitioned DGL graph files have been created on S3.

After running your distributed partition SageMaker job as normal using
``sagemaker/launch_partition.py``, you next need to launch the
``sagemaker/launch_graphbolt_convert.py`` script, passing as input
the S3 URI where ``launch_partition.py`` stored the DistDGL partition data,
**plus the suffix ``dist_graph``**, as that is where GSPartition creates the partition files.

For example, if you used ``--output-data-s3 s3://my-bucket/my-part-graph`` for
``sagemaker/launch_partition.py`` you need to use ``--graph-data-s3 s3://my-bucket/my-part-graph/dist_graph``
for ``sagemaker/launch_graphbolt_convert.py``.

Without using GraphBolt, a SageMaker job sequence for distributed processing and training
is ``GSProcessing -> GSPartition -> GSTraining``. To use GraphBolt we need to add
a step after partitioning and before training:
``GSProcessing -> GSPartition -> GraphBoltConvert -> GSTraining``.

.. code-block:: bash

    cd graphstorm
    sagemaker/launch_partition.py \
        --graph-data-s3 "s3-uri-where-gsprocessing-data-exist" \
        --output-data-s3 "s3-uri-where-gspartition-data-will-be"
    # Add other required parameters like --partition-algorithm, --num-instances etc.

    # Once the above job succeeds, we run the following command to convert the data to GraphBolt format.
    # Note the /dist_graph suffix!
    sagemaker/launch_graphbolt_convert.py \
        --graph-data-s3 "s3-uri-where-gspartition-data-will-be/dist_graph" \
        --metadata-filename "metadata.json" # Or <graph-name>.json for gconstruct-ed partitions

If your data are small enough to process on a single SageMaker instance
using ``GConstruct``, you can simply pass the ``--use-graphbolt true`` argument
to the ``GConstruct`` SageMaker launch script and that will create the
necessary GraphBolt files as well.
So the job sequence there remains ``GConstruct -> GSTraining``.

.. code-block:: bash

    sagemaker/launch_gconstruct.py \
        --graph-data-s3 "s3-uri-where-raw-data-exist" \
        --output-data-s3 "s3-uri-where-gspartition-data-will-be" \
        --graph-config-file "gconstruct-config.json" \
        --use-graphbolt true

If you initially used GConstruct to create the non-GraphBolt DistDGL files,
you'll need to pass the additional argument ``--metadata-filename``
to ``launch_graphbolt_convert.py``.
Use ``<graph-name>.json``, where the graph name is the
one you used with GConstruct, as shown below:

.. code-block:: bash

    # NOTE: we provide 'my-graph' as the graph name
    sagemaker/launch_gconstruct.py \
        --graph-name my-graph \
        --graph-data-s3 "s3-uri-where-raw-data-exist" \
        --output-data-s3 "s3-uri-where-gspartition-data-will-be" \
        --graph-config-file "gconstruct-config.json" # We don't add --use-graphbolt true

    # Once the above job succeeds, we run the below to convert the data to GraphBolt
    # NOTE: our metadata file will be named 'my-graph.json'
    sagemaker/launch_graphbolt_convert.py \
        --graph-data-s3 "s3-uri-where-gspartition-data-will-be" \
        --metadata-filename "my-graph.json" # Should be <graph-name>.json

Once the data have been converted to the GraphBolt format, you can run your training
and inference jobs as before, passing the additional
argument ``--use-graphbolt true`` to the SageMaker launch scripts
to indicate that we want to use GraphBolt during training/inference:

.. code-block:: bash

    sagemaker/launch_train.py \
        --graph-name my-graph \
        --graph-data-s3 "s3-uri-where-gspartition-data-will-be" \
        --yaml-s3 "s3-path-to-train-yaml" \
        --use-graphbolt true

If you want to test steps locally you can use SageMaker's
`local mode <https://sagemaker.readthedocs.io/en/stable/overview.html#local-mode>`_
by providing ``local`` as the instance type in the launch scripts.
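
For example, a local-mode run of the conversion step could look like the
following sketch (the ``--instance-type`` flag is assumed here to behave
like the scripts' other launch parameters):

.. code-block:: bash

    sagemaker/launch_graphbolt_convert.py \
        --graph-data-s3 "s3-uri-where-gspartition-data-will-be/dist_graph" \
        --metadata-filename "metadata.json" \
        --instance-type local
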
82 changes: 63 additions & 19 deletions python/graphstorm/gpartition/convert_to_graphbolt.py
@@ -27,24 +27,72 @@

def parse_gbconv_args() -> argparse.Namespace:
    """Parses GraphBolt conversion arguments"""
    parser = argparse.ArgumentParser("Convert partitioned DGL graph to GraphBolt format.")
    parser.add_argument("--metadata-filepath", type=str, required=True,
                        help="File path to the partitioned DGL metadata file.")
    parser.add_argument("--logging-level", type=str, default="info",
                        help="The logging level. The possible values: debug, info, warning, \
                        error. The default value is info.")
    parser = argparse.ArgumentParser(
        "Convert partitioned DGL graph to GraphBolt format."
    )
    parser.add_argument(
        "--metadata-filepath",
        type=str,
        required=True,
        help="File path to the partitioned DGL metadata file.",
    )
    parser.add_argument(
        "--logging-level",
        type=str,
        default="info",
        help="The logging level. The possible values: debug, info, warning, "
        "error. The default value is info.",
    )
    parser.add_argument(
        "--njobs",
        type=int,
        default=1,
        help="Number of parallel processes to use for GraphBolt partition conversion. "
        "Only applies for DGL >= v2.4.0.",
    )

    return parser.parse_args()


def main():
    """ Entry point
def run_gb_conversion(part_config: str, njobs=1):
    """Converts the DistGraph data under the given part_config to GraphBolt

    Parameters
    ----------
    part_config : str
        File path to the partitioned data metadata JSON file
    njobs : int, optional
        Number of partitions to convert in parallel, by default 1.
        Only applies if DGL >= 2.4.0.
    """
    dgl_version = importlib.metadata.version('dgl')
    dgl_version = importlib.metadata.version("dgl")
    if version.parse(dgl_version) < version.parse("2.4.0"):
        if njobs > 1:
            logging.warning(
                "GB conversion njobs > 1 is only supported for DGL >= 2.4.0. "
                "njobs will be set to 1."
            )
        dgl_distributed.dgl_partition_to_graphbolt(
            part_config,
            store_eids=True,
            graph_formats="coo",
        )
    else:
        dgl_distributed.dgl_partition_to_graphbolt(  # pylint: disable=unexpected-keyword-arg
            part_config,
            store_eids=True,
            graph_formats="coo",
            njobs=njobs,
        )


def main():
    """Entry point"""
    dgl_version = importlib.metadata.version("dgl")
    if version.parse(dgl_version) < version.parse("2.1.0"):
        raise ValueError(
            "GraphBolt conversion requires DGL version >= 2.1.0, "
            f"but DGL version was {dgl_version}. "
            "GraphBolt conversion requires DGL version >= 2.1.0, "
            f"but DGL version was {dgl_version}. "
        )

    gb_conv_args = parse_gbconv_args()
@@ -59,14 +107,10 @@ def main():
    gb_start = time.time()
    logging.info("Converting partitions to GraphBolt format")

    dgl_distributed.dgl_partition_to_graphbolt(
        part_config,
        store_eids=True,
        graph_formats="coo",
    )
    run_gb_conversion(part_config, njobs=gb_conv_args.njobs)

    logging.info("GraphBolt conversion took %f sec.", time.time() - gb_start)

    logging.info("GraphBolt conversion took %f sec.",
                 time.time() - gb_start)

if __name__ == '__main__':
if __name__ == "__main__":
    main()
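
For a quick smoke test of the new `run_gb_conversion` helper, one could call
it directly on an existing partition config; the path below is illustrative:

```python
from graphstorm.gpartition.convert_to_graphbolt import run_gb_conversion

# Converts every partition listed in the metadata JSON; with DGL >= 2.4.0
# and njobs=2, two partitions are converted in parallel.
run_gb_conversion("/tmp/acm_graphbolt/acm.json", njobs=2)
```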
14 changes: 7 additions & 7 deletions python/graphstorm/sagemaker/sagemaker_partition.py
@@ -24,7 +24,7 @@
import time
import subprocess
from threading import Thread, Event
from typing import List
from typing import Any, List

import boto3
import botocore
@@ -224,15 +224,15 @@ def run_partition(job_config: PartitionJobConfig):
    metadata_filename = job_config.metadata_filename
    skip_partitioning = job_config.skip_partitioning == 'true'

    # Get env from either processing job or training job
    # Get resource env from either processing job or training job
    try:
        with open("/opt/ml/config/resourceconfig.json", "r", encoding="utf-8") as f:
            sm_env = json.load(f)
            sm_resource_env: dict[str, Any] = json.load(f)
    except FileNotFoundError:
        sm_env = json.loads(os.environ['SM_TRAINING_ENV'])
        sm_resource_env = json.loads(os.environ['SM_TRAINING_ENV'])

    hosts = sm_env['hosts']
    current_host = sm_env['current_host']
    hosts: list[str] = sm_resource_env['hosts']
    current_host: str = sm_resource_env['current_host']
    world_size = len(hosts)
    os.environ['WORLD_SIZE'] = str(world_size)
    host_rank = hosts.index(current_host)
@@ -255,7 +255,7 @@ def run_partition(job_config: PartitionJobConfig):
    for key, val in os.environ.items():
        logging.debug("%s: %s", key, val)

    leader_addr = socket.gethostbyname('algo-1')
    leader_addr = socket.gethostbyname(sorted(hosts)[0])
    # sync with all instances in the cluster
    if host_rank == 0:
        # sync with workers
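
For reference, an illustrative sketch of the two environment sources read
above; the values are examples of what SageMaker provides, not output from
this PR:

```python
import json
import os

# Processing jobs mount /opt/ml/config/resourceconfig.json, e.g.:
#   {"current_host": "algo-1", "hosts": ["algo-1", "algo-2"],
#    "network_interface_name": "eth0"}
# Training jobs expose SM_TRAINING_ENV, a JSON string whose payload also
# carries "hosts" and "current_host", so one parsing path covers both.
fallback = '{"hosts": ["algo-1", "algo-2"], "current_host": "algo-1"}'
env = json.loads(os.environ.get("SM_TRAINING_ENV", fallback))
print(env["hosts"], env["current_host"])
```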
3 changes: 3 additions & 0 deletions python/graphstorm/sagemaker/utils.py
@@ -280,6 +280,9 @@ def download_graph(graph_data_s3, graph_name, part_id, world_size,
        # Something else has gone wrong.
        raise err

    assert graph_config, \
        (f"Could not find a graph config file named {graph_name}.json or metadata.json "
         f"under {graph_data_s3}")
    S3Downloader.download(os.path.join(graph_data_s3, graph_config),
                          graph_path, sagemaker_session=sagemaker_session)
    try:
