From 5ba4ad953c4f5e58e796657a57bcd4844d04a117 Mon Sep 17 00:00:00 2001
From: Diego Lovison
Date: Thu, 19 Oct 2023 08:33:37 -0300
Subject: [PATCH 1/2] Use project param in get_secret to retrieve minio secret

---
 ods_ci/libs/DataSciencePipelinesKfpTekton.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/ods_ci/libs/DataSciencePipelinesKfpTekton.py b/ods_ci/libs/DataSciencePipelinesKfpTekton.py
index b68b2a626..6f383578a 100644
--- a/ods_ci/libs/DataSciencePipelinesKfpTekton.py
+++ b/ods_ci/libs/DataSciencePipelinesKfpTekton.py
@@ -79,7 +79,7 @@ def kfp_tekton_create_run_from_pipeline_func(
         self, user, pwd, project, route_name, source_code, fn, current_path=None
     ):
         client, api = self.get_client(user, pwd, project, route_name)
-        mlpipeline_minio_artifact_secret = self.get_secret(api, 'pipelineskfptekton1', 'mlpipeline-minio-artifact')
+        mlpipeline_minio_artifact_secret = self.get_secret(api, project, 'mlpipeline-minio-artifact')
         # the current path is from where you are running the script
         # sh ods_ci/run_robot_test.sh
         # the current_path will be ods-ci
@@ -110,3 +110,7 @@ def kfp_tekton_wait_for_run_completion(
         _, api = self.get_client(user, pwd, project, route_name)
 
         return api.check_run_status(run_result.run_id, timeout=timeout)
+
+if __name__ == "__main__":
+    example = DataSciencePipelinesKfpTekton()
+    example.kfp_tekton_create_run_from_pipeline_func('ldap-admin20', 'rhodsPW#1', 'p2', 'ds-pipeline-pipelines-definition', 'upload_download.py', 'wire_up_pipeline', '/home/dlovison/github/rhods/ods-ci')

From 4394e13fb6ceffcc505101557d113f1e51a8fe8e Mon Sep 17 00:00:00 2001
From: Diego Lovison
Date: Thu, 19 Oct 2023 08:33:54 -0300
Subject: [PATCH 2/2] Compiled upload_download YAML for testing purposes

---
 ods_ci/libs/DataSciencePipelinesKfpTekton.py |   4 -
 .../upload_download-compiled.yaml            | 369 ++++++++++++++++++
 2 files changed, 369 insertions(+), 4 deletions(-)
 create mode 100644 ods_ci/tests/Resources/Files/pipeline-samples/upload_download-compiled.yaml

diff --git a/ods_ci/libs/DataSciencePipelinesKfpTekton.py b/ods_ci/libs/DataSciencePipelinesKfpTekton.py
index 6f383578a..706f98c7b 100644
--- a/ods_ci/libs/DataSciencePipelinesKfpTekton.py
+++ b/ods_ci/libs/DataSciencePipelinesKfpTekton.py
@@ -110,7 +110,3 @@ def kfp_tekton_wait_for_run_completion(
         _, api = self.get_client(user, pwd, project, route_name)
 
         return api.check_run_status(run_result.run_id, timeout=timeout)
-
-if __name__ == "__main__":
-    example = DataSciencePipelinesKfpTekton()
-    example.kfp_tekton_create_run_from_pipeline_func('ldap-admin20', 'rhodsPW#1', 'p2', 'ds-pipeline-pipelines-definition', 'upload_download.py', 'wire_up_pipeline', '/home/dlovison/github/rhods/ods-ci')
diff --git a/ods_ci/tests/Resources/Files/pipeline-samples/upload_download-compiled.yaml b/ods_ci/tests/Resources/Files/pipeline-samples/upload_download-compiled.yaml
new file mode 100644
index 000000000..0775f7567
--- /dev/null
+++ b/ods_ci/tests/Resources/Files/pipeline-samples/upload_download-compiled.yaml
@@ -0,0 +1,369 @@
+apiVersion: tekton.dev/v1beta1
+kind: PipelineRun
+metadata:
+  name: test-data-passing-pipeline-1
+  annotations:
+    tekton.dev/output_artifacts: '{"receive-file": [{"key": "artifacts/$PIPELINERUN/receive-file/saveartifact.tgz",
+      "name": "receive-file-saveartifact", "path": "/tmp/outputs/saveartifact/data"}],
+      "send-file": [{"key": "artifacts/$PIPELINERUN/send-file/outgoingfile.tgz", "name":
+      "send-file-outgoingfile", "path": "/tmp/outputs/outgoingfile/data"}]}'
+    tekton.dev/input_artifacts: '{"receive-file": [{"name": "send-file-outgoingfile",
+      "parent_task": "send-file"}], "test-uploaded-artifact": [{"name": "receive-file-saveartifact",
+      "parent_task": "receive-file"}]}'
+    tekton.dev/artifact_bucket: mlpipeline
+    tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
+    tekton.dev/artifact_endpoint_scheme: http://
+    tekton.dev/artifact_items: '{"receive-file": [["saveartifact", "$(workspaces.receive-file.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/saveartifact"]],
+      "send-file": [["outgoingfile", "$(workspaces.send-file.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/outgoingfile"]],
+      "test-uploaded-artifact": []}'
+    sidecar.istio.io/inject: "false"
+    tekton.dev/template: ''
+    pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
+    pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"name": "mlpipeline_minio_artifact_secret"}],
+      "name": "Test Data Passing Pipeline 1"}'
+  labels:
+    pipelines.kubeflow.org/pipelinename: ''
+    pipelines.kubeflow.org/generation: ''
+spec:
+  params:
+  - name: mlpipeline_minio_artifact_secret
+    value: ''
+  pipelineSpec:
+    params:
+    - name: mlpipeline_minio_artifact_secret
+    tasks:
+    - name: send-file
+      taskSpec:
+        steps:
+        - name: main
+          args:
+          - --file-size-bytes
+          - '20971520'
+          - --outgoingfile
+          - $(workspaces.send-file.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/outgoingfile
+          command:
+          - sh
+          - -ec
+          - |
+            program_path=$(mktemp)
+            printf "%s" "$0" > "$program_path"
+            python3 -u "$program_path" "$@"
+          - |
+            def _make_parent_dirs_and_return_path(file_path: str):
+                import os
+                os.makedirs(os.path.dirname(file_path), exist_ok=True)
+                return file_path
+
+            def send_file(
+                file_size_bytes,
+                outgoingfile,
+            ):
+                import os
+                import zipfile
+
+                def create_large_file(file_path, size_in_bytes):
+                    with open(file_path, 'wb') as f:
+                        f.write(os.urandom(size_in_bytes))
+
+                def zip_file(input_file_path, output_zip_path):
+                    with zipfile.ZipFile(output_zip_path, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
+                        zipf.write(input_file_path, os.path.basename(input_file_path))
+
+                print("starting creating the file...")
+                file_path = "/tmp/large_file.txt"
+                create_large_file(file_path, file_size_bytes)
+                zip_file(file_path, outgoingfile)
+                print("done")
+
+            import argparse
+            _parser = argparse.ArgumentParser(prog='Send file', description='')
+            _parser.add_argument("--file-size-bytes", dest="file_size_bytes", type=int, required=True, default=argparse.SUPPRESS)
+            _parser.add_argument("--outgoingfile", dest="outgoingfile", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
+            _parsed_args = vars(_parser.parse_args())
+
+            _outputs = send_file(**_parsed_args)
+          image: registry.access.redhat.com/ubi8/python-38
+          env:
+          - name: ORIG_PR_NAME
+            valueFrom:
+              fieldRef:
+                fieldPath: metadata.labels['custom.tekton.dev/originalPipelineRun']
+        - image: registry.redhat.io/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61
+          name: output-taskrun-name
+          command:
+          - sh
+          - -ec
+          - echo -n "$(context.taskRun.name)" > "$(results.taskrun-name.path)"
+        - image: registry.redhat.io/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61
+          name: copy-results-artifacts
+          command:
+          - sh
+          - -ec
+          - |
+            set -exo pipefail
+            TOTAL_SIZE=0
+            copy_artifact() {
+            if [ -d "$1" ]; then
+            tar -czvf "$1".tar.gz "$1"
+            SUFFIX=".tar.gz"
+            fi
+            ARTIFACT_SIZE=`wc -c "$1"${SUFFIX} | awk '{print $1}'`
+            TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
+            touch "$2"
+            if [[ $TOTAL_SIZE -lt 3072 ]]; then
+            if [ -d "$1" ]; then
+            tar -tzf "$1".tar.gz > "$2"
+            elif ! awk "/[^[:print:]]/{f=1} END{exit !f}" "$1"; then
+            cp "$1" "$2"
+            fi
+            fi
+            }
+            copy_artifact $(workspaces.send-file.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/outgoingfile $(results.outgoingfile.path)
+          onError: continue
+          env:
+          - name: ORIG_PR_NAME
+            valueFrom:
+              fieldRef:
+                fieldPath: metadata.labels['custom.tekton.dev/originalPipelineRun']
+        results:
+        - name: outgoingfile
+          type: string
+          description: /tmp/outputs/outgoingfile/data
+        - name: taskrun-name
+          type: string
+        metadata:
+          labels:
+            pipelines.kubeflow.org/cache_enabled: "true"
+          annotations:
+            pipelines.kubeflow.org/component_spec_digest: '{"name": "Send file", "outputs":
+              [{"name": "outgoingfile"}], "version": "Send file@sha256=0d4ad7ec54eec13c6449b3a1514b4dcac6f65b7fad58319749cb7ca7b3c33d17"}'
+        workspaces:
+        - name: send-file
+      workspaces:
+      - name: send-file
+        workspace: test-data-passing-pipeline-1
+    - name: receive-file
+      params:
+      - name: send-file-trname
+        value: $(tasks.send-file.results.taskrun-name)
+      taskSpec:
+        steps:
+        - name: main
+          args:
+          - --incomingfile
+          - $(workspaces.receive-file.path)/artifacts/$ORIG_PR_NAME/$(params.send-file-trname)/outgoingfile
+          - --saveartifact
+          - $(workspaces.receive-file.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/saveartifact
+          command:
+          - sh
+          - -ec
+          - |
+            program_path=$(mktemp)
+            printf "%s" "$0" > "$program_path"
+            python3 -u "$program_path" "$@"
+          - |
+            def _make_parent_dirs_and_return_path(file_path: str):
+                import os
+                os.makedirs(os.path.dirname(file_path), exist_ok=True)
+                return file_path
+
+            def receive_file(
+                incomingfile,
+                saveartifact,
+            ):
+                import os
+                import shutil
+
+                print("reading %s, size is %s" % (incomingfile, os.path.getsize(incomingfile)))
+
+                with open(incomingfile, "rb") as f:
+                    b = f.read(1)
+                    print("read byte: %s" % b)
+                    f.close()
+
+                print("copying in %s to out %s" % (incomingfile, saveartifact))
+                shutil.copyfile(incomingfile, saveartifact)
+
+            import argparse
+            _parser = argparse.ArgumentParser(prog='Receive file', description='')
+            _parser.add_argument("--incomingfile", dest="incomingfile", type=str, required=True, default=argparse.SUPPRESS)
+            _parser.add_argument("--saveartifact", dest="saveartifact", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
+            _parsed_args = vars(_parser.parse_args())
+
+            _outputs = receive_file(**_parsed_args)
+          image: registry.access.redhat.com/ubi8/python-38
+          env:
+          - name: ORIG_PR_NAME
+            valueFrom:
+              fieldRef:
+                fieldPath: metadata.labels['custom.tekton.dev/originalPipelineRun']
+        - image: registry.redhat.io/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61
+          name: output-taskrun-name
+          command:
+          - sh
+          - -ec
+          - echo -n "$(context.taskRun.name)" > "$(results.taskrun-name.path)"
+        - image: registry.redhat.io/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61
+          name: copy-results-artifacts
+          command:
+          - sh
+          - -ec
+          - |
+            set -exo pipefail
+            TOTAL_SIZE=0
+            copy_artifact() {
+            if [ -d "$1" ]; then
+            tar -czvf "$1".tar.gz "$1"
+            SUFFIX=".tar.gz"
+            fi
+            ARTIFACT_SIZE=`wc -c "$1"${SUFFIX} | awk '{print $1}'`
+            TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
+            touch "$2"
+            if [[ $TOTAL_SIZE -lt 3072 ]]; then
+            if [ -d "$1" ]; then
+            tar -tzf "$1".tar.gz > "$2"
+            elif ! awk "/[^[:print:]]/{f=1} END{exit !f}" "$1"; then
+            cp "$1" "$2"
+            fi
+            fi
+            }
+            copy_artifact $(workspaces.receive-file.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/saveartifact $(results.saveartifact.path)
+          onError: continue
+          env:
+          - name: ORIG_PR_NAME
+            valueFrom:
+              fieldRef:
+                fieldPath: metadata.labels['custom.tekton.dev/originalPipelineRun']
+        params:
+        - name: send-file-trname
+        results:
+        - name: saveartifact
+          type: string
+          description: /tmp/outputs/saveartifact/data
+        - name: taskrun-name
+          type: string
+        metadata:
+          labels:
+            pipelines.kubeflow.org/cache_enabled: "true"
+          annotations:
+            artifact_outputs: '["saveartifact"]'
+            pipelines.kubeflow.org/component_spec_digest: '{"name": "Receive file",
+              "outputs": [{"name": "saveartifact"}], "version": "Receive file@sha256=0e5a4e789616b34d37f1dd67b68b94f4c200f29e383d531b486d16cb0e02a2ac"}'
+        workspaces:
+        - name: receive-file
+      workspaces:
+      - name: receive-file
+        workspace: test-data-passing-pipeline-1
+      runAfter:
+      - send-file
+    - name: test-uploaded-artifact
+      params:
+      - name: mlpipeline_minio_artifact_secret
+        value: $(params.mlpipeline_minio_artifact_secret)
+      - name: receive-file-trname
+        value: $(tasks.receive-file.results.taskrun-name)
+      taskSpec:
+        steps:
+        - name: main
+          args:
+          - --previous-step
+          - $(workspaces.test-uploaded-artifact.path)/artifacts/$ORIG_PR_NAME/$(params.receive-file-trname)/saveartifact
+          - --file-size-bytes
+          - '20971520'
+          - --mlpipeline-minio-artifact-secret
+          - $(inputs.params.mlpipeline_minio_artifact_secret)
+          command:
+          - sh
+          - -c
+          - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
+            'minio' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
+            --no-warn-script-location 'minio' --user) && "$0" "$@"
+          - sh
+          - -ec
+          - |
+            program_path=$(mktemp)
+            printf "%s" "$0" > "$program_path"
+            python3 -u "$program_path" "$@"
+          - |
+            def test_uploaded_artifact(previous_step, file_size_bytes, mlpipeline_minio_artifact_secret):
+                from minio import Minio
+                import base64
+                import json
+
+                print(previous_step)
+                name_data = previous_step.split('/')
+                object_name = 'artifacts/' + name_data[4] + '/receive-file/saveartifact.tgz'
+
+                mlpipeline_minio_artifact_secret = json.loads(mlpipeline_minio_artifact_secret)["data"]
+
+                def inner_decode(my_str):
+                    return base64.b64decode(my_str).decode("utf-8")
+
+                host = inner_decode(mlpipeline_minio_artifact_secret["host"])
+                port = inner_decode(mlpipeline_minio_artifact_secret["port"])
+                access_key = inner_decode(mlpipeline_minio_artifact_secret["accesskey"])
+                secret_key = inner_decode(mlpipeline_minio_artifact_secret["secretkey"])
+                secure = inner_decode(mlpipeline_minio_artifact_secret["secure"])
+                secure = secure.lower() == 'true'
+                print(host, port, access_key, secret_key, secure)
+                client = Minio(
+                    f'{host}:{port}',
+                    access_key=access_key,
+                    secret_key=secret_key,
+                    secure=secure
+                )
+
+                data = client.get_object('mlpipeline', object_name)
+                with open('my-testfile', 'wb') as file_data:
+                    for d in data.stream(32 * 1024):
+                        file_data.write(d)
+                    bytes_written = file_data.tell()
+
+                print(file_size_bytes, bytes_written)
+                diff = round((bytes_written / file_size_bytes) - 1, 3)
+                print(diff)
+                # if not matching, the test will fail
+                assert diff == 0
+
+            import argparse
+            _parser = argparse.ArgumentParser(prog='Test uploaded artifact', description='')
+            _parser.add_argument("--previous-step", dest="previous_step", type=str, required=True, default=argparse.SUPPRESS)
+            _parser.add_argument("--file-size-bytes", dest="file_size_bytes", type=int, required=True, default=argparse.SUPPRESS)
+            _parser.add_argument("--mlpipeline-minio-artifact-secret", dest="mlpipeline_minio_artifact_secret", type=str, required=True, default=argparse.SUPPRESS)
+            _parsed_args = vars(_parser.parse_args())
+
+            _outputs = test_uploaded_artifact(**_parsed_args)
+          image: registry.access.redhat.com/ubi8/python-38
+          env:
+          - name: ORIG_PR_NAME
+            valueFrom:
+              fieldRef:
+                fieldPath: metadata.labels['custom.tekton.dev/originalPipelineRun']
+        params:
+        - name: mlpipeline_minio_artifact_secret
+        - name: receive-file-trname
+        metadata:
+          labels:
+            pipelines.kubeflow.org/cache_enabled: "true"
+          annotations:
+            pipelines.kubeflow.org/component_spec_digest: '{"name": "Test uploaded
+              artifact", "outputs": [], "version": "Test uploaded artifact@sha256=705c9f7ea80f9fddd0648dab1af3a365f661d7c55f89bb03a370ec7a6093180a"}'
+        workspaces:
+        - name: test-uploaded-artifact
+      workspaces:
+      - name: test-uploaded-artifact
+        workspace: test-data-passing-pipeline-1
+      runAfter:
+      - receive-file
+    workspaces:
+    - name: test-data-passing-pipeline-1
+  workspaces:
+  - name: test-data-passing-pipeline-1
+    volumeClaimTemplate:
+      spec:
+        storageClassName: standard-csi
+        accessModes:
+        - ReadWriteMany
+        resources:
+          requests:
+            storage: 2Gi
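
Note for reviewers: the test-uploaded-artifact step expects mlpipeline_minio_artifact_secret to arrive as the secret's JSON representation, with base64-encoded values under "data" (host, port, accesskey, secretkey, secure) — which is why patch 1/2 now fetches it via get_secret(api, project, 'mlpipeline-minio-artifact') from the project under test. For local debugging, here is a minimal standalone sketch of that decoding, assuming the same key names used in the compiled YAML; the helper name is hypothetical, not part of this change:

    import base64
    import json

    from minio import Minio  # pip install minio

    def minio_client_from_secret_json(secret_json):
        # Hypothetical helper mirroring the inline decoding done by the
        # test-uploaded-artifact step in upload_download-compiled.yaml.
        data = json.loads(secret_json)["data"]

        def decode(value):
            # Kubernetes secret values are base64-encoded strings.
            return base64.b64decode(value).decode("utf-8")

        return Minio(
            f'{decode(data["host"])}:{decode(data["port"])}',
            access_key=decode(data["accesskey"]),
            secret_key=decode(data["secretkey"]),
            secure=decode(data["secure"]).lower() == 'true',
        )

The returned client can then fetch the uploaded artifact the same way the step does, e.g. client.get_object('mlpipeline', object_name).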