Replace multiprocessing.ProcessPoolExecutor with asyncio #464

Open · wants to merge 1 commit into base: master
42 changes: 24 additions & 18 deletions src/cosmic_ray/execution/local.py
@@ -34,15 +34,19 @@
the root of the cloned repository, so you need to take this into account when
creating the configuration.
"""

+import asyncio
+import concurrent.futures
import contextlib
import logging
import multiprocessing
import multiprocessing.util
import os
+from typing import Iterable, Callable

from cosmic_ray.cloning import ClonedWorkspace
from cosmic_ray.config import ConfigDict
from cosmic_ray.execution.execution_engine import ExecutionEngine
+from cosmic_ray.work_item import WorkItem
from cosmic_ray.worker import worker

log = logging.getLogger(__name__)
@@ -97,21 +101,23 @@ class LocalExecutionEngine(ExecutionEngine):
"The local-git execution engine."

    def __call__(self, pending_work, config, on_task_complete):
-        with multiprocessing.Pool(
+        asyncio.run(self._execute_pending_works(pending_work, config, on_task_complete))
Contributor: See my other comments about asyncio.run below.


+    async def _execute_pending_works(self,
+                                     pending_work: Iterable[WorkItem],
+                                     config: ConfigDict,
+                                     on_task_complete: Callable):
+        loop = asyncio.get_running_loop()
+
+        with concurrent.futures.ProcessPoolExecutor(
                initializer=_initialize_worker,
-                initargs=(config,)) as pool:

-            # pylint: disable=W0511
-            # TODO: This is not optimal. The pending-work iterable could be millions
Contributor: We need to keep this comment, or some version of it. We have a different version of this same problem in your implementation: we build a list the same size as the number of pending work items, something I'd eventually like to avoid. Until we fix this, I'd like to keep a reminder around to think about it.

-            # or billions of elements. We don't want to copy it. We copy it right
-            # now so that we don't access the database in a separate thread (i.e.
-            # one created by imap_unordered below). We need to find a way around
-            # this.
-            pending = list(pending_work)
-
-            results = pool.imap_unordered(
-                func=_execute_work_item,
-                iterable=pending)
-
-            for job_id, result in results:
-                on_task_complete(job_id, result)
+                initargs=(config,)
+        ) as pool:
+
+            async def run_task(work_item):
+                result = await loop.run_in_executor(pool, _execute_work_item,
+                                                    work_item)
+                on_task_complete(*result)
+
+            tasks = [run_task(work_item) for work_item in pending_work]
Contributor: Isn't this just pool.map()? The docs make some claims about map() achieving better performance by chunking the input; we should investigate that (a sketch follows this thread).

Contributor (author): Yes, I have traded the in-memory pending list for a tasks list of the same size. Which is better? I think there are other problems as well:

  • sqlite is not asyncio-aware, so iterating the pending_work generator may block all async jobs.
  • A deadlock can occur if we need to write a result to the database while the pending_work database cursor is still open.
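A minimal sketch of the pool.map() idea raised above, assuming the PR's _initialize_worker and _execute_work_item helpers; the execute_all wrapper and chunksize value are illustrative, not part of the PR. Note that per the docs, Executor.map() collects its input iterable immediately rather than lazily, so chunking cuts per-item IPC overhead but would not by itself fix the memory concern:

    import concurrent.futures

    def execute_all(pending_work, config, on_task_complete):
        with concurrent.futures.ProcessPoolExecutor(
                initializer=_initialize_worker,
                initargs=(config,)) as pool:
            # chunksize batches items per worker round trip; unlike
            # imap_unordered, results arrive in input order.
            for job_id, result in pool.map(_execute_work_item,
                                           pending_work,
                                           chunksize=64):
                on_task_complete(job_id, result)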

await asyncio.gather(*tasks)
Contributor: I'm a bit concerned about memory usage here. tasks is a list of the same size as the number of items in pending_work, potentially millions or even billions of items. When we destructure it in the call to asyncio.gather(), Python is going to create a complete copy of it as a tuple, essentially doubling the space we need for this function.

Is there some way we can tell asyncio to process everything lazily and report when they're all done?
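One possible answer to the laziness question, sketched using the PR's loop, pool, and _execute_work_item names; run_all and the limit value are hypothetical. It bounds the number of in-flight tasks with an asyncio.Semaphore and pulls from pending_work only as slots free up, so memory stays proportional to the limit rather than to the total work count:

    import asyncio

    async def run_all(loop, pool, pending_work, on_task_complete, limit=256):
        sem = asyncio.Semaphore(limit)
        in_flight = []

        async def run_task(work_item):
            try:
                result = await loop.run_in_executor(
                    pool, _execute_work_item, work_item)
                on_task_complete(*result)
            finally:
                sem.release()

        for work_item in pending_work:      # consumed lazily, one at a time
            await sem.acquire()             # waits once `limit` tasks are running
            in_flight.append(asyncio.ensure_future(run_task(work_item)))
            # surface worker exceptions and drop finished tasks to cap memory
            for t in [t for t in in_flight if t.done()]:
                t.result()
            in_flight = [t for t in in_flight if not t.done()]
        await asyncio.gather(*in_flight)    # wait for the remaining tail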

3 changes: 1 addition & 2 deletions src/cosmic_ray/testing.py
@@ -74,6 +74,5 @@ def run_tests(command, timeout=None):
asyncio.set_event_loop_policy(
asyncio.WindowsProactorEventLoopPolicy())

-    result = asyncio.get_event_loop().run_until_complete(
-        _run_tests(command, timeout))
+    result = asyncio.run(_run_tests(command, timeout))
Contributor: On my reading of the docs, using run instead of get_event_loop().run_until_complete() seems quite limiting. run doesn't allow other event loops to be running on the thread. This is probably not an issue for us currently, but I could easily imagine CR using asyncio more in the future, so using run() here feels like we're asking for trouble in the future for little or no short-term gain. Is there something I'm missing here?

Contributor: Just to follow up on this, the docs for run say:

  "This function always creates a new event loop and closes it at the end. It should be used as a main entry point for asyncio programs, and should ideally only be called once."

If nothing else, you're calling it in two places in this PR. This feels like something we need to address. (A small illustration of the constraint follows the diff.)

return result
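A small illustration of the constraint discussed above, not from the PR: asyncio.run() always spins up a fresh event loop and raises if a loop is already running on the current thread, which is why calling it from code that is itself running under an event loop fails:

    import asyncio

    async def inner():
        return 42

    async def outer():
        # Raises RuntimeError: asyncio.run() cannot be called from a
        # running event loop. (The unawaited inner() coroutine also
        # triggers a RuntimeWarning.)
        return asyncio.run(inner())

    print(asyncio.run(inner()))   # fine: top-level entry point
    try:
        asyncio.run(outer())
    except RuntimeError as exc:
        print(exc)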