diff --git a/.agents/skills/pyflowx-testing/SKILL.md b/.trae/skills/pyflowx-testing/SKILL.md similarity index 100% rename from .agents/skills/pyflowx-testing/SKILL.md rename to .trae/skills/pyflowx-testing/SKILL.md diff --git a/src/pyflowx/executors.py b/src/pyflowx/executors.py index 4a67468..1796d9d 100644 --- a/src/pyflowx/executors.py +++ b/src/pyflowx/executors.py @@ -41,7 +41,9 @@ from __future__ import annotations import asyncio +import atexit import concurrent.futures +import contextlib import inspect import logging import threading @@ -60,6 +62,9 @@ logger = logging.getLogger(__name__) # 进程池复用:同一次 run() 内的 process 任务共享一个 ProcessPoolExecutor。 # 模块级缓存避免每次任务都创建/销毁进程池的开销。 +# run() 结束后通过 _shutdown_process_pool() 关闭(shutdown(wait=False) + +# kill 工作进程),避免 Python 退出时 threading._shutdown 等待管理线程 +# join 工作进程导致数秒阻塞。 _process_pool: concurrent.futures.ProcessPoolExecutor | None = None _process_pool_lock = threading.Lock() @@ -74,6 +79,31 @@ def _get_process_pool() -> concurrent.futures.ProcessPoolExecutor: return _process_pool +def _shutdown_process_pool() -> None: + """关闭复用的进程池。 + + ``shutdown(wait=False)`` 通知管理线程退出(管理线程是非 daemon, + ``threading._shutdown`` 会等待它);同时 kill 工作进程,避免管理线程 + 在退出前逐个 join 工作进程导致数秒阻塞。 + """ + global _process_pool # noqa: PLW0603 + if _process_pool is not None: + pool = _process_pool + _process_pool = None + # 在 shutdown 前获取进程列表(管理线程退出会清空 _processes)。 + # _processes 是 ProcessPoolExecutor 的私有属性,无公开 API 替代。 + procs = list((getattr(pool, "_processes", None) or {}).values()) + pool.shutdown(wait=False) + # 强制终止工作进程(SIGKILL),避免管理线程 join 导致 ~7s 阻塞。 + for proc in procs: + with contextlib.suppress(ProcessLookupError, AttributeError): + proc.kill() # type: ignore[attr-defined] + + +# 兜底:防止未经 run() 直接使用执行器的场景导致进程池泄漏。 +atexit.register(_shutdown_process_pool) + + def _run_in_process(fn: Any, args: tuple[Any, ...], kwargs: dict[str, Any]) -> Any: """模块级函数:在进程池中执行任务(须可 pickle)。 @@ -793,6 +823,10 @@ def run( except TaskFailedError: report.success = False raise + finally: + # 关闭进程池:通知管理线程退出 + kill 工作进程,避免 + # threading._shutdown 等待管理线程 join 工作进程导致 ~7s 阻塞。 + _shutdown_process_pool() return report