Source code for orc_bound.utils.parallel
"""
Parallel utilities for ORC-Bound.
"""
from __future__ import annotations
import os
from concurrent.futures import ThreadPoolExecutor
__all__ = ["ThreadPool", "_cpu_count"]
[docs]
def _cpu_count(n_jobs: int | None) -> int:
"""
Resolve the number of workers for parallel execution.
Parameters
----------
n_jobs : int or None
- ``None`` or ``0``: use all CPUs.
- Positive int: use exactly that many workers.
- ``-1``: use all CPUs minus one.
Returns
-------
int
Number of workers to use (always at least 1).
"""
physical_cores = os.cpu_count() or 1
if n_jobs is None or n_jobs == 0:
return max(1, physical_cores)
if n_jobs == -1:
return max(1, physical_cores - 1)
return max(1, n_jobs)
class ThreadPool:
"""
Thin wrapper around ThreadPoolExecutor for use within orc_bound.
Parameters
----------
n_workers : int or None
Number of worker threads. Passed through to :func:`_cpu_count`.
func : callable
The function to apply to each task.
iterable : iterable
Iterable of tasks. Each task is unpacked as ``func(*task)``.
"""
def __init__(
self,
n_workers: int | None = None,
):
self.n_workers = _cpu_count(n_workers)
def map(self, func, iterable):
"""
Apply ``func`` to each item in ``iterable`` in parallel.
Yields results in completion order (not submission order).
Parameters
----------
func : callable
Function to execute. Should be a top-level or module-level
function for pickling compatibility.
iterable : iterable
Each element is passed as a single argument to ``func``.
Yields
------
Results from ``func`` calls as they complete.
"""
with ThreadPoolExecutor(max_workers=self.n_workers) as executor:
futures = [executor.submit(func, item) for item in iterable]
for f in as_completed(futures):
yield f.result()
def __enter__(self):
return self
def __exit__(self, *args):
pass
# Re-export as_completed for convenience
from concurrent.futures import as_completed
__all__ += ["as_completed"]