Orchestrating tasks #8

Open
avico78 opened this issue Apr 30, 2022 · 2 comments

avico78 commented Apr 30, 2022

First, GREAT(!) project, and I believe it deserves much more attention.

I went through the documentation and the example but still have questions, as I couldn't make it work the way I expect.

So first, about triggering a task flow without scheduling it (i.e. on request): let's say I have this basic flow:

        task2
       /
task1
       \
        task3 -- task4

task2 and task3 depend on task1; task4 depends on task3.

worker:

import asyncio
from fastapi import FastAPI
from easyjobs.workers.worker import EasyJobsWorker

server = FastAPI()

@server.on_event('startup')
async def setup():
    worker = await EasyJobsWorker.create(
        server,
        server_secret='abcd1234',
        manager_host='0.0.0.0',
        manager_port=8222,
        manager_secret='abcd1234',
        jobs_queue='ETL',
        max_tasks_per_worker=5
    )
    
    @worker.task()
    async def task1(run_before=['task2', 'task3']):
        print(f"task1 - starting")
        await asyncio.sleep(5)
        print(f"task1 - finished")
        return f"task1!"
        
    @worker.task()
    async def task2(run_after=['task1']):
        print(f"task1 - starting")
        await asyncio.sleep(5)
        print(f"task2 - finished")
        return f"task2!"
        
    @worker.task(run_after=['task1'])
    async def task3(run_after=['task1']):
        print(f"task3 - starting")
        await asyncio.sleep(5)
        print(f"task3 - finished")
        return f"task3!"

    @worker.task()
    async def task4(run_after=['task3'],run_before=['task3']):
        print(f"task4 - starting")
        await asyncio.sleep(5)
        print(f"task4 - finished")
        return f"task4"

Based on the task plan I described above, are both run_after and run_before required?
No schedule is set for any of the tasks, so I expected that triggering task1 would automatically trigger the dependent tasks, but it doesn't:
it triggers just task1.

04-30 20:31 EasyRpc-server /ws/jobs WARNING  worker 5fc6e54c_c8aa_11ec_bd8d_252c84f8bbc8 - pulled job {'job_id': '4d0c86ae-c8ab-11ec-bd8d-252c84f8bbc8', 'namespace': 'ETL', 'node_id': '5fc6e54c_c8aa_11ec_bd8d_252c84f8bbc8-REQ-60fcb1bc-c8aa-11ec-bd8d-252c84f8bbc8', 'status': 'reserved', 'name': 'task1', 'args': {'args': []}, 'kwargs': {'run_before': '[ "task2", "task3" ]'}, 'retry_policy': 'retry_once', 'on_failure': None, 'run_before': {'run_before': []}, 'run_after': {'run_after': []}, 'last_update': '2022-04-30T20:31:00'}
04-30 20:31 EasyRpc-server /ws/jobs WARNING  WORKER_MONITOR: working 1 / 5
task1  - starting
task1 - finished

Questions regarding pipelining tasks:
Is it possible to pipe data between tasks? For example, can the return value of task1 be used in task2?
Is there an option to reuse a task in different nodes, while providing kwargs dynamically, so I can trigger the same task with different run_before and run_after?
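
For example, a hypothetical sketch of what I mean by piping data (whether run_after actually forwards the return value like this is exactly what I'm asking; the mechanism shown is assumed, not confirmed):

    @worker.task(run_after=['task2'])
    async def task1():
        # produce data that task2 should receive
        return {'rows': [1, 2, 3]}

    @worker.task()
    async def task2(*args, **kwargs):
        # ideally task1's return value would arrive here via kwargs
        rows = kwargs.get('rows', [])
        return {'count': len(rows)}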

I suggest adding a Discussions tab and, if possible, an examples folder; that could be really helpful.
Great project!

codemation (Owner) commented

@avico78 glad that you are enjoying the project so far; I wish I could update it more as well.

What you are describing should be doable via the following:

import asyncio
from fastapi import FastAPI
from easyjobs.workers.worker import EasyJobsWorker

server = FastAPI()

@server.on_event('startup')
async def setup():
    worker = await EasyJobsWorker.create(
        server,
        server_secret='abcd1234',
        manager_host='0.0.0.0',
        manager_port=8222,
        manager_secret='abcd1234',
        jobs_queue='ETL',
        max_tasks_per_worker=5
    )
    
    @worker.task(run_after=['task4'])
    async def task1():
        print(f"task1 - starting")
        await asyncio.sleep(5)
        print(f"task1 - finished")
        return f"task1!"
        
    @worker.task()
    async def task2():
        print(f"task1 - starting")
        await asyncio.sleep(5)
        print(f"task2 - finished")
        return f"task2!"
        
    @worker.task(run_before=['task2'])
    async def task3():
        print(f"task3 - starting")
        await asyncio.sleep(5)
        print(f"task3 - finished")
        return f"task3!"

    @worker.task(run_before=['task3'])
    async def task4():
        print(f"task4 - starting")
        await asyncio.sleep(5)
        print(f"task4 - finished")
        return f"task4"

run_after - the named task is triggered once the current task completes.
run_before - the named tasks (and their dependencies) are run before the current task completes.
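
A minimal sketch of the two directions, assuming the same worker object as above (task names are illustrative):

    # run_after: once task_a completes, task_b is triggered
    @worker.task(run_after=['task_b'])
    async def task_a():
        return 'a done'

    @worker.task()
    async def task_b(*args, **kwargs):
        return 'b done'

    # run_before: triggering task_c first runs task_b (and any
    # dependencies task_b itself declares), then task_c completes
    @worker.task(run_before=['task_b'])
    async def task_c(*args, **kwargs):
        return 'c done'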

avico78 (Author) commented May 2, 2022

@codemation - thanks so much for your answer.

Do run_after and run_before provide the same functionality for triggering dependencies in the correct task order,
where run_after triggers "from the current task onward" and run_before triggers "all tasks up to the current one"?

for "run_after":

I tried triggering the tasks flow as you advised - it is not working ,
it seems each task starting the second one required at least one parameter inside it
and also return statement .
Not sure what is required and why but from some reason - it only work this way ,
Maybe there's reason for that as assumption each task must required some *args/**kwrags to preform tasks pipeline ...
If you could explain more how it works and what is required .

Working example for task1 --> task2:

    @worker.task(run_after=['task2'], default_args=default_args)
    async def task1(*args, **kwargs):  # actually works even without parameters for the first task
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

    @worker.task()
    async def task2(*args, **kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

for "run_before":
I tried below tasks setup:
task2 depends on task1
task1 depends on task0

Here I couldn't understand if it must require some parameter(ars/kwargs) as it doesn't work properly
for tasks which has more than one level of dependencies , meaning:

task2--> task1--> task0

where for one level it does work for:

task2 --> task1 && task0

see code for 2 levels(not working):

    @worker.task()
    async def task0(*args, **kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}
        
    @worker.task(run_before=['task0'])
    async def task1(*args, **kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

    @worker.task(run_before=['task1'], default_args=default_args)
    async def task2(*args, **kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

    @worker.task()
    async def pipeline():
        print(f"pipline started")
        result = await task2(data={'test': 'data'})
        print(f"pipline - result is {result} - finished")
        return result

When running run_before with one level of dependencies:

task2 --> task1 && task0

Code:

    @worker.task()
    async def task0(*args, **kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}
        
    @worker.task()
    async def task1(*args, **kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

    @worker.task(run_before=['task1', 'task0'], default_args=default_args)
    async def task2(*args, **kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

    @worker.task()
    async def pipeline():
        print(f"pipline started")
        result = await task2(data={'test': 'data'})
        print(f"pipline - result is {result} - finished")
        return result

It works one to three times and then fails:

05-02 16:40 EasyRpc-server /ws/jobs WARNING  WORKER_MONITOR: working 3 / 5
pipline started
05-02 16:40 EasyRpc-server /ws/jobs WARNING  worker 44090c30_ca1d_11ec_9ae2_8d908ace95c7 - pulled job {'job_id': '766817c0-ca1d-11ec-9ae2-8d908ace95c7', 'namespace': 'ETL', 'node_id': '44090c30_ca1d_11ec_9ae2_8d908ace95c7-REQ-5ea28ca6-ca1d-11ec-9ae2-8d908ace95c7', 'status': 'reserved', 'name': 'task2', 'args': {'args': []}, 'kwargs': {'data': {'test': 'data'}}, 'retry_policy': 'retry_once', 'on_failure': None, 'run_before': {'run_before': ['task1', 'task0']}, 'run_after': {'run_after': []}, 'last_update': '2022-05-02T16:40:43'}
05-02 16:40 EasyRpc-server /ws/jobs WARNING  WORKER_MONITOR: working 4 / 5
task1 started
05-02 16:40 EasyRpc-server /ws/jobs WARNING  worker 44090c30_ca1d_11ec_9ae2_8d908ace95c7 - pulled job {'job_id': '76801348-ca1d-11ec-9ae2-8d908ace95c7', 'namespace': 'ETL', 'node_id': '44090c30_ca1d_11ec_9ae2_8d908ace95c7-REQ-5f20d41c-ca1d-11ec-9ae2-8d908ace95c7', 'status': 'reserved', 'name': 'task0', 'args': {'args': []}, 'kwargs': {}, 'retry_policy': 'retry_once', 'on_failure': None, 'run_before': {'run_before': []}, 'run_after': {'run_after': []}, 'last_update': '2022-05-02T16:40:44'}
05-02 16:40 EasyRpc-server /ws/jobs WARNING  WORKER_MONITOR: working 5 / 5
task0 started
task1 finished
task0 finished
05-02 16:40 EasyRpc-server /ws/jobs WARNING  WORKER_MONITOR: working 4 / 5
pipline started
05-02 16:41 EasyRpc-server /ws/jobs WARNING  worker 44090c30_ca1d_11ec_9ae2_8d908ace95c7 - pulled job {'job_id': '807ad806-ca1d-11ec-9ae2-8d908ace95c7', 'namespace': 'ETL', 'node_id': '44090c30_ca1d_11ec_9ae2_8d908ace95c7-REQ-779a17f6-ca1d-11ec-9ae2-8d908ace95c7', 'status': 'reserved', 'name': 'task2', 'args': {'args': []}, 'kwargs': {'data': {'test': 'data'}}, 'retry_policy': 'retry_once', 'on_failure': None, 'run_before': {'run_before': ['task1', 'task0']}, 'run_after': {'run_after': []}, 'last_update': '2022-05-02T16:41:00'}
05-02 16:41 EasyRpc-server /ws/jobs WARNING  WORKER_MONITOR: working 5 / 5
05-02 16:41 EasyRpc-server /ws/jobs ERROR    error with ws_sender
Traceback (most recent call last):
  File "/testusers/env/test/nadavp/anaconda3/lib/python3.8/site-packages/easyrpc/proxy.py", line 320, in ws_sender
    raise last_exception
easyrpc.exceptions.ServerConnectionError: (ServerConnectionError(...), 'Proxy -> Server connection error: server 0.0.0.0 - port: 8222')
05-02 16:41 EasyRpc-server /ws/jobs WARNING  started connection to server 0.0.0.0:8222
05-02 16:41 asyncio      ERROR    Task was destroyed but it is pending!
task: <Task pending name='Task-32' coro=<EasyRpcProxy.get_upstream_registered_functions() done, defined at /testusers/env/test/nadavp/anaconda3/lib/python3.8/site-packages/easyrpc/proxy.py:224> wait_for=<Future cancelled>>

Also, sometimes the error below appears; I couldn't understand why:
Traceback (most recent call last):
  File "/testusers/env/test/nadavp/anaconda3/lib/python3.8/site-packages/easyrpc/proxy.py", line 320, in ws_sender
    raise last_exception
easyrpc.exceptions.ServerConnectionError: (ServerConnectionError(...), 'Proxy -> Server connection error: server 0.0.0.0 - port: 8222')

Suggestions:

1. Add an endpoint for getting the task workflow tree (even a JSON view);
this would help to visually see the dependencies (see the sketch after this list).

2. For reloading changes I'm running the APIs as:

#manager:
python -m uvicorn --host 0.0.0.0 --port 8222 job_manager:server --reload

#worker
python -m uvicorn --host 0.0.0.0 --port 8221 job_worker:server --workers=1 --reload

but it seems very slow to reload all the changes;
maybe adding an option to reset & reload would really help, or can you suggest any other alternative?

3. It could be really interesting if this were a more generic solution for orchestrating tasks,
meaning a task could be configured dynamically, both for run_before/run_after and for its functionality,
so a user could build up their ETL flow more dynamically with better usability.
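
For suggestion 1, a rough sketch of the kind of JSON view such an endpoint could return for the task1/task2/task3/task4 flow above (a hypothetical shape, not an existing easyjobs endpoint):

    {
        "namespace": "ETL",
        "tasks": {
            "task1": {"run_after": ["task2", "task3"]},
            "task2": {"run_after": []},
            "task3": {"run_after": ["task4"]},
            "task4": {"run_after": []}
        }
    }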

I really think you came up with a great idea, and I hope you continue developing this.
