Merge pull request #982 from diegolovison/compiled_upload
Compiled upload/download YAML for testing purposes
diegolovison authored Oct 23, 2023
2 parents 98a4185 + 4394e13 commit 8106e58
Showing 2 changed files with 370 additions and 1 deletion.
2 changes: 1 addition & 1 deletion ods_ci/libs/DataSciencePipelinesKfpTekton.py
@@ -79,7 +79,7 @@ def kfp_tekton_create_run_from_pipeline_func(
         self, user, pwd, project, route_name, source_code, fn, current_path=None
     ):
         client, api = self.get_client(user, pwd, project, route_name)
-        mlpipeline_minio_artifact_secret = self.get_secret(api, 'pipelineskfptekton1', 'mlpipeline-minio-artifact')
+        mlpipeline_minio_artifact_secret = self.get_secret(api, project, 'mlpipeline-minio-artifact')
         # the current path is from where you are running the script
         # sh ods_ci/run_robot_test.sh
         # the current_path will be ods-ci
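The fix replaces the hardcoded 'pipelineskfptekton1' namespace with the project argument already passed into the keyword, so the secret lookup works for whichever project the test runs in. The diff does not show get_secret itself; the following is only a minimal sketch of such a helper, assuming the official kubernetes Python client (CoreV1Api) and the JSON shape the compiled pipeline later consumes via json.loads(...)["data"]:

import json

from kubernetes import client, config

def get_secret(api: client.CoreV1Api, namespace: str, name: str) -> str:
    # Hypothetical helper: fetch a Secret and return it as a JSON string whose
    # "data" key holds the base64-encoded values, matching what the
    # test-uploaded-artifact step below decodes.
    secret = api.read_namespaced_secret(name, namespace)
    return json.dumps({"data": secret.data})

# Usage sketch (project name is a placeholder):
config.load_kube_config()
mlpipeline_minio_artifact_secret = get_secret(client.CoreV1Api(), "my-project", "mlpipeline-minio-artifact")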
369 changes: 369 additions & 0 deletions
@@ -0,0 +1,369 @@
apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: test-data-passing-pipeline-1
annotations:
tekton.dev/output_artifacts: '{"receive-file": [{"key": "artifacts/$PIPELINERUN/receive-file/saveartifact.tgz",
"name": "receive-file-saveartifact", "path": "/tmp/outputs/saveartifact/data"}],
"send-file": [{"key": "artifacts/$PIPELINERUN/send-file/outgoingfile.tgz", "name":
"send-file-outgoingfile", "path": "/tmp/outputs/outgoingfile/data"}]}'
tekton.dev/input_artifacts: '{"receive-file": [{"name": "send-file-outgoingfile",
"parent_task": "send-file"}], "test-uploaded-artifact": [{"name": "receive-file-saveartifact",
"parent_task": "receive-file"}]}'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"receive-file": [["saveartifact", "$(workspaces.receive-file.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/saveartifact"]],
"send-file": [["outgoingfile", "$(workspaces.send-file.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/outgoingfile"]],
"test-uploaded-artifact": []}'
sidecar.istio.io/inject: "false"
tekton.dev/template: ''
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"name": "mlpipeline_minio_artifact_secret"}],
"name": "Test Data Passing Pipeline 1"}'
labels:
pipelines.kubeflow.org/pipelinename: ''
pipelines.kubeflow.org/generation: ''
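# Note: $(workspaces.$TASK_NAME.path) typically resolves to /workspace/<workspace name>,
# so big data moves between tasks as files like
# /workspace/send-file/artifacts/<original pipelinerun>/<taskrun>/outgoingfile
# on the shared workspace PVC rather than through size-limited Tekton results.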
spec:
params:
- name: mlpipeline_minio_artifact_secret
value: ''
pipelineSpec:
params:
- name: mlpipeline_minio_artifact_secret
tasks:
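# send-file: writes 20 MiB of os.urandom data, zips it, and leaves the archive
# on the shared workspace for the next task.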
- name: send-file
taskSpec:
steps:
- name: main
args:
- --file-size-bytes
- '20971520'
- --outgoingfile
- $(workspaces.send-file.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/outgoingfile
command:
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def _make_parent_dirs_and_return_path(file_path: str):
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def send_file(
file_size_bytes,
outgoingfile,
):
import os
import zipfile
def create_large_file(file_path, size_in_bytes):
with open(file_path, 'wb') as f:
f.write(os.urandom(size_in_bytes))
def zip_file(input_file_path, output_zip_path):
with zipfile.ZipFile(output_zip_path, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
zipf.write(input_file_path, os.path.basename(input_file_path))
print("starting creating the file...")
file_path = "/tmp/large_file.txt"
create_large_file(file_path, file_size_bytes)
zip_file(file_path, outgoingfile)
print("done")
import argparse
_parser = argparse.ArgumentParser(prog='Send file', description='')
_parser.add_argument("--file-size-bytes", dest="file_size_bytes", type=int, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--outgoingfile", dest="outgoingfile", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = send_file(**_parsed_args)
image: registry.access.redhat.com/ubi8/python-38
env:
- name: ORIG_PR_NAME
valueFrom:
fieldRef:
fieldPath: metadata.labels['custom.tekton.dev/originalPipelineRun']
- image: registry.redhat.io/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61
name: output-taskrun-name
command:
- sh
- -ec
- echo -n "$(context.taskRun.name)" > "$(results.taskrun-name.path)"
- image: registry.redhat.io/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61
name: copy-results-artifacts
command:
- sh
- -ec
- |
set -exo pipefail
TOTAL_SIZE=0
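# copy_artifact tars directories, then materializes the artifact as a Tekton
# result only when it is small (under 3072 bytes here) and contains no
# non-printable characters, since results are size-limited; larger artifacts
# stay on the workspace instead.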
copy_artifact() {
if [ -d "$1" ]; then
tar -czvf "$1".tar.gz "$1"
SUFFIX=".tar.gz"
fi
ARTIFACT_SIZE=`wc -c "$1"${SUFFIX} | awk '{print $1}'`
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
touch "$2"
if [[ $TOTAL_SIZE -lt 3072 ]]; then
if [ -d "$1" ]; then
tar -tzf "$1".tar.gz > "$2"
elif ! awk "/[^[:print:]]/{f=1} END{exit !f}" "$1"; then
cp "$1" "$2"
fi
fi
}
copy_artifact $(workspaces.send-file.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/outgoingfile $(results.outgoingfile.path)
onError: continue
env:
- name: ORIG_PR_NAME
valueFrom:
fieldRef:
fieldPath: metadata.labels['custom.tekton.dev/originalPipelineRun']
results:
- name: outgoingfile
type: string
description: /tmp/outputs/outgoingfile/data
- name: taskrun-name
type: string
metadata:
labels:
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
pipelines.kubeflow.org/component_spec_digest: '{"name": "Send file", "outputs":
[{"name": "outgoingfile"}], "version": "Send file@sha256=0d4ad7ec54eec13c6449b3a1514b4dcac6f65b7fad58319749cb7ca7b3c33d17"}'
workspaces:
- name: send-file
workspaces:
- name: send-file
workspace: test-data-passing-pipeline-1
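# receive-file: locates the archive via send-file's taskrun-name result,
# verifies it is readable, and re-saves it as its own output artifact, which
# the tekton.dev/output_artifacts annotation above marks for upload to the
# mlpipeline bucket as saveartifact.tgz.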
- name: receive-file
params:
- name: send-file-trname
value: $(tasks.send-file.results.taskrun-name)
taskSpec:
steps:
- name: main
args:
- --incomingfile
- $(workspaces.receive-file.path)/artifacts/$ORIG_PR_NAME/$(params.send-file-trname)/outgoingfile
- --saveartifact
- $(workspaces.receive-file.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/saveartifact
command:
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def _make_parent_dirs_and_return_path(file_path: str):
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def receive_file(
incomingfile,
saveartifact,
):
import os
import shutil
print("reading %s, size is %s" % (incomingfile, os.path.getsize(incomingfile)))
with open(incomingfile, "rb") as f:
b = f.read(1)
print("read byte: %s" % b)
f.close()
print("copying in %s to out %s" % (incomingfile, saveartifact))
shutil.copyfile(incomingfile, saveartifact)
import argparse
_parser = argparse.ArgumentParser(prog='Receive file', description='')
_parser.add_argument("--incomingfile", dest="incomingfile", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--saveartifact", dest="saveartifact", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = receive_file(**_parsed_args)
image: registry.access.redhat.com/ubi8/python-38
env:
- name: ORIG_PR_NAME
valueFrom:
fieldRef:
fieldPath: metadata.labels['custom.tekton.dev/originalPipelineRun']
- image: registry.redhat.io/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61
name: output-taskrun-name
command:
- sh
- -ec
- echo -n "$(context.taskRun.name)" > "$(results.taskrun-name.path)"
- image: registry.redhat.io/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61
name: copy-results-artifacts
command:
- sh
- -ec
- |
set -exo pipefail
TOTAL_SIZE=0
copy_artifact() {
if [ -d "$1" ]; then
tar -czvf "$1".tar.gz "$1"
SUFFIX=".tar.gz"
fi
ARTIFACT_SIZE=`wc -c "$1"${SUFFIX} | awk '{print $1}'`
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
touch "$2"
if [[ $TOTAL_SIZE -lt 3072 ]]; then
if [ -d "$1" ]; then
tar -tzf "$1".tar.gz > "$2"
elif ! awk "/[^[:print:]]/{f=1} END{exit !f}" "$1"; then
cp "$1" "$2"
fi
fi
}
copy_artifact $(workspaces.receive-file.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/saveartifact $(results.saveartifact.path)
onError: continue
env:
- name: ORIG_PR_NAME
valueFrom:
fieldRef:
fieldPath: metadata.labels['custom.tekton.dev/originalPipelineRun']
params:
- name: send-file-trname
results:
- name: saveartifact
type: string
description: /tmp/outputs/saveartifact/data
- name: taskrun-name
type: string
metadata:
labels:
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
artifact_outputs: '["saveartifact"]'
pipelines.kubeflow.org/component_spec_digest: '{"name": "Receive file",
"outputs": [{"name": "saveartifact"}], "version": "Receive file@sha256=0e5a4e789616b34d37f1dd67b68b94f4c200f29e383d531b486d16cb0e02a2ac"}'
workspaces:
- name: receive-file
workspaces:
- name: receive-file
workspace: test-data-passing-pipeline-1
runAfter:
- send-file
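# test-uploaded-artifact: downloads saveartifact.tgz directly from MinIO,
# using credentials from the mlpipeline-minio-artifact secret, and asserts
# the stored size matches the 20 MiB that send-file generated.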
- name: test-uploaded-artifact
params:
- name: mlpipeline_minio_artifact_secret
value: $(params.mlpipeline_minio_artifact_secret)
- name: receive-file-trname
value: $(tasks.receive-file.results.taskrun-name)
taskSpec:
steps:
- name: main
args:
- --previous-step
- $(workspaces.test-uploaded-artifact.path)/artifacts/$ORIG_PR_NAME/$(params.receive-file-trname)/saveartifact
- --file-size-bytes
- '20971520'
- --mlpipeline-minio-artifact-secret
- $(inputs.params.mlpipeline_minio_artifact_secret)
command:
- sh
- -c
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
'minio' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
--no-warn-script-location 'minio' --user) && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def test_uploaded_artifact(previous_step, file_size_bytes, mlpipeline_minio_artifact_secret):
from minio import Minio
import base64
import json
print(previous_step)
name_data = previous_step.split('/')
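# previous_step is a workspace path like
# /workspace/test-uploaded-artifact/artifacts/<original-pipelinerun>/<taskrun>/saveartifact,
# so (with the default /workspace/<name> mount) index 4 is the original PipelineRun name.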
object_name = 'artifacts/' + name_data[4] + '/receive-file/saveartifact.tgz'
mlpipeline_minio_artifact_secret = json.loads(mlpipeline_minio_artifact_secret)["data"]
def inner_decode(my_str):
return base64.b64decode(my_str).decode("utf-8")
host = inner_decode(mlpipeline_minio_artifact_secret["host"])
port = inner_decode(mlpipeline_minio_artifact_secret["port"])
access_key = inner_decode(mlpipeline_minio_artifact_secret["accesskey"])
secret_key = inner_decode(mlpipeline_minio_artifact_secret["secretkey"])
secure = inner_decode(mlpipeline_minio_artifact_secret["secure"])
secure = secure.lower() == 'true'
print(host, port, access_key, secret_key, secure)
client = Minio(
f'{host}:{port}',
access_key=access_key,
secret_key=secret_key,
secure=secure
)
data = client.get_object('mlpipeline', object_name)
with open('my-testfile', 'wb') as file_data:
for d in data.stream(32 * 1024):
file_data.write(d)
bytes_written = file_data.tell()
print(file_size_bytes, bytes_written)
diff = round((bytes_written / file_size_bytes) - 1, 3)
print(diff)
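# os.urandom data is incompressible, so the zipped artifact should match
# file_size_bytes to within rounding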
# if not matching, the test will fail
assert diff == 0
import argparse
_parser = argparse.ArgumentParser(prog='Test uploaded artifact', description='')
_parser.add_argument("--previous-step", dest="previous_step", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--file-size-bytes", dest="file_size_bytes", type=int, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--mlpipeline-minio-artifact-secret", dest="mlpipeline_minio_artifact_secret", type=str, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = test_uploaded_artifact(**_parsed_args)
image: registry.access.redhat.com/ubi8/python-38
env:
- name: ORIG_PR_NAME
valueFrom:
fieldRef:
fieldPath: metadata.labels['custom.tekton.dev/originalPipelineRun']
params:
- name: mlpipeline_minio_artifact_secret
- name: receive-file-trname
metadata:
labels:
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
pipelines.kubeflow.org/component_spec_digest: '{"name": "Test uploaded
artifact", "outputs": [], "version": "Test uploaded artifact@sha256=705c9f7ea80f9fddd0648dab1af3a365f661d7c55f89bb03a370ec7a6093180a"}'
workspaces:
- name: test-uploaded-artifact
workspaces:
- name: test-uploaded-artifact
workspace: test-data-passing-pipeline-1
runAfter:
- receive-file
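# All three tasks bind the same workspace, provisioned by the
# volumeClaimTemplate below; ReadWriteMany lets their pods schedule onto
# different nodes, and storageClassName (standard-csi here) is cluster-specific.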
workspaces:
- name: test-data-passing-pipeline-1
workspaces:
- name: test-data-passing-pipeline-1
volumeClaimTemplate:
spec:
storageClassName: standard-csi
accessModes:
- ReadWriteMany
resources:
requests:
storage: 2Gi
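Since this is a fully compiled PipelineRun rather than a reusable Pipeline, it can also be submitted by hand when debugging, outside the Robot Framework suite that normally injects the mlpipeline_minio_artifact_secret parameter. A rough sketch, with a hypothetical file name and a namespace placeholder:

# Hypothetical manual submission; the test suite normally fills in the
# mlpipeline_minio_artifact_secret param before creating the run.
oc create -f test-data-passing-pipeline-1.yaml -n <project>
tkn pipelinerun logs test-data-passing-pipeline-1 -f -n <project>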
