From b73e4eefeeaa9b3f643283a3a67a8d6934cd06ce Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Fri, 13 Dec 2024 20:43:21 -0800 Subject: [PATCH] [CLEANUP TORCH] --- .github/workflows/code_quality_control.yml | 2 +- .github/workflows/lints.yml | 2 +- .github/workflows/pr_request_checks.yml | 2 +- .github/workflows/pull-request-links.yml | 2 +- clusterops/__init__.py | 10 +- clusterops/main.py | 265 +++++++- clusterops/profiling_exec.py | 266 ++++---- example_gpu_scheduler.py | 2 +- experimental/dynamic_exec.py | 570 ++++++++++++++++++ .../fractorial_cpu.py | 85 ++- {clusterops => experimental}/gpu_scheduler.py | 166 +++-- pyproject.toml | 3 +- 12 files changed, 1160 insertions(+), 215 deletions(-) create mode 100644 experimental/dynamic_exec.py rename {clusterops => experimental}/fractorial_cpu.py (60%) rename {clusterops => experimental}/gpu_scheduler.py (69%) diff --git a/.github/workflows/code_quality_control.yml b/.github/workflows/code_quality_control.yml index 5bc4b1e..39f2f41 100644 --- a/.github/workflows/code_quality_control.yml +++ b/.github/workflows/code_quality_control.yml @@ -22,7 +22,7 @@ jobs: run: pip install --no-cache-dir -r requirements.txt - name: Find Python files - run: find swarms_torch -name "*.py" -type f -exec autopep8 --in-place --aggressive --aggressive {} + + run: find clusterops -name "*.py" -type f -exec autopep8 --in-place --aggressive --aggressive {} + - name: Push changes uses: ad-m/github-push-action@master diff --git a/.github/workflows/lints.yml b/.github/workflows/lints.yml index 2a667b0..942555f 100644 --- a/.github/workflows/lints.yml +++ b/.github/workflows/lints.yml @@ -22,4 +22,4 @@ jobs: run: pip install --no-cache-dir -r requirements.txt - name: Run linters - run: pylint swarms_torch \ No newline at end of file + run: pylint clusterops \ No newline at end of file diff --git a/.github/workflows/pr_request_checks.yml b/.github/workflows/pr_request_checks.yml index 6322998..69b97b1 100644 --- a/.github/workflows/pr_request_checks.yml +++ b/.github/workflows/pr_request_checks.yml @@ -24,4 +24,4 @@ jobs: - name: Run tests and checks run: | pytest tests/ - pylint swarms_torch \ No newline at end of file + pylint clusterops \ No newline at end of file diff --git a/.github/workflows/pull-request-links.yml b/.github/workflows/pull-request-links.yml index e5812fb..abecb9b 100644 --- a/.github/workflows/pull-request-links.yml +++ b/.github/workflows/pull-request-links.yml @@ -15,4 +15,4 @@ jobs: steps: - uses: readthedocs/actions/preview@v1 with: - project-slug: swarms_torch \ No newline at end of file + project-slug: clusterops \ No newline at end of file diff --git a/clusterops/__init__.py b/clusterops/__init__.py index 5520758..70d27e0 100644 --- a/clusterops/__init__.py +++ b/clusterops/__init__.py @@ -5,13 +5,15 @@ execute_on_gpu, execute_on_cpu, execute_on_multiple_gpus, + distributed_execute_on_gpus, + execute_with_all_cpu_cores + ) from clusterops.profiling_exec import ( monitor_resources, profile_execution, - distributed_execute_on_gpus, ) -from clusterops.gpu_scheduler import GPUJobExecutor, JobScheduler, gpu_scheduler + __all__ = [ "list_available_cpus", @@ -21,9 +23,7 @@ "execute_on_multiple_gpus", "monitor_resources", "profile_execution", - "GPUJobExecutor", "execute_on_cpu", - "JobScheduler", - "gpu_scheduler", "distributed_execute_on_gpus", + "execute_with_all_cpu_cores", ] diff --git a/clusterops/main.py b/clusterops/main.py index e422fbc..94cc767 100644 --- a/clusterops/main.py +++ b/clusterops/main.py @@ -8,6 +8,9 @@ import psutil import ray 
from loguru import logger +from ray.util.multiprocessing import ( + Pool, +) # For distributed multi-node execution from tenacity import ( retry, retry_if_exception_type, @@ -53,45 +56,94 @@ def list_available_cpus() -> List[int]: except Exception as e: logger.error(f"Error listing CPUs: {e}") raise - + + + @retry( stop=stop_after_attempt(3), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), ) -def select_best_gpu() -> Optional[int]: +def execute_with_cpu_cores( + core_count: int, func: Callable, *args: Any, **kwargs: Any +) -> Any: """ - Selects the GPU with the most free memory. + Executes a callable using a specified number of currently unused CPU cores. + + Args: + core_count (int): The number of CPU cores to run the function on. + func (Callable): The function to be executed. + *args (Any): Arguments for the callable. + **kwargs (Any): Keyword arguments for the callable. Returns: - Optional[int]: The GPU ID of the best available GPU, or None if no GPUs are available. + Any: The result of the function execution. + + Raises: + ValueError: If the number of CPU cores specified is invalid or exceeds available unused cores. + RuntimeError: If there is an error executing the function on the specified CPU cores. """ try: - gpus = list_available_gpus() - best_gpu = max(gpus, key=lambda gpu: gpu["memoryFree"]) + # Get all CPU cores + all_cpus = list_available_cpus() + + # Find cores currently in use by checking CPU utilization + cpu_percent_per_core = psutil.cpu_percent(interval=0.1, percpu=True) + unused_cores = [ + cpu for cpu, usage in enumerate(cpu_percent_per_core) + if usage < 10.0 # Consider cores with <10% usage as unused + ] + + if not unused_cores: + logger.warning("No unused CPU cores found, falling back to all available cores") + unused_cores = all_cpus + + if core_count > len(unused_cores) or core_count <= 0: + raise ValueError( + f"Invalid core count: {core_count}. Available unused CPUs are {unused_cores}." + ) + + if platform.system() == "Darwin": # macOS + logger.warning( + "CPU affinity is not supported on macOS. Skipping setting CPU affinity." + ) + else: + # Set CPU affinity to use the specified number of unused cores + selected_cores = unused_cores[:core_count] + logger.info( + f"Setting CPU affinity to unused cores {selected_cores} and executing the function." + ) + psutil.Process().cpu_affinity(selected_cores) + + result = func(*args, **kwargs) logger.info( - f"Selected GPU {best_gpu['id']} with {best_gpu['memoryFree']} MB free memory." + f"Execution using {core_count} unused CPU cores completed." ) - return best_gpu["id"] + return result except Exception as e: - logger.error(f"Error selecting best GPU: {e}") - return None + logger.error( + f"Error executing with {core_count} CPU cores: {e}" + ) + raise + + + + @retry( stop=stop_after_attempt(3), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), ) -def execute_on_cpu( - cpu_id: int, func: Callable, *args: Any, **kwargs: Any +def execute_with_all_cpu_cores( + func: Callable, *args: Any, **kwargs: Any ) -> Any: """ - Executes a callable on a specific CPU. + Executes a callable using all currently unused CPU cores. Args: - cpu_id (int): The CPU core to run the function on. func (Callable): The function to be executed. *args (Any): Arguments for the callable. **kwargs (Any): Keyword arguments for the callable. @@ -100,32 +152,116 @@ def execute_on_cpu( Any: The result of the function execution. Raises: - ValueError: If the CPU core specified is invalid. 
- RuntimeError: If there is an error executing the function on the CPU. + RuntimeError: If there is an error executing the function on the CPU cores. """ try: - available_cpus = list_available_cpus() - if cpu_id not in available_cpus: - raise ValueError( - f"Invalid CPU core: {cpu_id}. Available CPUs are {available_cpus}." - ) + # Get all CPU cores + all_cpus = list_available_cpus() + + # Find cores currently in use by checking CPU utilization + cpu_percent_per_core = psutil.cpu_percent(interval=0.1, percpu=True) + unused_cores = [ + cpu for cpu, usage in enumerate(cpu_percent_per_core) + if usage < 10.0 # Consider cores with <10% usage as unused + ] + + if not unused_cores: + logger.warning("No unused CPU cores found, falling back to all available cores") + unused_cores = all_cpus + + logger.info(f"Found {len(unused_cores)} unused CPU cores out of {len(all_cpus)} total cores") if platform.system() == "Darwin": # macOS logger.warning( "CPU affinity is not supported on macOS. Skipping setting CPU affinity." ) else: - logger.info(f"Setting CPU affinity to core {cpu_id}.") - psutil.Process().cpu_affinity([cpu_id]) + # Set CPU affinity to use all unused cores + logger.info( + f"Setting CPU affinity to unused cores {unused_cores} and executing the function." + ) + psutil.Process().cpu_affinity(unused_cores) result = func(*args, **kwargs) - logger.info(f"Execution on CPU {cpu_id} completed.") + logger.info( + f"Execution using {len(unused_cores)} unused CPU cores completed." + ) return result except Exception as e: - logger.error(f"Error executing on CPU {cpu_id}: {e}") + logger.error( + f"Error executing with CPU cores: {e}" + ) raise + + +@retry( + stop=stop_after_attempt(3), + wait=wait_fixed(1), + retry=retry_if_exception_type(Exception), +) +def select_best_gpu() -> Optional[int]: + """ + Selects the GPU with the most free memory. + + Returns: + Optional[int]: The GPU ID of the best available GPU, or None if no GPUs are available. + """ + try: + gpus = list_available_gpus() + best_gpu = max(gpus, key=lambda gpu: gpu["memoryFree"]) + logger.info( + f"Selected GPU {best_gpu['id']} with {best_gpu['memoryFree']} MB free memory." + ) + return best_gpu["id"] + except Exception as e: + logger.error(f"Error selecting best GPU: {e}") + return None + + +def get_cpu_info(): + """ + Detects available CPU cores using multiple methods for reliability. 
+ Returns tuple of (physical_cores, logical_cores, available_cores) + """ + try: + # Physical cores (excluding hyperthreading) + physical = psutil.cpu_count(logical=False) or 1 + + # Logical cores (including hyperthreading) + logical = psutil.cpu_count(logical=True) or physical + + # Currently available cores (accounting for system restrictions) + try: + available = len(psutil.Process().cpu_affinity()) + except AttributeError: # For systems without affinity support + available = logical + + # Sanity checks and adjustments + physical = max(1, physical) + logical = max(physical, logical) + available = max(1, min(available, logical)) + + return physical, logical, available + + except Exception as e: + logger.error(f"Error detecting CPU cores: {e}") + return 1, 1, 1 # Fallback to minimum safe values + + +def get_optimal_core_count(requested_cores: int = None) -> int: + """Returns optimal number of cores to use based on system capabilities""" + physical, logical, available = get_cpu_info() + + if requested_cores is None: + # Use 75% of available cores by default, minimum of 1 + return max(1, int(available * 0.75)) + + return max(1, min(requested_cores, available)) + + + @retry( stop=stop_after_attempt(3), wait=wait_fixed(1), @@ -174,7 +310,7 @@ def retry_with_backoff( wait=wait_fixed(1), retry=retry_if_exception_type(Exception), ) -def execute_with_cpu_cores( +def execute_on_cpu( core_count: int, func: Callable, *args: Any, **kwargs: Any ) -> Any: """ @@ -360,6 +496,83 @@ def task_wrapper(*args, **kwargs): raise +def distributed_execute_on_gpus( + gpu_ids: List[int], + func: Callable, + *args: Any, + num_retries: int = RETRY_COUNT, + retry_delay: float = RETRY_DELAY, + **kwargs: Any, +) -> List[Any]: + """ + Executes a callable across multiple GPUs and nodes using Ray's distributed task scheduling. + + Args: + gpu_ids (List[int]): List of GPU IDs to run the function on. Must be valid IDs. + func (Callable): Function to execute. Must be serializable. 
+ *args (Any): Arguments for the callable + num_retries (int): Number of retry attempts for failed tasks + retry_delay (float): Delay in seconds between retries + **kwargs (Any): Keyword arguments for the callable + + Returns: + List[Any]: Results from execution on each GPU in order of gpu_ids + + Raises: + ValueError: If gpu_ids is empty or contains invalid IDs + RuntimeError: If Ray initialization or task execution fails + TimeoutError: If execution exceeds maximum retry attempts + """ + if not gpu_ids: + raise ValueError("Must specify at least one GPU ID") + + available_gpus = [gpu.id for gpu in GPUtil.getGPUs()] + invalid_gpus = [id for id in gpu_ids if id not in available_gpus] + if invalid_gpus: + raise ValueError(f"Invalid GPU IDs: {invalid_gpus}") + + try: + logger.info( + f"Executing function across distributed GPUs {gpu_ids}" + ) + + if not ray.is_initialized(): + ray.init(ignore_reinit_error=True, log_to_driver=False) + + @ray.remote(num_gpus=1) + def task_wrapper(*task_args, **task_kwargs): + for attempt in range(num_retries): + try: + return func(*task_args, **task_kwargs) + except Exception as e: + if attempt == num_retries - 1: + raise + logger.warning( + f"Attempt {attempt + 1} failed: {str(e)}" + ) + time.sleep( + retry_delay * (attempt + 1) + ) # Exponential backoff + + # Distribute tasks + pool = Pool() + result_futures = [ + pool.apply_async(task_wrapper.remote, args=(args, kwargs)) + for gpu_id in gpu_ids + ] + pool.close() + pool.join() + + results = [future.get() for future in result_futures] + logger.info("Distributed execution completed successfully") + return results + + except Exception as e: + error_msg = f"Error during distributed execution: {str(e)}" + logger.error(error_msg) + raise RuntimeError(error_msg) from e + + # # Example function to run # def sample_task(n: int) -> int: # return n * n diff --git a/clusterops/profiling_exec.py b/clusterops/profiling_exec.py index 0929784..be15b6c 100644 --- a/clusterops/profiling_exec.py +++ b/clusterops/profiling_exec.py @@ -1,23 +1,32 @@ import os import sys import time -from typing import Any, Callable, List +from typing import Any, Callable, Dict, Optional, Union import GPUtil import psutil -import ray from loguru import logger -from ray.util.multiprocessing import ( - Pool, -) # For distributed multi-node execution -# Configurable environment variables +# Configurable environment variables with validation LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") -RETRY_COUNT = int(os.getenv("RETRY_COUNT", 3)) -RETRY_DELAY = float(os.getenv("RETRY_DELAY", 1.0)) -CPU_THRESHOLD = int( - os.getenv("CPU_THRESHOLD", 90) -) # CPU usage threshold for alerts +if LOG_LEVEL.upper() not in [ + "DEBUG", + "INFO", + "WARNING", + "ERROR", + "CRITICAL", +]: + LOG_LEVEL = "INFO" + +RETRY_COUNT = max( + 1, int(os.getenv("RETRY_COUNT", 3)) +) # Minimum 1 retry +RETRY_DELAY = max( + 0.1, float(os.getenv("RETRY_DELAY", 1.0)) +) # Minimum 0.1s delay +CPU_THRESHOLD = min( + 100, max(0, int(os.getenv("CPU_THRESHOLD", 90))) +) # 0-100 range # Configure Loguru logger for detailed logging logger.remove() @@ -28,125 +37,170 @@ ) -def monitor_resources(): +def monitor_resources( + cpu_threshold: Optional[int] = None, + gpu_threshold: Optional[int] = 90, + interval: float = 1.0, +) -> Dict[str, Any]: """ Continuously monitors CPU and GPU resources and logs alerts when thresholds are crossed. 
- """ - try: - cpu_usage = psutil.cpu_percent(interval=1) - if cpu_usage > CPU_THRESHOLD: - logger.warning( - f"CPU usage exceeds {CPU_THRESHOLD}%: Current usage {cpu_usage}%" - ) - - # Monitor GPU memory usage - gpus = GPUtil.getGPUs() - for gpu in gpus: - memory_usage = 100 * ( - 1 - gpu.memoryFree / gpu.memoryTotal - ) - if ( - memory_usage > 90 - ): # Example threshold for GPU memory usage - logger.warning( - f"GPU {gpu.id} memory usage exceeds 90%: Current usage {memory_usage}%" - ) - - logger.info("Resource monitoring completed.") - - except Exception as e: - logger.error(f"Error monitoring resources: {e}") - raise - - -def profile_execution( - func: Callable, *args: Any, **kwargs: Any -) -> Any: - """ - Profiles the execution of a task, collecting metrics like execution time and CPU/GPU usage. Args: - func (Callable): The function to profile. - *args (Any): Arguments for the callable. - **kwargs (Any): Keyword arguments for the callable. + cpu_threshold (Optional[int]): CPU usage percentage threshold for alerts (0-100). + If None, uses CPU_THRESHOLD from env vars. + gpu_threshold (Optional[int]): GPU memory usage percentage threshold for alerts (0-100). + If None, monitoring of GPUs is disabled. + interval (float): Time interval in seconds between measurements. Returns: - Any: The result of the function execution along with the collected metrics. + Dict[str, Any]: Resource usage statistics including: + - cpu_usage: Current CPU usage percentage + - gpu_stats: List of dicts with GPU stats (id, memory_used, memory_total) + - alerts: List of any threshold violations + + Raises: + ValueError: If thresholds are not in valid range 0-100 + RuntimeError: If resource monitoring fails """ - start_time = time.time() - - # Get initial CPU and memory usage - initial_cpu_usage = psutil.cpu_percent() - gpus = GPUtil.getGPUs() - initial_gpu_memory = [gpu.memoryFree for gpu in gpus] - - # Execute the function - result = func(*args, **kwargs) - - # Metrics after execution - execution_time = time.time() - start_time - final_cpu_usage = psutil.cpu_percent() - final_gpu_memory = [gpu.memoryFree for gpu in gpus] - - logger.info(f"Task execution time: {execution_time}s") - logger.info( - f"CPU usage change: {initial_cpu_usage}% -> {final_cpu_usage}%" - ) - for idx, gpu in enumerate(gpus): - logger.info( - f"GPU {idx} memory usage change: {initial_gpu_memory[idx]} MB -> {final_gpu_memory[idx]} MB" + if cpu_threshold is not None and not 0 <= cpu_threshold <= 100: + raise ValueError("CPU threshold must be between 0 and 100") + if gpu_threshold is not None and not 0 <= gpu_threshold <= 100: + raise ValueError("GPU threshold must be between 0 and 100") + + stats = {"cpu_usage": 0.0, "gpu_stats": [], "alerts": []} + + try: + # Monitor CPU + stats["cpu_usage"] = psutil.cpu_percent(interval=interval) + threshold = ( + cpu_threshold + if cpu_threshold is not None + else CPU_THRESHOLD ) + if stats["cpu_usage"] > threshold: + alert = f"CPU usage exceeds {threshold}%: Current usage {stats['cpu_usage']}%" + stats["alerts"].append(alert) + logger.warning(alert) + + # Monitor GPUs if threshold provided + if gpu_threshold is not None: + gpus = GPUtil.getGPUs() + for gpu in gpus: + memory_usage = 100 * ( + 1 - gpu.memoryFree / gpu.memoryTotal + ) + gpu_stat = { + "id": gpu.id, + "memory_used": gpu.memoryUsed, + "memory_total": gpu.memoryTotal, + "usage_percent": memory_usage, + } + stats["gpu_stats"].append(gpu_stat) + + if memory_usage > gpu_threshold: + alert = f"GPU {gpu.id} memory usage exceeds {gpu_threshold}%: 
Current usage {memory_usage:.1f}%" + stats["alerts"].append(alert) + logger.warning(alert) + + logger.info("Resource monitoring completed successfully") + return stats - return result + except Exception as e: + error_msg = f"Error monitoring resources: {str(e)}" + logger.error(error_msg) + raise RuntimeError(error_msg) from e -def distributed_execute_on_gpus( - gpu_ids: List[int], func: Callable, *args: Any, **kwargs: Any -) -> List[Any]: +def profile_execution( + func: Callable, + *args: Any, + collect_gpu_metrics: bool = True, + **kwargs: Any, +) -> Dict[str, Union[Any, float, Dict]]: """ - Executes a callable across multiple GPUs and nodes using Ray's distributed task scheduling. + Profiles the execution of a task, collecting metrics like execution time and resource usage. Args: - gpu_ids (List[int]): The list of GPU IDs across nodes to run the function on. - func (Callable): The function to be executed. - *args (Any): Arguments for the callable. - **kwargs (Any): Keyword arguments for the callable. + func (Callable): The function to profile + *args (Any): Arguments for the callable + collect_gpu_metrics (bool): Whether to collect GPU metrics. Default True. + **kwargs (Any): Keyword arguments for the callable Returns: - List[Any]: A list of results from the execution on each GPU. + Dict containing: + - result: Return value from the function + - metrics: Dict of execution metrics including: + - execution_time: Time taken in seconds + - cpu_usage: Dict of CPU usage before/after + - gpu_usage: Dict of GPU memory usage before/after (if enabled) + + Raises: + RuntimeError: If profiling or function execution fails """ + metrics = { + "execution_time": 0.0, + "cpu_usage": {}, + "gpu_usage": {}, + } + try: + start_time = time.time() + + # Get initial resource usage + metrics["cpu_usage"]["initial"] = psutil.cpu_percent() + + if collect_gpu_metrics: + gpus = GPUtil.getGPUs() + metrics["gpu_usage"]["initial"] = { + gpu.id: { + "free": gpu.memoryFree, + "used": gpu.memoryUsed, + "total": gpu.memoryTotal, + } + for gpu in gpus + } + + # Execute function + result = func(*args, **kwargs) + + # Collect final metrics + metrics["execution_time"] = time.time() - start_time + metrics["cpu_usage"]["final"] = psutil.cpu_percent() + + if collect_gpu_metrics: + gpus = GPUtil.getGPUs() + metrics["gpu_usage"]["final"] = { + gpu.id: { + "free": gpu.memoryFree, + "used": gpu.memoryUsed, + "total": gpu.memoryTotal, + } + for gpu in gpus + } + + # Log metrics logger.info( - f"Executing function across distributed GPUs {gpu_ids}." + f"Task execution time: {metrics['execution_time']:.2f}s" ) - - if not ray.is_initialized(): - ray.init(ignore_reinit_error=True, log_to_driver=False) - - @ray.remote(num_gpus=1) - def task_wrapper(*args, **kwargs): - return func(*args, **kwargs) - - # Distribute the task across nodes - pool = Pool() - result_futures = [ - pool.apply_async(task_wrapper.remote, args=(args, kwargs)) - for gpu_id in gpu_ids - ] - pool.close() - pool.join() - - results = [future.get() for future in result_futures] logger.info( - f"Distributed execution on GPUs {gpu_ids} completed." 
+ f"CPU usage: {metrics['cpu_usage']['initial']}% -> {metrics['cpu_usage']['final']}%" ) - return results + + if collect_gpu_metrics: + for gpu_id, usage in metrics["gpu_usage"][ + "final" + ].items(): + initial = metrics["gpu_usage"]["initial"][gpu_id] + logger.info( + f"GPU {gpu_id} memory: {initial['free']}MB free -> {usage['free']}MB free" + ) + + return {"result": result, "metrics": metrics} except Exception as e: - logger.error( - f"Error during distributed execution on GPUs {gpu_ids}: {e}" - ) - raise + error_msg = f"Error during profiled execution: {str(e)}" + logger.error(error_msg) + raise RuntimeError(error_msg) from e # # Example function to run diff --git a/example_gpu_scheduler.py b/example_gpu_scheduler.py index 1912612..7b712c3 100644 --- a/example_gpu_scheduler.py +++ b/example_gpu_scheduler.py @@ -1,4 +1,4 @@ -from clusterops import gpu_scheduler +from experimental import gpu_scheduler async def sample_task(n: int) -> int: diff --git a/experimental/dynamic_exec.py b/experimental/dynamic_exec.py new file mode 100644 index 0000000..30ce2d8 --- /dev/null +++ b/experimental/dynamic_exec.py @@ -0,0 +1,570 @@ +import os +import sys +from dataclasses import dataclass +from typing import Any, Callable, Dict, List, Optional, Tuple +from functools import wraps +import inspect +import numpy as np +import psutil +import torch +from loguru import logger +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_fixed, +) +import time + +# Configure Loguru logger +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") +LOG_FORMAT = ( + "{time:YYYY-MM-DD HH:mm:ss.SSS} | " + "{level: <8} | " + "{name}:{function}:{line} | " + "{message}" +) + +# Remove default logger and add custom configuration +logger.remove() +logger.add( + sys.stderr, + format=LOG_FORMAT, + level=LOG_LEVEL, + backtrace=True, + diagnose=True, + enqueue=True, +) +logger.add( + "resource_manager_{time}.log", + format=LOG_FORMAT, + level=LOG_LEVEL, + rotation="100 MB", + retention="30 days", + compression="zip", +) + + +@dataclass +class ResourceRequirements: + """Data class to store predicted resource requirements.""" + + cpu_memory_bytes: int + gpu_memory_bytes: int + requires_gpu: bool + estimated_runtime_seconds: float + + +@dataclass +class ResourceAllocation: + """Data class to store actual resource allocation.""" + + cpu_memory_reserved: int + gpu_memory_reserved: int + gpu_id: Optional[int] + cpu_cores: List[int] + allocation_time: float = time.time() + + +class InsufficientResourcesError(Exception): + """Raised when there are not enough resources to execute the function.""" + + pass + + +class MemoryPredictor: + """Predicts memory requirements for function execution.""" + + def __init__(self): + self.history: Dict[str, List[ResourceRequirements]] = {} + self.sample_size = 3 + logger.info( + "Initialized MemoryPredictor with sample size {}", + self.sample_size, + ) + + def update_history( + self, func_name: str, requirements: ResourceRequirements + ) -> None: + """ + Updates the execution history for a function with new resource requirements. 
+ + Args: + func_name: Name of the function + requirements: Resource requirements from the latest execution + """ + try: + with logger.contextualize(function=func_name): + if func_name not in self.history: + self.history[func_name] = [] + + self.history[func_name].append(requirements) + + # Keep only recent history + if len(self.history[func_name]) > self.sample_size: + self.history[func_name] = self.history[func_name][ + -self.sample_size : + ] + + logger.debug( + "Updated history for function {}. Total records: {}", + func_name, + len(self.history[func_name]), + ) + + # Calculate and log average requirements + avg_cpu = sum( + r.cpu_memory_bytes + for r in self.history[func_name] + ) / len(self.history[func_name]) + avg_gpu = sum( + r.gpu_memory_bytes + for r in self.history[func_name] + ) / len(self.history[func_name]) + + logger.debug( + "Average requirements - CPU: {} bytes, GPU: {} bytes", + int(avg_cpu), + int(avg_gpu), + ) + + except Exception as e: + logger.exception( + "Error updating history for function {}: {}", + func_name, + str(e), + ) + + def get_historical_requirements( + self, func_name: str + ) -> Optional[ResourceRequirements]: + """ + Gets average historical resource requirements for a function. + + Args: + func_name: Name of the function + + Returns: + Optional[ResourceRequirements]: Average requirements or None if no history + """ + try: + if ( + func_name not in self.history + or not self.history[func_name] + ): + logger.debug( + "No history found for function {}", func_name + ) + return None + + history = self.history[func_name] + avg_requirements = ResourceRequirements( + cpu_memory_bytes=int( + sum(r.cpu_memory_bytes for r in history) + / len(history) + ), + gpu_memory_bytes=int( + sum(r.gpu_memory_bytes for r in history) + / len(history) + ), + requires_gpu=any(r.requires_gpu for r in history), + estimated_runtime_seconds=sum( + r.estimated_runtime_seconds for r in history + ) + / len(history), + ) + + logger.debug( + "Retrieved historical requirements for {}: {}", + func_name, + avg_requirements, + ) + return avg_requirements + + except Exception as e: + logger.exception( + "Error retrieving historical requirements for {}: {}", + func_name, + str(e), + ) + return None + + def _estimate_object_size(self, obj: Any) -> int: + """ + Estimates the memory size of a Python object. + + Args: + obj: Any Python object + + Returns: + int: Estimated size in bytes + """ + try: + with logger.contextualize(object_type=type(obj).__name__): + if isinstance(obj, (np.ndarray, torch.Tensor)): + size = obj.nbytes + logger.debug("Array/Tensor size: {} bytes", size) + return size + elif isinstance(obj, (list, tuple, set, dict)): + size = sys.getsizeof(obj) + sum( + self._estimate_object_size(item) + for item in obj + ) + logger.debug("Container size: {} bytes", size) + return size + + size = sys.getsizeof(obj) + logger.debug("Object size: {} bytes", size) + return size + except Exception as e: + logger.exception("Error estimating object size: {}", e) + return 0 + + def _analyze_function_memory( + self, func: Callable, *args, **kwargs + ) -> ResourceRequirements: + """ + Analyzes function's memory requirements based on its arguments and source code. 
+ + Args: + func: Function to analyze + *args: Function arguments + **kwargs: Function keyword arguments + + Returns: + ResourceRequirements: Predicted resource requirements + """ + logger.info( + "Analyzing memory requirements for function: {}", + func.__name__, + ) + + with logger.contextualize(function=func.__name__): + try: + # Estimate memory for input arguments + args_size = sum( + self._estimate_object_size(arg) for arg in args + ) + kwargs_size = sum( + self._estimate_object_size(value) + for value in kwargs.values() + ) + + logger.debug("Arguments size: {} bytes", args_size) + logger.debug( + "Keyword arguments size: {} bytes", kwargs_size + ) + + # Analyze function source code for GPU operations + source = inspect.getsource(func) + requires_gpu = any( + keyword in source + for keyword in ["torch.cuda", "gpu", "GPU"] + ) + logger.debug( + "GPU requirement detected: {}", requires_gpu + ) + + # Estimate GPU memory if needed (using a heuristic based on input size) + gpu_memory = args_size * 2 if requires_gpu else 0 + + # Add buffer for intermediate computations (50% of input size) + cpu_memory = (args_size + kwargs_size) * 1.5 + + requirements = ResourceRequirements( + cpu_memory_bytes=int(cpu_memory), + gpu_memory_bytes=int(gpu_memory), + requires_gpu=requires_gpu, + estimated_runtime_seconds=1.0, + ) + + logger.info( + "Memory requirements analysis complete: {}", + requirements, + ) + return requirements + + except Exception as e: + logger.exception( + "Error analyzing function memory requirements: {}", + e, + ) + raise + + +class ResourceManager: + """Manages resource allocation for function execution.""" + + def __init__(self): + self.predictor = MemoryPredictor() + self.reserved_cpu_memory = 0 + self.reserved_gpu_memory = {} + logger.info("Initialized ResourceManager") + + # Log initial system resources + self._log_system_resources() + + def _log_system_resources(self): + """Logs current system resource status.""" + cpu_info = { + "total_cpu_memory": psutil.virtual_memory().total, + "available_cpu_memory": psutil.virtual_memory().available, + "cpu_percent": psutil.cpu_percent(interval=1), + "cpu_count": psutil.cpu_count(), + } + + gpu_info = {} + if torch.cuda.is_available(): + for i in range(torch.cuda.device_count()): + gpu_info[f"gpu_{i}"] = { + "name": torch.cuda.get_device_name(i), + "total_memory": torch.cuda.get_device_properties( + i + ).total_memory, + "allocated_memory": torch.cuda.memory_allocated( + i + ), + } + + logger.info("System resources - CPU: {}", cpu_info) + if gpu_info: + logger.info("System resources - GPU: {}", gpu_info) + + def _get_available_resources(self) -> Tuple[int, Dict[int, int]]: + """ + Gets available CPU and GPU memory. 
+ + Returns: + Tuple[int, Dict[int, int]]: Available CPU memory and GPU memory per device + """ + with logger.contextualize(operation="resource_check"): + cpu_available = psutil.virtual_memory().available + gpu_available = {} + + if torch.cuda.is_available(): + for i in range(torch.cuda.device_count()): + gpu_available[i] = ( + torch.cuda.get_device_properties( + i + ).total_memory + - torch.cuda.memory_allocated(i) + ) + + logger.debug( + "Available CPU memory: {} bytes", cpu_available + ) + logger.debug("Available GPU memory: {}", gpu_available) + + return cpu_available, gpu_available + + @retry( + stop=stop_after_attempt(3), + wait=wait_fixed(1), + retry=retry_if_exception_type(Exception), + ) + def allocate_resources( + self, requirements: ResourceRequirements + ) -> ResourceAllocation: + """ + Allocates required resources for function execution. + + Args: + requirements: ResourceRequirements object with predicted requirements + + Returns: + ResourceAllocation: Allocated resources + + Raises: + InsufficientResourcesError: If required resources are not available + """ + with logger.contextualize(operation="resource_allocation"): + logger.info( + "Attempting to allocate resources: {}", requirements + ) + + cpu_available, gpu_available = ( + self._get_available_resources() + ) + + # Check CPU memory availability + if requirements.cpu_memory_bytes > cpu_available: + logger.error( + "Insufficient CPU memory. Required: {}, Available: {}", + requirements.cpu_memory_bytes, + cpu_available, + ) + raise InsufficientResourcesError( + "Insufficient CPU memory" + ) + + # Select GPU if required + gpu_id = None + if requirements.requires_gpu: + for gid, available in gpu_available.items(): + if available >= requirements.gpu_memory_bytes: + gpu_id = gid + logger.info("Selected GPU {}", gpu_id) + break + else: + logger.error( + "No GPU with sufficient memory. Required: {}", + requirements.gpu_memory_bytes, + ) + raise InsufficientResourcesError( + "Insufficient GPU memory" + ) + + # Allocate CPU cores based on memory requirements + total_cores = psutil.cpu_count() + cores_needed = max( + 1, + min( + total_cores, + requirements.cpu_memory_bytes + // (cpu_available // total_cores), + ), + ) + allocated_cores = list(range(cores_needed)) + + allocation = ResourceAllocation( + cpu_memory_reserved=requirements.cpu_memory_bytes, + gpu_memory_reserved=( + requirements.gpu_memory_bytes + if gpu_id is not None + else 0 + ), + gpu_id=gpu_id, + cpu_cores=allocated_cores, + allocation_time=time.time(), + ) + + logger.success( + "Resource allocation successful: {}", allocation + ) + return allocation + + def release_resources(self, allocation: ResourceAllocation): + """Releases allocated resources.""" + with logger.contextualize(operation="resource_release"): + try: + self.reserved_cpu_memory -= ( + allocation.cpu_memory_reserved + ) + if allocation.gpu_id is not None: + self.reserved_gpu_memory[ + allocation.gpu_id + ] -= allocation.gpu_memory_reserved + + allocation_duration = ( + time.time() - allocation.allocation_time + ) + logger.info( + "Resources released. Duration: {:.2f}s, CPU: {} bytes, GPU: {} bytes", + allocation_duration, + allocation.cpu_memory_reserved, + allocation.gpu_memory_reserved, + ) + except Exception as e: + logger.exception("Error releasing resources: {}", e) + raise + + +def with_resource_management(func: Callable) -> Callable: + """ + Decorator that handles resource prediction and allocation for a function. 
+ + Args: + func: Function to be wrapped + + Returns: + Callable: Wrapped function with resource management + """ + resource_manager = ResourceManager() + + @wraps(func) + def wrapper(*args, **kwargs): + with logger.contextualize( + function=func.__name__, execution_id=time.time_ns() + ): + logger.info("Starting execution with resource management") + start_time = time.time() + + try: + # Check historical data first + historical_requirements = resource_manager.predictor.get_historical_requirements( + func.__name__ + ) + + if historical_requirements is not None: + logger.info( + "Using historical requirements: {}", + historical_requirements, + ) + requirements = historical_requirements + else: + # Predict new requirements + requirements = resource_manager.predictor._analyze_function_memory( + func, *args, **kwargs + ) + + # Allocate resources + allocation = resource_manager.allocate_resources( + requirements + ) + + # Execute function with allocated resources + if allocation.gpu_id is not None: + with torch.cuda.device(allocation.gpu_id): + result = func(*args, **kwargs) + else: + result = func(*args, **kwargs) + + # Update execution history + execution_time = time.time() - start_time + requirements.estimated_runtime_seconds = ( + execution_time + ) + resource_manager.predictor.update_history( + func.__name__, requirements + ) + + logger.success( + "Function execution completed successfully in {:.2f}s", + execution_time, + ) + + return result + + except Exception as e: + logger.exception( + "Error during function execution: {}", e + ) + raise + + finally: + if "allocation" in locals(): + resource_manager.release_resources(allocation) + + return wrapper + + +if __name__ == "__main__": + + @with_resource_management + def process_large_matrix(size: int = 10000) -> np.ndarray: + # Create a large matrix and perform memory-intensive operations + data = np.random.random((size, size)) + result = np.matmul(data, data.T) # Matrix multiplication + + # Additional operations to increase memory usage + for _ in range(3): + result = np.exp(result) # Element-wise exponential + result = np.sin(result) # Element-wise sine + result = result @ result # Another matrix multiplication + + return result + + # Process a 10k x 10k matrix with intensive operations + result = process_large_matrix() + print(f"Result shape: {result.shape}") + print(f"Peak memory usage: {result.nbytes / 1e9:.2f} GB") diff --git a/clusterops/fractorial_cpu.py b/experimental/fractorial_cpu.py similarity index 60% rename from clusterops/fractorial_cpu.py rename to experimental/fractorial_cpu.py index 5e8886b..6f8936f 100644 --- a/clusterops/fractorial_cpu.py +++ b/experimental/fractorial_cpu.py @@ -6,52 +6,70 @@ from ctypes import c_double from loguru import logger + def set_cpu_affinity(cpu_id): try: if platform.system() == "Windows": import win32process - win32process.SetProcessAffinityMask(os.getpid(), 1 << cpu_id) + + win32process.SetProcessAffinityMask( + os.getpid(), 1 << cpu_id + ) elif platform.system() == "Linux": os.sched_setaffinity(0, [cpu_id]) else: - logger.warning(f"CPU affinity not supported on {platform.system()}") + logger.warning( + f"CPU affinity not supported on {platform.system()}" + ) except Exception as e: logger.error(f"Failed to set CPU affinity: {e}") + def memory_intensive_task(size: int): large_list = [i for i in range(size)] return float(sum(large_list)) + def task1(): return memory_intensive_task(500000) + def task2(): return memory_intensive_task(700000) + def task3(): return memory_intensive_task(900000) + 
class FractionalizedCPU: def __init__(self): self.result_array = None - def worker(self, func, index, cpu_id, memory_limit, *args, **kwargs): + def worker( + self, func, index, cpu_id, memory_limit, *args, **kwargs + ): try: # Set CPU affinity set_cpu_affinity(cpu_id) - + # Set memory limit try: import resource - resource.setrlimit(resource.RLIMIT_AS, (memory_limit, memory_limit)) + + resource.setrlimit( + resource.RLIMIT_AS, (memory_limit, memory_limit) + ) except ImportError: - logger.warning("resource module not available, memory limit not set") - + logger.warning( + "resource module not available, memory limit not set" + ) + result = func(*args, **kwargs) self.result_array[index] = result except Exception as e: logger.error(f"Error in worker process: {e}") - self.result_array[index] = float('nan') + self.result_array[index] = float("nan") def execute_on_fractionalized_cpu( self, @@ -59,7 +77,7 @@ def execute_on_fractionalized_cpu( memory_fraction: float, func: Union[Callable, List[Callable]], *args: Any, - **kwargs: Any + **kwargs: Any, ) -> Any: """ Executes a callable or list of callables on a fractionalized CPU core with limited memory. @@ -81,19 +99,29 @@ def execute_on_fractionalized_cpu( try: available_cpus = psutil.cpu_count(logical=False) if cpu_id < 0 or cpu_id >= available_cpus: - raise ValueError(f"Invalid CPU core: {cpu_id}. Available CPUs are 0 to {available_cpus - 1}.") - + raise ValueError( + f"Invalid CPU core: {cpu_id}. Available CPUs are 0 to {available_cpus - 1}." + ) + if memory_fraction <= 0.0 or memory_fraction > 1.0: - raise ValueError("Memory fraction must be between 0.0 and 1.0.") + raise ValueError( + "Memory fraction must be between 0.0 and 1.0." + ) total_memory = psutil.virtual_memory().total - memory_limit = int(total_memory * memory_fraction / available_cpus) + memory_limit = int( + total_memory * memory_fraction / available_cpus + ) if isinstance(func, list): self.result_array = Array(c_double, len(func)) processes = [] for i, f in enumerate(func): - p = Process(target=self.worker, args=(f, i, cpu_id, memory_limit) + args, kwargs=kwargs) + p = Process( + target=self.worker, + args=(f, i, cpu_id, memory_limit) + args, + kwargs=kwargs, + ) processes.append(p) p.start() @@ -104,24 +132,31 @@ def execute_on_fractionalized_cpu( return results else: self.result_array = Array(c_double, 1) - p = Process(target=self.worker, args=(func, 0, cpu_id, memory_limit) + args, kwargs=kwargs) + p = Process( + target=self.worker, + args=(func, 0, cpu_id, memory_limit) + args, + kwargs=kwargs, + ) p.start() p.join() return self.result_array[0] except Exception as e: - logger.error(f"Error executing on fractionalized CPU {cpu_id}: {e}") + logger.error( + f"Error executing on fractionalized CPU {cpu_id}: {e}" + ) raise -if __name__ == "__main__": - fractionalized_cpu = FractionalizedCPU() - # Execute a single function - result = fractionalized_cpu.execute_on_fractionalized_cpu(0, 0.1, memory_intensive_task, 1000000) - print(f"Single task result: {result}") +# if __name__ == "__main__": +# fractionalized_cpu = FractionalizedCPU() + +# # Execute a single function +# result = fractionalized_cpu.execute_on_fractionalized_cpu(0, 0.1, memory_intensive_task, 1000000) +# print(f"Single task result: {result}") - # Execute multiple functions - tasks = [task1, task2, task3] - results = fractionalized_cpu.execute_on_fractionalized_cpu(0, 0.3, tasks) - print(f"Multiple tasks results: {results}") \ No newline at end of file +# # Execute multiple functions +# tasks = [task1, task2, task3] +# 
results = fractionalized_cpu.execute_on_fractionalized_cpu(0, 0.3, tasks) +# print(f"Multiple tasks results: {results}") diff --git a/clusterops/gpu_scheduler.py b/experimental/gpu_scheduler.py similarity index 69% rename from clusterops/gpu_scheduler.py rename to experimental/gpu_scheduler.py index f488aa5..47f84b4 100644 --- a/clusterops/gpu_scheduler.py +++ b/experimental/gpu_scheduler.py @@ -11,11 +11,12 @@ ray.init(ignore_reinit_error=True) serve.start(detached=True) + # GPU Utility Function def detect_gpus() -> List[str]: """ Detects available GPU IDs on the system using torch.cuda. - + Returns: List[str]: A list of detected GPU IDs. """ @@ -23,13 +24,16 @@ def detect_gpus() -> List[str]: logger.info(f"Detected GPUs: {gpu_ids}") return gpu_ids + @serve.deployment(route_prefix="/execute_job", num_replicas=2) class GPUJobExecutor: def __init__(self): """Initialize GPUJobExecutor with a dynamic GPU pool and intelligent scheduler.""" self.gpu_pool = detect_gpus() self.available_gpus = self.gpu_pool.copy() - self.health_status = {gpu: True for gpu in self.gpu_pool} # GPU health tracking + self.health_status = { + gpu: True for gpu in self.gpu_pool + } # GPU health tracking logger.info(f"Initialized GPU pool: {self.gpu_pool}") asyncio.create_task(self.monitor_gpu_health()) @@ -50,45 +54,59 @@ def _check_gpu_health(self, gpu_id: str) -> bool: """ Checks the health of a GPU based on simple metrics. This could be extended with more sophisticated monitoring in production. - + Args: gpu_id (str): GPU ID to check. - + Returns: bool: True if healthy, False if not. """ - gpu_index = int(gpu_id.split('-')[-1]) + gpu_index = int(gpu_id.split("-")[-1]) memory_used = torch.cuda.memory_allocated(gpu_index) - memory_limit = torch.cuda.get_device_properties(gpu_index).total_memory + memory_limit = torch.cuda.get_device_properties( + gpu_index + ).total_memory # Basic health check: GPU is considered unhealthy if memory usage exceeds 90% return memory_used < 0.9 * memory_limit - async def execute(self, job_fn: Callable[[], Any], priority: int, *args, **kwargs) -> Any: + async def execute( + self, + job_fn: Callable[[], Any], + priority: int, + *args, + **kwargs, + ) -> Any: """ Executes a job function with an available GPU in the pool. - + Args: job_fn (Callable): The job function to execute. priority (int): Priority level for the job (1 is highest priority). *args: Positional arguments for the job function. **kwargs: Keyword arguments for the job function. - + Returns: Any: The result of the job function execution. """ if not self.available_gpus: - logger.warning("No GPUs available. Job will wait for an available GPU.") + logger.warning( + "No GPUs available. Job will wait for an available GPU." + ) await self._wait_for_available_gpu() gpu_id = self._allocate_gpu(priority) - logger.info(f"Allocating {gpu_id} for job execution with priority {priority}.") + logger.info( + f"Allocating {gpu_id} for job execution with priority {priority}." + ) try: result = await job_fn(*args, **kwargs) logger.info(f"Job completed successfully on {gpu_id}.") return result except Exception as e: - logger.error(f"Error during job execution on {gpu_id}: {e}") + logger.error( + f"Error during job execution on {gpu_id}: {e}" + ) raise finally: self._release_gpu(gpu_id) @@ -104,18 +122,26 @@ async def _wait_for_available_gpu(self): def _allocate_gpu(self, priority: int) -> str: """ Allocates the first healthy GPU from the pool with priority handling. - + Returns: str: The allocated GPU ID. 
""" - healthy_gpus = [gpu for gpu in self.available_gpus if self.health_status[gpu]] + healthy_gpus = [ + gpu + for gpu in self.available_gpus + if self.health_status[gpu] + ] if healthy_gpus: gpu_id = healthy_gpus.pop(0) self.available_gpus.remove(gpu_id) - logger.info(f"{gpu_id} allocated with priority {priority}.") + logger.info( + f"{gpu_id} allocated with priority {priority}." + ) return gpu_id else: - logger.warning("No healthy GPUs available; waiting for recovery.") + logger.warning( + "No healthy GPUs available; waiting for recovery." + ) time.sleep(1) return self._allocate_gpu(priority) @@ -129,7 +155,13 @@ def _release_gpu(self, gpu_id: str): self.available_gpus.append(gpu_id) logger.info(f"{gpu_id} released back to pool.") - async def __call__(self, job_fn: Callable[[], Any], priority: int = 1, *args, **kwargs) -> Any: + async def __call__( + self, + job_fn: Callable[[], Any], + priority: int = 1, + *args, + **kwargs, + ) -> Any: return await self.execute(job_fn, priority, *args, **kwargs) @@ -138,27 +170,35 @@ class JobScheduler: def __init__(self, executor_handle: Any): """ JobScheduler to manage job execution with fault tolerance, job retries, and scaling. - + Args: executor_handle (RayServeHandle): Handle to GPUJobExecutor deployment. """ self.executor_handle = executor_handle self.max_retries = 3 - self.job_queue: List[Tuple[int, Callable]] = [] # Prioritized job queue + self.job_queue: List[Tuple[int, Callable]] = ( + [] + ) # Prioritized job queue self.min_executors = 2 # Minimum number of executors running self.max_executors = 6 # Maximum number of executors allowed asyncio.create_task(self.scale_cluster()) - async def schedule_job(self, job_fn: Callable[[], Any], priority: int = 1, *args, **kwargs) -> Any: + async def schedule_job( + self, + job_fn: Callable[[], Any], + priority: int = 1, + *args, + **kwargs, + ) -> Any: """ Schedule and execute a job with retries in case of failure. - + Args: job_fn (Callable): The job function to execute. priority (int): Priority level for the job (1 is highest priority). *args: Positional arguments for the job function. **kwargs: Keyword arguments for the job function. - + Returns: Any: The result of the job function execution. """ @@ -168,16 +208,26 @@ async def schedule_job(self, job_fn: Callable[[], Any], priority: int = 1, *args for attempt in range(self.max_retries): try: _, next_job = self.job_queue.pop(0) - result = await self.executor_handle.execute.remote(next_job, priority, *args, **kwargs) - logger.info(f"Job completed successfully on attempt {attempt + 1}") + result = await self.executor_handle.execute.remote( + next_job, priority, *args, **kwargs + ) + logger.info( + f"Job completed successfully on attempt {attempt + 1}" + ) return result except Exception as e: - logger.warning(f"Job failed on attempt {attempt + 1}: {e}") + logger.warning( + f"Job failed on attempt {attempt + 1}: {e}" + ) if attempt == self.max_retries - 1: - logger.error("Max retries reached. Job failed permanently.") + logger.error( + "Max retries reached. Job failed permanently." 
+ ) raise - await asyncio.sleep(2 ** attempt) # Exponential backoff - logger.info(f"Retrying job execution, attempt {attempt + 2}") + await asyncio.sleep(2**attempt) # Exponential backoff + logger.info( + f"Retrying job execution, attempt {attempt + 2}" + ) async def scale_cluster(self): """ @@ -185,16 +235,28 @@ async def scale_cluster(self): """ while True: queue_size = len(self.job_queue) - num_executors = len(await self.executor_handle.list_replicas.remote()) - + num_executors = len( + await self.executor_handle.list_replicas.remote() + ) + if queue_size > 5 and num_executors < self.max_executors: logger.info("Scaling up resources for high load.") - new_executor = GPUJobExecutor.bind() # Create a new executor - ray.get(new_executor.deploy()) # Deploy additional executor - elif queue_size == 0 and num_executors > self.min_executors: + new_executor = ( + GPUJobExecutor.bind() + ) # Create a new executor + ray.get( + new_executor.deploy() + ) # Deploy additional executor + elif ( + queue_size == 0 and num_executors > self.min_executors + ): logger.info("Scaling down resources for low load.") - await self.executor_handle.remove_replicas.remote(1) # Remove an executor - await asyncio.sleep(10) # Adjust scaling check frequency as needed + await self.executor_handle.remove_replicas.remote( + 1 + ) # Remove an executor + await asyncio.sleep( + 10 + ) # Adjust scaling check frequency as needed # # Deployment @@ -216,19 +278,21 @@ async def scale_cluster(self): # serve.run(run) -def gpu_scheduler(function: Callable[[], Any], priority: int = 1, *args, **kwargs) -> Any: +def gpu_scheduler( + function: Callable[[], Any], priority: int = 1, *args, **kwargs +) -> Any: """ Schedule and execute a function on available GPUs using Ray and Ray Serve. - + Args: function (Callable[[], Any]): The function to execute on GPU priority (int, optional): Priority level for job scheduling. Lower numbers indicate higher priority. Defaults to 1. 
*args: Additional positional arguments to pass to the function **kwargs: Additional keyword arguments to pass to the function - + Returns: Any: Result of the executed function - + Raises: RuntimeError: If there is an error initializing the GPU executor or job scheduler TimeoutError: If the job execution times out @@ -238,18 +302,26 @@ def gpu_scheduler(function: Callable[[], Any], priority: int = 1, *args, **kwarg # Initialize GPU executor and job scheduler gpu_executor = GPUJobExecutor.bind() job_scheduler = JobScheduler.bind(gpu_executor) - + # Schedule and execute the job - job_handle = job_scheduler.schedule_job.remote(function, priority, *args, **kwargs) + job_handle = job_scheduler.schedule_job.remote( + function, priority, *args, **kwargs + ) result = ray.get(job_handle, timeout=3600) # 1 hour timeout - - logger.info(f"Job completed successfully with result: {result}") + + logger.info( + f"Job completed successfully with result: {result}" + ) return result - + except TimeoutError as e: logger.error(f"Job execution timed out: {e}") - raise TimeoutError("GPU job execution timed out after 1 hour") from e - + raise TimeoutError( + "GPU job execution timed out after 1 hour" + ) from e + except Exception as e: logger.error(f"Error executing GPU job: {e}") - raise RuntimeError(f"Failed to execute GPU job: {str(e)}") from e + raise RuntimeError( + f"Failed to execute GPU job: {str(e)}" + ) from e diff --git a/pyproject.toml b/pyproject.toml index a1d4081..dd8dd81 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "clusterops" -version = "0.0.5" +version = "0.1.4" description = "Paper - Pytorch" license = "MIT" authors = ["Kye Gomez "] @@ -31,6 +31,7 @@ psutil = "*" + [tool.poetry.group.lint.dependencies] ruff = "^0.1.6" types-toml = "^0.10.8.1"
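Post-patch usage note (reviewer addition, not part of the diff): a minimal sketch of how the reorganized public API is meant to be called, assuming the exports added in clusterops/__init__.py above land as shown. `sample_task` and the printed fields are illustrative only; the sketch sticks to the CPU and profiling helpers so it runs on a machine without GPUs.

# Minimal usage sketch (not part of the patch). Assumes the exports added in
# clusterops/__init__.py above; sample_task is a hypothetical CPU-bound workload.
from clusterops import (
    execute_with_all_cpu_cores,
    monitor_resources,
    profile_execution,
)


def sample_task(n: int) -> int:
    # Toy workload standing in for a real job.
    return sum(i * i for i in range(n))


if __name__ == "__main__":
    # Runs on every CPU core currently below ~10% utilization,
    # falling back to all cores if none are idle.
    result = execute_with_all_cpu_cores(sample_task, 1_000_000)
    print(result)

    # profile_execution now returns a dict instead of the bare result.
    profiled = profile_execution(
        sample_task, 1_000_000, collect_gpu_metrics=False
    )
    print(profiled["result"], profiled["metrics"]["execution_time"])

    # monitor_resources takes a single snapshot and reports any threshold alerts;
    # gpu_threshold=None skips GPU polling on machines with no GPUtil-visible GPUs.
    stats = monitor_resources(cpu_threshold=90, gpu_threshold=None)
    print(stats["cpu_usage"], stats["alerts"])

Per its docstring, distributed_execute_on_gpus follows the same call shape (gpu_ids first, then the callable and its arguments) but requires a Ray-initialized environment with GPUs visible to GPUtil, so it is not exercised in this sketch.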