Skip to content

Commit

Permalink
More fixes for 0.3.0 WES
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Amstutz committed Jul 23, 2018
1 parent f85f805 commit 4b1afce
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 46 deletions.
18 changes: 11 additions & 7 deletions wes_client/wes_client_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ def main(argv=sys.argv[1:]):
json.dump(response.result(), sys.stdout, indent=4)
return 0

if not args.job_order:
logging.error("Missing job order")
return 1

loader = schema_salad.ref_resolver.Loader({
"location": {"@type": "@id"},
"path": {"@type": "@id"}
Expand All @@ -102,7 +106,7 @@ def fixpaths(d):
visit(input_dict, fixpaths)

workflow_url = args.workflow_url
if not workflow_url.startswith("/") and ":" not in workflow_url:
if ":" not in workflow_url:
workflow_url = "file://" + os.path.abspath(workflow_url)

if args.quiet:
Expand Down Expand Up @@ -131,7 +135,7 @@ def fixpaths(d):
else:
parts.append(("workflow_url", workflow_url))

postresult = http_client.session.post("%s://%s/ga4gh/wes/v1/workflows" % (args.proto, args.host),
postresult = http_client.session.post("%s://%s/ga4gh/wes/v1/runs" % (args.proto, args.host),
files=parts,
headers={"Authorization": args.auth})

Expand All @@ -142,19 +146,19 @@ def fixpaths(d):
exit(1)

if args.wait:
logging.info("Workflow id is %s", r["workflow_id"])
logging.info("Workflow run id is %s", r["run_id"])
else:
sys.stdout.write(r["workflow_id"] + "\n")
sys.stdout.write(r["run_id"] + "\n")
exit(0)

r = client.WorkflowExecutionService.GetRunStatus(workflow_id=r["workflow_id"]).result()
r = client.WorkflowExecutionService.GetRunStatus(run_id=r["run_id"]).result()
while r["state"] in ("QUEUED", "INITIALIZING", "RUNNING"):
time.sleep(8)
r = client.WorkflowExecutionService.GetRunStatus(workflow_id=r["workflow_id"]).result()
r = client.WorkflowExecutionService.GetRunStatus(run_id=r["run_id"]).result()

logging.info("State is %s", r["state"])

s = client.WorkflowExecutionService.GetRunLog(workflow_id=r["workflow_id"]).result()
s = client.WorkflowExecutionService.GetRunLog(run_id=r["run_id"]).result()

try:
# TODO: Only works with Arvados atm
Expand Down
21 changes: 4 additions & 17 deletions wes_service/arvados_wes.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,23 +156,10 @@ def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params,
workflow_descriptor_file.close()

@catch_exceptions
def RunWorkflow(self, workflow_params, workflow_type, workflow_type_version,
workflow_url, workflow_descriptor, workflow_engine_parameters=None, tags=None):
tempdir = tempfile.mkdtemp()
body = {}
for k, ls in connexion.request.files.iterlists():
for v in ls:
if k == "workflow_descriptor":
filename = secure_filename(v.filename)
v.save(os.path.join(tempdir, filename))
elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
body[k] = json.loads(v.read())
else:
body[k] = v.read()
body["workflow_url"] = "file:///%s/%s" % (tempdir, body["workflow_url"])

if body["workflow_type"] != "CWL" or body["workflow_type_version"] != "v1.0": # NOQA
return
def RunWorkflow(self, **args):
tempdir, body = self.collect_attachments()

print(body)

if not connexion.request.headers.get('Authorization'):
raise MissingAuthorization()
Expand Down
23 changes: 2 additions & 21 deletions wes_service/cwl_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,27 +182,8 @@ def ListRuns(self, page_size=None, page_token=None, state_search=None):
"next_page_token": ""
}

def RunWorkflow(self):
tempdir = tempfile.mkdtemp()
body = {}
for k, ls in connexion.request.files.iterlists():
for v in ls:
if k == "workflow_descriptor":
filename = secure_filename(v.filename)
v.save(os.path.join(tempdir, filename))
elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
body[k] = json.loads(v.read())
else:
body[k] = v.read()

if body['workflow_type'] != "CWL" or \
body['workflow_type_version'] != "v1.0":
return

body["workflow_url"] = "file:///%s/%s" % (tempdir, body["workflow_url"])
index = body["workflow_url"].find("http")
if index > 0:
body["workflow_url"] = body["workflow_url"][index:]
def RunWorkflow(self, **args):
tempdir, body = self.collect_attachments()

run_id = uuid.uuid4().hex
job = Workflow(run_id)
Expand Down
29 changes: 28 additions & 1 deletion wes_service/util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from six import itervalues
import tempfile
import json
import os

from six import itervalues
import connexion
from werkzeug.utils import secure_filename

def visit(d, op):
"""Recursively call op(d) for all list subelements and dictionary 'values' that d may have."""
Expand Down Expand Up @@ -35,3 +40,25 @@ def getoptlist(self, p):
if k == p:
optlist.append(v)
return optlist

def collect_attachments(self):
tempdir = tempfile.mkdtemp()
body = {}
for k, ls in connexion.request.files.iterlists():
for v in ls:
if k == "workflow_descriptor":
filename = secure_filename(v.filename)
v.save(os.path.join(tempdir, filename))
elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
body[k] = json.loads(v.read())
else:
body[k] = v.read()

if body['workflow_type'] != "CWL" or \
body['workflow_type_version'] != "v1.0":
return

if ":" not in body["workflow_url"]:
body["workflow_url"] = "file://%s" % os.path.join(tempdir, secure_filename(body["workflow_url"]))

return (tempdir, body)

0 comments on commit 4b1afce

Please sign in to comment.