From 8fadf6edd884767f101001ebdc6d3dcf07fd9917 Mon Sep 17 00:00:00 2001 From: gooker_young Date: Sun, 28 Jun 2026 18:56:27 +0800 Subject: [PATCH] =?UTF-8?q?fix(executors):=20=E4=BF=AE=E5=A4=8D=E8=BF=9B?= =?UTF-8?q?=E7=A8=8B=E6=B1=A0=E9=80=80=E5=87=BA=E9=98=BB=E5=A1=9E=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 新增_shutdown_process_pool函数,在run()结束时主动关闭进程池 2. 通过atexit注册兜底清理逻辑,防止进程池泄漏 3. 先调用shutdown(wait=False)通知管理线程退出,再强制kill工作进程,避免Python退出时threading._shutdown等待join导致数秒阻塞 4. 新增测试规范文档说明测试相关规则 --- .../skills/pyflowx-testing/SKILL.md | 0 src/pyflowx/executors.py | 34 +++++++++++++++++++ 2 files changed, 34 insertions(+) rename {.agents => .trae}/skills/pyflowx-testing/SKILL.md (100%) diff --git a/.agents/skills/pyflowx-testing/SKILL.md b/.trae/skills/pyflowx-testing/SKILL.md similarity index 100% rename from .agents/skills/pyflowx-testing/SKILL.md rename to .trae/skills/pyflowx-testing/SKILL.md diff --git a/src/pyflowx/executors.py b/src/pyflowx/executors.py index 4a67468..1796d9d 100644 --- a/src/pyflowx/executors.py +++ b/src/pyflowx/executors.py @@ -41,7 +41,9 @@ from __future__ import annotations import asyncio +import atexit import concurrent.futures +import contextlib import inspect import logging import threading @@ -60,6 +62,9 @@ logger = logging.getLogger(__name__) # 进程池复用:同一次 run() 内的 process 任务共享一个 ProcessPoolExecutor。 # 模块级缓存避免每次任务都创建/销毁进程池的开销。 +# run() 结束后通过 _shutdown_process_pool() 关闭(shutdown(wait=False) + +# kill 工作进程),避免 Python 退出时 threading._shutdown 等待管理线程 +# join 工作进程导致数秒阻塞。 _process_pool: concurrent.futures.ProcessPoolExecutor | None = None _process_pool_lock = threading.Lock() @@ -74,6 +79,31 @@ def _get_process_pool() -> concurrent.futures.ProcessPoolExecutor: return _process_pool +def _shutdown_process_pool() -> None: + """关闭复用的进程池。 + + ``shutdown(wait=False)`` 通知管理线程退出(管理线程是非 daemon, + ``threading._shutdown`` 会等待它);同时 kill 工作进程,避免管理线程 + 在退出前逐个 join 工作进程导致数秒阻塞。 + """ + global _process_pool # noqa: PLW0603 + if _process_pool is not None: + pool = _process_pool + _process_pool = None + # 在 shutdown 前获取进程列表(管理线程退出会清空 _processes)。 + # _processes 是 ProcessPoolExecutor 的私有属性,无公开 API 替代。 + procs = list((getattr(pool, "_processes", None) or {}).values()) + pool.shutdown(wait=False) + # 强制终止工作进程(SIGKILL),避免管理线程 join 导致 ~7s 阻塞。 + for proc in procs: + with contextlib.suppress(ProcessLookupError, AttributeError): + proc.kill() # type: ignore[attr-defined] + + +# 兜底:防止未经 run() 直接使用执行器的场景导致进程池泄漏。 +atexit.register(_shutdown_process_pool) + + def _run_in_process(fn: Any, args: tuple[Any, ...], kwargs: dict[str, Any]) -> Any: """模块级函数:在进程池中执行任务(须可 pickle)。 @@ -793,6 +823,10 @@ def run( except TaskFailedError: report.success = False raise + finally: + # 关闭进程池:通知管理线程退出 + kill 工作进程,避免 + # threading._shutdown 等待管理线程 join 工作进程导致 ~7s 阻塞。 + _shutdown_process_pool() return report