From 25bd032162fd0f05dc620282f40768db7eb74363 Mon Sep 17 00:00:00 2001 From: Achille Roussel Date: Sun, 12 May 2024 16:26:26 -0700 Subject: [PATCH] add dispatch.worker and dispatch.batch Signed-off-by: Achille Roussel --- src/dispatch/__init__.py | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/src/dispatch/__init__.py b/src/dispatch/__init__.py index 65af8bf..2d0955e 100644 --- a/src/dispatch/__init__.py +++ b/src/dispatch/__init__.py @@ -3,9 +3,10 @@ from __future__ import annotations import os +import threading from concurrent import futures from http.server import ThreadingHTTPServer -from typing import Any, Callable, Coroutine, Optional, TypeVar, overload +from typing import Any, Callable, Coroutine, List, Optional, TypeVar, overload from urllib.parse import urlsplit from typing_extensions import ParamSpec, TypeAlias @@ -31,6 +32,7 @@ "Status", "all", "any", + "batch", "call", "function", "gather", @@ -44,7 +46,8 @@ T = TypeVar("T") _registry: Optional[Registry] = None - +_workers: List[Callable[None, None]] = [] +_threads: List[threading.Thread] = [] def default_registry(): global _registry @@ -89,6 +92,18 @@ def run(init: Optional[Callable[P, None]] = None, *args: P.args, **kwargs: P.kwa parsed_url = urlsplit("//" + address) server_address = (parsed_url.hostname or "", parsed_url.port or 0) server = ThreadingHTTPServer(server_address, Dispatch(default_registry())) + + for worker in _workers: + def entrypoint(): + try: + worker() + finally: + server.shutdown() + _threads.append(threading.Thread(target=entrypoint)) + + for thread in _threads: + thread.start() + try: if init is not None: init(*args, **kwargs) @@ -97,7 +112,16 @@ def run(init: Optional[Callable[P, None]] = None, *args: P.args, **kwargs: P.kwa server.shutdown() server.server_close() + for thread in _threads: + thread.join() + def batch() -> Batch: """Create a new batch object.""" return default_registry().batch() + + +def worker(fn: Callable[None, None]) -> Callable[None, None]: + """Decorator declaring workers that will be started when dipatch.run is called.""" + _workers.append(fn) + return fn